【问题标题】:Compute the trace of a matrix across all diagonals计算矩阵在所有对角线上的迹
【发布时间】:2014-06-28 23:07:27
【问题描述】:

我需要计算矩阵在其所有对角线上的轨迹。也就是说,对于一个 nxm 矩阵,该操作应该产生 n+m-1 个“轨迹”。这是一个示例程序:

import numpy as np

A=np.arange(12).reshape(3,4)

def function_1(A):  
    output=np.zeros(A.shape[0]+A.shape[1]-1)
    for i in range(A.shape[0]+A.shape[1]-1):
        output[i]=np.trace(A,A.shape[1]-1-i)
    return output

A
array([[ 0,  1,  2,  3],
       [ 4,  5,  6,  7],
       [ 8,  9, 10, 11]])

function_1(A)
array([  3.,   9.,  18.,  15.,  13.,   8.])

我希望找到一种方法来替换程序中的循环,因为我需要在非常大的矩阵上多次执行此计算。一种看起来很有希望的途径是 使用 numpy.einsum,但我不知道该怎么做。或者,我已经研究过用 cython 中的循环完全重写问题:

%load_ext cythonmagic
%%cython
import numpy as np
cimport numpy as np
cimport cython

@cython.boundscheck(False)
@cython.wraparound(False)
def function_2(long [:,:] A):   
    cdef int n=A.shape[0]
    cdef int m=A.shape[1]
    cdef long [::1] output = np.empty(n+m-1,dtype=np.int64)
    cdef size_t l1
    cdef int i,j, k1
    cdef long out

    it_list1=range(m)
    it_list2=range(m,m+n-1)
    for l1 in range(len(it_list1)):
        k1=it_list1[l1]
        i=0
        j=m-1-k1
        out=0
        while (i<n)&(j<m):
            out+=A[i,j]
            i+=1
            j+=1    
        output[k1]=out  
    for l1 in range(len(it_list2)):
        k1=it_list2[l1]
        i=k1-m+1
        j=0
        out=0
        while (i<n)&(j<m):
            out+=A[i,j]
            i+=1
            j+=1
        output[k1]=out  
    return np.array(output) 

cython 程序优于通过 np.trace 循环的程序:

%timeit function_1(A)
10000 loops, best of 3: 62.7 µs per loop
%timeit function_2(A)
100000 loops, best of 3: 9.66 µs per loop

所以,基本上我想获得关于是否有更有效的方式来使用 numpy/scipy 例程的反馈,或者我是否可能已经实现了 使用 cython 的最快方式。

【问题讨论】:

  • 我想知道这将如何比较:np.fromiter(map(A.trace, range(A.shape[1]-1, -A.shape[0], -1)), dtype=np.int64)
  • 对于大型矩阵,Cython 版本可以在内存访问方面进行改进。 IE。循环遍历行而不是对角线。
  • 如果您希望跟踪而不是零填充,那么我认为在傅立叶空间中会有一个很好的方法。

标签: python numpy cython


【解决方案1】:

如果您想远离 Cython,构建一个对角索引数组并使用 np.bincount 可能会成功:

>>> import numpy as np
>>> a = np.arange(12).reshape(3, 4)
>>> a
array([[ 0,  1,  2,  3],
       [ 4,  5,  6,  7],
       [ 8,  9, 10, 11]])
>>> rows, cols = a.shape
>>> rows_arr = np.arange(rows)
>>> cols_arr = np.arange(cols)
>>> diag_idx = rows_arr[:, None] - (cols_arr - (cols - 1))
>>> diag_idx
array([[3, 2, 1, 0],
       [4, 3, 2, 1],
       [5, 4, 3, 2]])
>>> np.bincount(diag_idx.ravel(), weights=a.ravel())
array([  3.,   9.,  18.,  15.,  13.,   8.])

根据我的时间,对于您的示例输入,它比您原来的纯 Python 方法快 4 倍。所以我认为它不会比你的 Cython 代码更快,但你可能想要计时。

【讨论】:

  • 对于一个 60×10 的数组,这比原来的要快 21 倍。
  • 很好,而且不会让记忆头痛。
  • 对于较大的阵列,使用diag_idx = rows_arr[:,None] - (cols_arr - cols + 1) 可以显着提高速度。
  • 好点,编辑了答案。我猜,但如果数组足够非正方形,那么从最短向量中减去可能也会很明显......
