【问题标题】:Vectorize Sliding Window Dot Product矢量化滑动窗口点积
【发布时间】:2017-03-21 17:59:37
【问题描述】:

我有两个大向量(长度相等),我正在为它们计算滑动窗口点积:

import numpy as np

a = np.array([1, 2, 3, 4, 5, 6])
b = np.array([11, 22, 33, 44, 55, 66])

out = np.array(
    [[a[0]*b[0]+a[1]*b[1]+a[2]*b[2]],
     [a[1]*b[1]+a[2]*b[2]+a[3]*b[3]],
     [a[2]*b[2]+a[3]*b[3]+a[4]*b[4]],
     [a[3]*b[3]+a[4]*b[4]+a[5]*b[5]],
    ])

[[154]
 [319]
 [550]
 [847]]

当然,我可以调用点积函数,但是如果窗口/向量长度很大,那么它的效率不如下面的代码:

window = 3
result = np.empty([4,1])
result[0] = a[0]*b[0]+a[1]*b[1]+a[2]*b[2]
for i in range(3):
    result[i+1] = result[i]-a[i]*b[i]+a[i+window]*b[i+window]

[[154]
 [319]
 [550]
 [847]]

在这里,我们利用i+1th 点积类似于ith 点积这一事实。也就是说,

result[i+1] = result[i]-a[i]*b[i]+a[i+window]*b[i+window]

如何将我的 for 循环转换为矢量化函数,以便计算可以利用来自ith 步骤的信息,从而减少计算冗余,同时最大限度地减少所需的内存量。

更新

我确实需要:

import numpy as np

a = np.array([1, 2, 3, 4, 5, 6])
b = np.array([11, 22, 33, 44, 55, 66, 77, 88])

out = np.array(
    [a[0]*b[0]+a[1]*b[1]+a[2]*b[2]+a[3]*b[3]]+a[4]*b[4]]+a[5]*b[5],
     a[0]*b[1]+a[1]*b[2]+a[2]*b[3]+a[3]*b[4]]+a[4]*b[5]]+a[5]*b[6],
     a[0]*b[2]+a[1]*b[3]+a[2]*b[4]+a[3]*b[5]]+a[4]*b[6]]+a[5]*b[7],
    ])

[1001
 1232
 1463]

所以a 将滑过b 并计算点积。

