【问题标题】:Efficient Numpy 2D array construction from 1D array从一维数组构建高效的 Numpy 二维数组
【发布时间】:2011-06-22 20:34:15
【问题描述】:

我有一个这样的数组:

A = array([1,2,3,4,5,6,7,8,9,10])

我正在尝试获取这样的数组:

B = array([[1,2,3],
          [2,3,4],
          [3,4,5],
          [4,5,6]])

每行(具有固定的任意宽度)移动一个。 A 的数组长 10k 条记录,我试图在 Numpy 中找到一种有效的方法。目前我正在使用 vstack 和一个很慢的 for 循环。有更快的方法吗?

编辑:

width = 3 # fixed arbitrary width
length = 10000 # length of A which I wish to use
B = A[0:length + 1]
for i in range (1, length):
    B = np.vstack((B, A[i, i + width + 1]))

【问题讨论】:

  • 你能发布你的 vstack/loop 解决方案吗?
  • @wxbx:请详细说明您的目标是什么?请注意B = array([1,2,3],[2,3,4],[3,4,5],[4,5,6]) 无效numpy
  • @wxbx - 你的解决方案真的很不走运。你vstack数组10000次!看看我的回答,我vstack 一次。
  • oops 修复了语法错误...在 matlab 模式下思考。
  • A 真的等于一个递增的数字序列还是只是为了说明位置?如果是前者,我知道我可以快速做到这一点。 :)

标签: python numpy


【解决方案1】:

实际上,有一种更有效的方法可以做到这一点...使用vstack 等的缺点是您正在制作数组的副本。

顺便说一句,这实际上与@Paul 的答案相同,但我发布这个只是为了更详细地解释事情......

有一种方法可以只用视图来做到这一点,这样没有内存被复制。

我是直接从Erik Rigtorp's post to numpy-discussion 那里借来的,而后者又是从 Keith Goodman 的Bottleneck 那里借来的(这很有用!)。

基本技巧是直接操作strides of the array(对于一维数组):

import numpy as np

def rolling(a, window):
    shape = (a.size - window + 1, window)
    strides = (a.itemsize, a.itemsize)
    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)

a = np.arange(10)
print rolling(a, 3)

a 是您的输入数组,window 是您想要的窗口长度(在您的情况下为 3)。

这会产生:

[[0 1 2]
 [1 2 3]
 [2 3 4]
 [3 4 5]
 [4 5 6]
 [5 6 7]
 [6 7 8]
 [7 8 9]]

但是,原始a 和返回的数组之间绝对没有内存重复。这意味着它比其他选项速度快并且可扩展性很多

例如(使用a = np.arange(100000)window=3):

%timeit np.vstack([a[i:i-window] for i in xrange(window)]).T
1000 loops, best of 3: 256 us per loop

%timeit rolling(a, window)
100000 loops, best of 3: 12 us per loop

如果我们将其推广到沿 N 维数组的最后一个轴的“滚动窗口”,我们会得到 Erik Rigtorp 的“滚动窗口”函数:

import numpy as np

def rolling_window(a, window):
   """
   Make an ndarray with a rolling window of the last dimension

   Parameters
   ----------
   a : array_like
       Array to add rolling window to
   window : int
       Size of rolling window

   Returns
   -------
   Array that is a view of the original array with a added dimension
   of size w.

   Examples
   --------
   >>> x=np.arange(10).reshape((2,5))
   >>> rolling_window(x, 3)
   array([[[0, 1, 2], [1, 2, 3], [2, 3, 4]],
          [[5, 6, 7], [6, 7, 8], [7, 8, 9]]])

   Calculate rolling mean of last dimension:
   >>> np.mean(rolling_window(x, 3), -1)
   array([[ 1.,  2.,  3.],
          [ 6.,  7.,  8.]])

   """
   if window < 1:
       raise ValueError, "`window` must be at least 1."
   if window > a.shape[-1]:
       raise ValueError, "`window` is too long."
   shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
   strides = a.strides + (a.strides[-1],)
   return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)

那么,让我们看看这里发生了什么... 操作数组的strides 可能看起来有点神奇,但是一旦你了解了发生了什么,它就完全没有了。 numpy 数组的步幅描述了沿给定轴递增一个值所必须采取的步骤的大小(以字节为单位)。所以,在 64 位浮点数的一维数组的情况下,每一项的长度为 8 个字节,x.strides(8,)

x = np.arange(9)
print x.strides