【解决方案2】:

如果您的矩阵形状距离正方形足够远,即它是高还是宽,那么您可以有效地使用跨步技巧来做到这一点。在任何情况下你都可以使用步幅技巧,但如果矩阵接近正方形,它可能不会超级节省内存。

您需要做的是在相同的数据上创建一个新的数组视图,该数组视图的构造方式是,从一行到下一行的步骤也会导致列的增量。这是通过改变数组的步幅来实现的。

需要处理的问题在于数组的边界,这里需要补零。如果数组远非正方形,这无关紧要。如果是正方形,那么我们需要两倍大小的数组来填充。

如果您不需要边缘处的较小走线,则无需补零。

这里是(假设列多于行,但很容易适应):

import numpy as np
from numpy.lib.stride_tricks import as_strided

A = np.arange(30).reshape(3, 10)
A_embedded = np.hstack([np.zeros([3, 2]), A, np.zeros([3, 2])])
A = A_embedded[:, 2:-2]  # We are now sure that the memory around A is padded with 0, but actually we never really need A again

new_strides = (A.strides[0] + A.strides[1], A.strides[1])
B = as_strided(A_embedded, shape=A_embedded[:, :-2].shape, strides=new_strides)

traces = B.sum(0)

print A
print B
print traces

为了符合您在示例中显示的输出,您需要将其反转(请参阅@larsmans 评论)

traces = traces[::-1]

这是一个具体数字的具体例子。如果这对您的用例有用,我可以将其转换为通用功能。

【讨论】:

  • +1,虽然输出的顺序与原始顺序相反。 B.sum(axis=0)[::-1] 解决了这个问题。
  • 感谢您的意见,我没有注意到。由于我看不到在计算中整合这种反向顺序的方法,因此我将按照您的建议编辑输出。
【解决方案3】:

这是 Cython 函数的改进版本。 老实说,如果 Cython 是一个选项,我会这样做。

import numpy as np
from libc.stdint cimport int64_t as i64
from cython cimport boundscheck, wraparound

@boundscheck(False)
@wraparound(False)
def all_trace_int64(i64[:,::1] A):
    cdef:
        int i,j
        i64[:] t = np.zeros(A.shape[0] + A.shape[1] - 1, dtype=np.int64)
    for i in range(A.shape[0]):
        for j in range(A.shape[1]):
            t[A.shape[0]-i+j-1] += A[i,j]
    return np.array(t)

这将比您在问题中提供的版本快得多,因为它会按照存储在内存中的顺序遍历数组。 对于小型阵列,这两种方法几乎相同,尽管这一种在我的机器上稍微快一些。

我编写了这个函数,因此它需要一个 C 连续数组。 如果您有一个 Fortran 连续数组,请将其转置,然后反转输出的顺序。

这确实以与示例中显示的函数相反的顺序返回答案,因此如果顺序特别重要,您将需要反转数组的顺序。

您还可以通过更重的优化来提高性能。 例如,您可以在 IPython 笔记本中构建您的 Cython 代码,并使用额外的编译器标志替换

%%cython

类似的东西

%%cython -c=-O3 -c=-march=native -c=-funroll-loops -f

编辑: 执行此操作时,您还需要确保您的值不是由外部产品生成的。如果您的值来自外部产品,则可以将此操作与外部产品组合到对 np.convolve 的单个调用中。

【讨论】:

  • 这个方法是所有提议中最快的。不过,我真的很感谢每个人的努力——我学到了很多我以前不知道的关于 numpy 的知识(尤其是 bincount 和 strides)。感谢大家的帮助!
【解决方案4】:

如果数组很大,这是有竞争力的:

def f5(A):
    rows, cols = A.shape
    N = rows + cols -1
    out = np.zeros(N, A.dtype)
    for idx in range(rows):
        out[N-idx-cols:N-idx] += A[idx]
    return out[::-1]

虽然它使用 Python 循环,但它比 bincount 解决方案更快(对于大型数组......在我的系统上......)


这种方法对数组列/行比率确实有很高的敏感性,因为这个比率决定了 Python 相对于 Numpy 执行了多少循环。 正如@Jaime 指出的那样,迭代最小维度是有效的,例如:

def f6(A):
    rows, cols = A.shape
    N = rows + cols -1
    out = np.zeros(N, A.dtype)

    if rows > cols:
        for idx in range(cols):
            out[N-idx-rows:N-idx] += A[:, idx]
    else:
        for idx in range(rows):
            out[N-idx-cols:N-idx] += A[idx]
        out = out[::-1]
    return out

但是应该注意的是,对于较大的数组大小(例如我的系统上的100000 x 500),像我发布的第一个代码那样逐行访问数组仍然可能更快,可能是因为数组的布局方式内存 (获取连续块比分散位更快)。

【讨论】:

  • 是的,我可以看到当矩阵非常宽时这会更好,但它主要取决于列/行比率,越高越好。
  • @eickenberg;这当然取决于 col/rows 比率,但它并不是 至关重要的。在我的测试中,它甚至比@Jaime 方法更快A = np.random.rand(100000, 500)
  • 有趣。那应该是不同的效果:@Jaime 构建了一个大型索引数组,您可以使用您的方法避免该数组。因此,当数据向任何方向增加时,他的方法应该会变得更糟。而你的 cols * lines == constant 应该在 lines &gt;&gt; cols 时做得更好
  • 如果你总是迭代最小的维度,并在最大的维度上进行矢量化总和,它应该会表现得更好,不是吗?
  • @Jaime,对于较小的矩阵确实如此。从一定大小开始,访问列中不连续的数据似乎花费太多,并且再次变得更慢。切换发生的数组大小取决于机器。
【解决方案5】:

这可以通过(稍微粗暴地)以两种方式使用scipy.sparse.dia_matrix 来完成,一种比另一种更稀疏。

第一个产生精确结果,使用 dia_matrix 存储的数据向量

import numpy as np
from scipy.sparse import dia_matrix
A = np.arange(30).reshape(3, 10)
traces = dia_matrix(A).data.sum(1)[::-1]

另一种占用内存较少的方法是反过来:

import numpy as np
from scipy.sparse import dia_matrix
A = np.arange(30).reshape(3, 10)
A_dia = dia_matrix((A, range(len(A))), shape=(A.shape[1],) * 2)
traces = np.array(A_dia.sum(1)).ravel()[::-1]

但请注意,此解决方案中缺少两个条目。这可能会以一种聪明的方式纠正,但我还不确定。


@moarningsun 找到了解决方案:

rows, cols = A.shape

A_dia = dia_matrix((A, np.arange(rows)), shape=(cols,)*2)
traces1 = A_dia.sum(1).A.ravel()

A_dia = dia_matrix((A, np.arange(-rows+1, 1)), shape=(rows,)*2)
traces2 = A_dia.sum(1).A.ravel()

traces = np.concatenate((traces1[::-1], traces2[-2::-1]))

【讨论】:

  • 有趣的是,它归结为“找到一个满足你需要的 C 扩展,不管扩展的原始目的是什么”:P 我想知道这与(几乎最佳的)Cython 实现的性能有多接近.
  • 你说得对,基本上就是这样。其中一些甚至可以是优雅的(虽然不是试图评判这个)。如果这样做,它应该非常有用,并且在速度/内存等方面具有足够的优势,以证明其晦涩难懂。在这种情况下,我承认我已经开始从更多代码高尔夫的角度来看这个线程:)
【解决方案6】:

np.trace 做你想做的事:

import numpy as np

A = array([[ 0,  1,  2,  3],
           [ 4,  5,  6,  7],
           [ 8,  9, 10, 11]])

n = A.shape[0]
[np.trace(A, i) for i in range(-n+1, n+1)]

编辑:根据@user2357112的建议,将np.sum(np.diag())改为np.trace()

【讨论】:

  • 如果您要使用列表解析,np.trace 已经做得更好了。关键是要避免 Python 级别的循环和推导,因为它们比你想要的这种工作慢得多。
【解决方案7】:

使用numpy数组trace方法:

import numpy as np
A = np.array([[ 0,  1,  2,  3],
           [ 4,  5,  6,  7],
           [ 8,  9, 10, 11]])
A.trace()

返回:

15

【讨论】:

    猜你喜欢
    • 2016-05-02
    • 2017-12-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-08-12
    • 2014-07-17
    • 1970-01-01
    相关资源
    最近更新 更多