【问题标题】:Optimize NumPy sum of matrices iterated through every element优化遍历每个元素的 NumPy 矩阵总和
【发布时间】:2014-10-06 10:08:17
【问题描述】:

我正在使用 numpy 1.9python 2.7 和 opencv,处理大矩阵,我必须多次执行以下操作

def sumShifted(A):  # A: numpy array 1000*1000*10
    return A[:, 0:-1] + A[:, 1:]    

如果可能的话,我想优化这个操作;我尝试使用Cython,但没有得到任何显着改善,但我不排除这是因为我的实施不当。

有没有办法让它更快?

编辑sumShifted 在这样的 for 循环中被调用:

for i in xrange(0, 400):
    # ... Various operations on B
    A = sumShifted(B)
    # ... Other operations on B


#More detailed
for i in xrange(0, 400):
    A = sumShifted(a11)
    B = sumShifted(a12)
    C = sumShifted(b12)
    D = sumShifted(b22)

    v = -upQ12/upQ11

    W, X, Z = self.function1( input_matrix, v, A, C[:,:,4], D[:,:,4] )
    S, D, F = self.function2( input_matrix, v, A, C[:,:,5], D[:,:,5] )
    AA      = self.function3( input_matrix, v, A, C[:,:,6], D[:,:,6] )
    BB      = self.function4( input_matrix, v, A, C[:,:,7], D[:,:,7] )

EDIT2:按照您的建议,我创建了这两个可运行的基准测试(使用 Cython),将 4 个 sumShifted 方法合并为一个。

A, B, C, D= improvedSumShifted(E, F, G, H)
#E,F: 1000x1000 matrices
#G,H: 1000x1000x8 matrices

#first implementation
def improvedSumShifted(np.ndarray[dtype_t, ndim=2] a, np.ndarray[dtype_t, ndim=2] b, np.ndarray[dtype_t, ndim=3] c, np.ndarray[dtype_t, ndim=3] d):
  cdef unsigned int i,j,k;
  cdef unsigned int w = a.shape[0], h = a.shape[1]-1, z = c.shape[2]
  cdef np.ndarray[dtype_t, ndim=2] aa = np.empty((w, h))
  cdef np.ndarray[dtype_t, ndim=2] bb = np.empty((w, h))
  cdef np.ndarray[dtype_t, ndim=3] cc = np.empty((w, h, z))
  cdef np.ndarray[dtype_t, ndim=3] dd = np.empty((w, h, z))
  with cython.boundscheck(False), cython.wraparound(False), cython.overflowcheck(False), cython.nonecheck(False):
    for i in range(w):
      for j in range(h):
        aa[i,j] = a[i,j] + a[i,j+1]
        bb[i,j] = b[i,j] + b[i,j+1]
        for k in range(z):
          cc[i,j,k] = c[i,j,k] + c[i,j+1,k]
          dd[i,j,k] = d[i,j,k] + d[i,j+1,k]
return aa, bb, cc, dd

#second implementation
def improvedSumShifted(np.ndarray[dtype_t, ndim=2] a, np.ndarray[dtype_t, ndim=2] b, np.ndarray[dtype_t, ndim=3] c, np.ndarray[dtype_t, ndim=3] d):
  cdef unsigned int i,j,k;
  cdef unsigned int w = a.shape[0], h = a.shape[1]-1, z = c.shape[2]
  cdef np.ndarray[dtype_t, ndim=2] aa = np.copy(a[:, 0:h])
  cdef np.ndarray[dtype_t, ndim=2] bb = np.copy(b[:, 0:h])
  cdef np.ndarray[dtype_t, ndim=3] cc = np.copy(c[:, 0:h])
  cdef np.ndarray[dtype_t, ndim=3] dd = np.copy(d[:, 0:h])
  with cython.boundscheck(False), cython.wraparound(False), cython.overflowcheck(False), cython.nonecheck(False):
  for i in range(w):
    for j in range(h):
      aa[i,j] += a[i,j+1]
      bb[i,j] += b[i,j+1]
      for k in range(z):
        cc[i,j,k] += c[i,j+1,k]
        dd[i,j,k] += d[i,j+1,k]

return aa, bb, cc, dd