现在,如果我们将其重塑为 2D、3x3 数组,则步长将为 (3 * 8, 8),因为我们必须跳转 24 个字节以沿第一个轴递增一步,并跳转 8 个字节以沿第一个轴递增一步第二轴。

y = x.reshape(3,3)
print y.strides

类似地,转置与只是反转数组的步幅相同:

print y
y.strides = y.strides[::-1]
print y

显然,数组的步长和数组的形状密切相关。如果我们改变一个,我们就必须相应地改变另一个,否则我们将无法对实际保存数组值的内存缓冲区进行有效描述。

因此,如果你想同时改变数组的形状和大小,你不能只通过设置x.stridesx.shape来做到这一点,即使新的strides和形状兼容。

这就是numpy.lib.as_strided 的用武之地。它实际上是一个非常简单的函数,它只是同时设置数组的步长和形状。

它会检查两者是否兼容,但不会检查旧步幅和新形状是否兼容,如果您独立设置两者,则会发生这种情况。 (它实际上是通过numpy's __array_interface__ 做到这一点的,它允许任意类将内存缓冲区描述为一个numpy 数组。)

所以,我们所做的只是让一个项目沿一个轴前进(在 64 位数组的情况下为 8 个字节),但也仅沿另一个轴前进 8 个字节

换句话说,在“窗口”大小为 3 的情况下,数组的形状为 (whatever, 3),但不是为第二维步进完整的 3 * x.itemsize,它只步进一个项目向前,有效地使新数组的行成为原始数组的“移动窗口”视图。

(这也意味着x.shape[0] * x.shape[1] 与新数组的x.size 不同。)

无论如何,希望这能让事情变得更清楚..

【讨论】:

  • Kinggton:我真的很佩服你的回答,但你不觉得这对 OP 的问题有点矫枉过正吗? ;-)。谢谢
  • @eat - 是的! :) 对于一个短数组来说,这绝对是矫枉过正(而且 OP 的 10K 元素数组相当短),但了解它仍然很有用。老实说,我只是觉得有时我喜欢写过长的答案......
  • Kingston:感谢您提供非常详细的回答,我在那里学到了很多东西。我还将您的代码与@eumiro 的答案进行了对比,您的滚动答案给了我 60 倍的加速!考虑到我打算在更大的阵列上使用它,加速非常有用。 :)
  • 感谢您的精彩回答!我不知道在 numpy 的引擎盖下会有这样的事情。绝对值得一试 - __array_interface__ 也很酷!
  • 这是救命稻草。谢谢。
【解决方案2】:

这个解决方案不能通过 python 循环有效地实现,因为它带有各种类型检查,在使用 numpy 数组时最好避免。如果您的阵列特别高,您会注意到这样的速度大大提高:

newshape = (4,3)
newstrides = (A.itemsize, A.itemsize)
B = numpy.lib.stride_tricks.as_strided(A, shape=newshape, strides=newstrides)

这给出了数组 A 的视图。如果您想要一个可以编辑的新数组,请执行相同操作,但在末尾添加 .copy()

步幅详情:

在这种情况下,newstrides 元组将是 (4,4),因为数组有 4 字节项,并且您希望继续在 i 维中以单项步骤逐步遍历数据。第二个值“4”指的是 j 维中的步幅(在正常的 4x4 数组中为 16)。因为在这种情况下,您还希望在 j 维度中以 4 字节的步长从缓冲区中增加读取。

Joe 给出了一个很好、详细的描述,当他说这个技巧所做的就是同时改变步幅和形状时,他把事情搞得一清二楚。

【讨论】:

  • +1 你打败了我!我正在输入这个......我仍然会发布我的答案,因为它会更详细一些。此外,您的 strides=(4,4) 假定 A.itemsize 为 4(即 32 位浮点数或整数)。最好是strides=(A.itemsize, A.itemsize)
  • 你能指出我的文档吗?我以前从未见过这个功能...
  • 谢谢乔。我正在寻找一些可以链接到的在线文档,但那里没有多少!这是我能找到的最好的:mentat.za.net/numpy/numpy_advanced_slides
  • @Benjamin - 它只是同时设置数组的步幅和形状。它适用于您需要同时更改两者的情况,但新的步幅将与旧的形状不兼容,反之亦然,因此您不能只做x.strides = new_stridesx.shape = new_shape
【解决方案3】:

您使用哪种方法?

import numpy as np
A = np.array([1,2,3,4,5,6,7,8,9,10])
width = 3

np.vstack([A[i:i-len(A)+width] for i in xrange(len(A)-width)])
# needs 26.3µs

np.vstack([A[i:i-width] for i in xrange(width)]).T
# needs 13.2µs