【问题讨论】:

    标签: python numpy vectorization numpy-ndarray dot-product


    【解决方案1】:

    您可以对 O(n) 复杂度使用部分总和:

    ps = np.r_[0, np.cumsum(a*b)]
    ps[3:]-ps[:-3]
    # array([154, 319, 550, 847])
    

    或者更接近原始for 循环并避免非常大的部分和的变体:

    k = 3
    d = a*b
    d[k:] -= d[:-k].copy()
    np.cumsum(d)[k-1:]
    # array([154, 319, 550, 847])
    

    更新以匹配更新后的Q

    现在这确实是一个卷积,所以@Divakar 的解决方案或多或少适用。只是,您将直接对 a[::-1]b 进行卷积。如果速度是一个问题,您可以尝试将np.convolve 替换为scipy.signal.fftconvolve,这取决于您的操作数的大小可能会明显更快。但是,对于非常小的操作数或长度相差很大的操作数,您甚至可能会损失一些速度,因此请务必尝试两种方法:

    np.convolve(b, a[::-1], 'valid')
    scipy.signal.fftconvolve(b, a[::-1], 'valid')
    

    【讨论】:

    • 是的。这可以被塑造成理论上的东西,可以是published :)
    • 谢谢!这看起来是正确的答案。我不熟悉 np.r_ 但我认为它比先实例化一个空数组x = np.empty(len(a)+1),用np.cumsum(a*b, out=x[1:]) 填充数组,然后通过x[0] = 0 将第一个元素设置为零要好。我已经阅读了np.r 上的文档,它显然产生了正确的结果,但您是否介意提供更多信息,因为我相信它对其他人来说也是新的。
    • @slaw 不,您的预分配方法在性能方面更好,我只是懒惰。还请务必查看我刚刚添加的变体,因为如果您的向量长度为​​ 10^8,那么控制部分和的大小可能至关重要。
    • @slaw 感谢您的客气话。我刚刚修复了第二个解决方案中的一个小错误。就地分配d[k:] -= d[:-k] 不安全。所以必须添加.copy()。如果您使用此代码,请确保包含它。
    • @slaw cumsum 本质上是一个离散的antiderivative。滑动和对应于定积分。我所做的只是计算定积分的离散版本,作为在积分区间端点处评估的反导数的差。
    【解决方案2】:

    方法#1

    使用np.convolve 进行两个输入之间的元素乘法,并使用全为1 的内核和size=3 -

    np.convolve(a*b,np.ones(3),'valid')
    

    方法 #2

    由于我们只是简单地对窗口中的元素求和,我们也可以使用uniform_filter,就像这样 -

    from scipy.ndimage.filters import uniform_filter1d as unif1d
    
    def uniform_filter(a,W):
        hW = (W-1)//2
        return W*unif1d(a.astype(float),size=W, mode='constant')[hW:-hW]
    
    out = uniform_filter(a*b,W=3)
    

    基准测试

    循环方法 -

    def loopy_approach(a,b):
        window = 3
        N = a.size-window+1
    
        result = np.empty([N,1])
        result[0] = a[0]*b[0]+a[1]*b[1]+a[2]*b[2]
        for i in range(N-1):
            result[i+1] = result[i]-a[i]*b[i]+a[i+window]*b[i+window]
        return result
    

    时间和验证 -

    In [147]: a = np.random.randint(0,100,(1000))
         ...: b = np.random.randint(0,100,(1000))
         ...: 
    
    In [148]: out0 = loopy_approach(a,b).ravel()
         ...: out1 = np.convolve(a*b,np.ones(3),'valid')
         ...: out2 = uniform_filter(a*b,W=3)
         ...: 
    
    In [149]: np.allclose(out0,out1)
    Out[149]: True
    
    In [150]: np.allclose(out0,out2)
    Out[150]: True
    
    In [151]: %timeit loopy_approach(a,b)
         ...: %timeit np.convolve(a*b,np.ones(3),'valid')
         ...: %timeit uniform_filter(a*b,W=3)
         ...: 
    100 loops, best of 3: 2.27 ms per loop
    100000 loops, best of 3: 7 µs per loop
    100000 loops, best of 3: 10.2 µs per loop
    

    【讨论】:

    • 是的,我知道我可以做到,但 FFT 实际上是 O(nlogn),而上面的解决方案实际上是 O(n)。因此请求将其矢量化以代替 for 循环。同样,向量和窗口大小都会很长,因此卷积方法会差一个数量级。
    • @slaw 我认为您在这里不必要地引入了计算复杂性。这些卷积在 C/fortran AFAIK 中实现,因此与 NumPy/Scipy 等中的任何矢量化方法一样好。
    • @Divakar 请问第二种方法中的y 是什么?另外,我认为由于 OP 期望结果是 2D,因此您的第一种方法应该是 np.convolve(a*b,np.ones(3),'valid')[:, np.newaxis]
    • @kmario23 在这里添加一个新轴是微不足道的,不要认为 OP 会介意。不过还是感谢您指出!
    • @slaw 添加运行时测试,如果它们可以说服您重新考虑复杂性方面的想法。
    【解决方案3】:

    使用strides 的另一种方法是:

    In [12]: from numpy.lib.stride_tricks import as_strided
    In [13]: def using_strides(a, b, w=3):
                  shape = a.shape[:-1] + (a.shape[-1] - w + 1, w)
                  strides = a.strides + (a.strides[-1],)
                  res = np.sum((as_strided(a, shape=shape, strides=strides) * \ 
                                as_strided(b, shape=shape, strides=strides)), axis=1)
                  return res[:, np.newaxis]
    
    
    In [14]: using_strides(a, b, 3)
    Out[14]: 
    array([[154],
           [319],
           [550],
           [847]])
    

    【讨论】:

    • 是的,使用 strides 很好,但是对于更长的向量(1 亿个元素)和更大的窗口,由于冗余计算量,这变得更加昂贵。非冗余版本的时间复杂度为O(n)
    猜你喜欢
    • 1970-01-01
    • 2017-01-07
    • 2021-08-04
    • 2018-12-25
    • 1970-01-01
    • 1970-01-01
    • 2021-02-25
    • 2021-07-02
    • 2017-04-07
    相关资源
    最近更新 更多