【问题讨论】:

  • 你能给我们看一些代码来解释sumShifted是如何被调用的吗?
  • @Rowandish [1000,1000,10] 矩阵并不大,但是,您能否也发布您的 .timeit() 测量结果,了解您的初始实施速度是多少,以便对任何事物进行基准测试好还是不好?
  • @unutbu 编辑了问题
  • 我认为没有办法显着改善A[:, 0:-1] + A[:, 1:]。改进for-loop 可能是可能的。您能否发布一个我们可以进行基准测试和讨论的最小工作示例?
  • @Rowandish:恐怕有误会。与其尝试优化sumShifted,不如尝试优化for-loop。如果您需要帮助,我们需要更详细地了解整个for-loop 中发生了什么。可能有办法改进它,也可能没有。但是除非我们能看到完整的代码,否则这是不可能的。如果您知道这不是瓶颈,您可以用虚拟代理函数替换 function1function4。但我们需要看到更多,因为就目前而言,您可以通过完全删除 for-loop 来提高性能。

标签: python performance numpy matrix cython


【解决方案1】:

这个函数不太可能进一步加速:它实际上只在 python 级别执行四个操作:

  1. (2x) 对输入执行切片。这些类型的切片非常快,因为它们只需要少数整数运算来计算新的步幅和大小。
  2. 为输出分配一个新数组。对于这样一个简单的功能,这是一个很大的负担。
  3. 评估两个切片上的np.add ufunc,这是在 numpy 中高度优化的操作。

确实,我的基准测试显示无论使用 numba 还是 cython 都没有改善。在我的机器上,如果输出数组是预先分配的,我每次调用都会得到大约 30 毫秒,如果考虑到内存分配,我会得到大约 50 毫秒。

纯 numpy 版本:

import numpy as np

def ss1(A):
    return np.add(A[:,:-1,:],A[:,1:,:])

def ss2(A,output):
    return np.add(A[:,:-1,:],A[:,1:,:],output)

cython 版本:

import numpy as np
cimport numpy as np
cimport cython

def ss3(np.float64_t[:,:,::1] A not None):
    cdef unsigned int i,j,k;
    cdef np.float64_t[:,:,::1] ret = np.empty((A.shape[0],A.shape[1]-1,A.shape[2]),'f8')
    with cython.boundscheck(False), cython.wraparound(False):
        for i in range(A.shape[0]):
            for j in range(A.shape[1]-1):
                for k in range(A.shape[2]):
                    ret[i,j,k] = A[i,j,k] + A[i,j+1,k]
    return ret

def ss4(np.float64_t[:,:,::1] A not None, np.float64_t[:,:,::1] ret not None):
    cdef unsigned int i,j,k;
    assert ret.shape[0]>=A.shape[0] and ret.shape[1]>=A.shape[1]-1 and ret.shape[2]>=A.shape[2]
    with cython.boundscheck(False), cython.wraparound(False):
        for i in range(A.shape[0]):
            for j in range(A.shape[1]-1):
                for k in range(A.shape[2]):
                    ret[i,j,k] = A[i,j,k] + A[i,j+1,k]
    return ret

numba 版本(当前 numba 0.14.0 无法在优化函数中分配新数组):

@numba.njit('f8[:,:,:](f8[:,:,:],f8[:,:,:])')
def ss5(A,output):
    for i in range(A.shape[0]):
        for j in range(A.shape[1]-1):
            for k in range(A.shape[2]):
                output[i,j,k] = A[i,j,k] + A[i,j+1,k]
    return output

以下是时间安排:

>>> A = np.random.randn((1000,1000,10))
>>> output = np.empty((A.shape[0],A.shape[1]-1,A.shape[2]))

>>> %timeit ss1(A)
10 loops, best of 3: 50.2 ms per loop

>>> %timeit ss2(A,output)
10 loops, best of 3: 30.8 ms per loop

>>> %timeit ss3(A)
10 loops, best of 3: 50.8 ms per loop

>>> %timeit ss4(A,output)
10 loops, best of 3: 30.9 ms per loop

>>> %timeit ss5(A,output)
10 loops, best of 3: 31 ms per loop

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-12-16
    • 1970-01-01
    • 2018-02-16
    • 2015-07-21
    • 2017-04-09
    相关资源
    最近更新 更多