如果您的宽度相对较低 (3) 并且您的 A 很大(10000 个元素),那么差异就更重要了:第一个为 32.4 毫秒,第二个为 44 微秒。

【讨论】:

  • 谢谢!这正是我所需要的!是的,今天刚刚破解了 numpy,所以学习缓慢。
【解决方案4】:

只是为了进一步了解@Joe general的答案

import numpy as np
def rolling(a, window):
    step = 2 
    shape = ( (a.size-window)/step + 1   , window)


    strides = (a.itemsize*step, a.itemsize)

    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)

a = np.arange(10)

print rolling(a, 3)

哪个输出:

[[0 1 2]
 [2 3 4]
 [4 5 6]
 [6 7 8]]

为了进一步概括 2d 的情况,即将它用于从图像中提取补丁

def rolling2d(a,win_h,win_w,step_h,step_w):

    h,w = a.shape
    shape = ( ((h-win_h)/step_h + 1)  * ((w-win_w)/step_w + 1) , win_h , win_w)

    strides = (step_w*a.itemsize, h*a.itemsize,a.itemsize)


    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)

a = np.arange(36).reshape(6,6)
print a
print rolling2d (a,3,3,2,2)

哪个输出:

[[ 0  1  2  3  4  5]
 [ 6  7  8  9 10 11]
 [12 13 14 15 16 17]
 [18 19 20 21 22 23]
 [24 25 26 27 28 29]
 [30 31 32 33 34 35]]
[[[ 0  1  2]
  [ 6  7  8]
  [12 13 14]]

 [[ 2  3  4]
  [ 8  9 10]
  [14 15 16]]

 [[ 4  5  6]
  [10 11 12]
  [16 17 18]]

 [[ 6  7  8]
  [12 13 14]
  [18 19 20]]]

【讨论】:

  • 在上面的示例中是否可以不提取环绕原始数组右边缘的结果。例如,第三个输出 [4,5,6; 10,11,12; 16,17,18] 'wraps' 回来了。对于图像处理,我想避免这种情况,直接跳到下一个返回的结果。
【解决方案5】:

我认为这可能比循环更快,当宽度固定在一个较低的数字时...

import numpy
a = numpy.array([1,2,3,4,5,6])
b = numpy.reshape(a, (numpy.shape(a)[0],1))
b = numpy.concatenate((b, numpy.roll(b,-1,0), numpy.roll(b,-2,0)), 1)
b = b[0:(numpy.shape(a)[0]/2) + 1,:]

编辑显然,使用 strides 的解决方案优于此,唯一的主要缺点是它们还没有很好的记录......

【讨论】:

    【解决方案6】:

    我正在使用类似于@JustInTime 的更通用的函数,但适用于ndarray

    def sliding_window(x, size, overlap=0):
        step = size - overlap # in npts
        nwin = (x.shape[-1]-size)//step + 1
        shape = x.shape[:-1] + (nwin, size)
        strides = x.strides[:-1] + (step*x.strides[-1], x.strides[-1])
        return stride_tricks.as_strided(x, shape=shape, strides=strides)
    

    一个例子,

    x = np.arange(10)
    M.sliding_window(x, 5, 3)
    Out[1]: 
    array([[0, 1, 2, 3, 4],
           [2, 3, 4, 5, 6],
           [4, 5, 6, 7, 8]])
    
    
    x = np.arange(10).reshape((2,5))
    M.sliding_window(x, 3, 1)
    Out[2]: 
    array([[[0, 1, 2],
            [2, 3, 4]],
    
           [[5, 6, 7],
            [7, 8, 9]]])
    

    【讨论】:

    • 感谢分享。请注意,如果窗口不完全匹配,这会截断最后一行(在您的第一个示例中,它会丢失应包含“9”的行)。但是如果将“nwin”行更改为nwin = int(np.ceil((x.shape[-1]-size)/step + 1)),它似乎会用零填充结果。 (有点惊讶这并没有给出一些 seg 错误,但我猜它是内置在 stride_tricks 中的。)
    【解决方案7】:

    看看:view_as_windows

    import numpy as np
    from skimage.util.shape import view_as_windows
    window_shape = (4, )
    aa = np.arange(1000000000) # 1 billion
    bb = view_as_windows(aa, window_shape)
    

    大约 1 秒。

    【讨论】:

      猜你喜欢
      • 2015-07-13
      • 2023-01-04
      • 1970-01-01
      • 1970-01-01
      • 2016-12-27
      • 2012-11-06
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多