【问题标题】:How to vectorize a 2 level loop in NumPy如何在 NumPy 中矢量化 2 级循环
【发布时间】:2021-11-22 06:37:47
【问题描述】:

根据cmets,我修改了例子:

考虑下面的代码

import numpy as np

def subspace_angle(A, B):
    M = A.T @ B
    s = np.linalg.svd(M, compute_uv=False)
    return s[0]

def principal_angles(bases):    
    k = bases.shape[0]
    r = np.zeros((k, k))
    for i in range(k):
        x = bases[i]
        r[i, i] = subspace_angle(x, x)
        for j in range(i):
            y = bases[j]
            r[i, j] = subspace_angle(x, y)
            r[j, i] = r[i, j]
    r = np.minimum(1, r)
    return np.rad2deg(np.arccos(r))

以下是使用示例:

bases = []
# number of subspaces
k = 5
# ambient dimension
n = 8
# subspace dimension
m = 4
for i in range(5):
    X = np.random.randn(n, m)
    Q,R = np.linalg.qr(X)
    bases.append(Q)
# combine the orthonormal bases for all the subspaces
bases = np.array(bases)
# Compute the smallest principal angles between each pair of subspaces.
print(np.round(principal_angles(bases), 2))

有没有办法避免principal_angles函数中的两级for循环,从而加快代码速度? 由于此代码,矩阵 r 是对称的。由于 subspace_angle 的计算量可能很大,具体取决于数组大小,因此请务必避免为 r[i,j]r[j,i] 计算两次。

关于 JIT 的评论,实际上,我正在使用 Google/JAX 编写代码。两级循环确实可以编译 JIT,从而带来性能优势。但是,JIT 编译时间相当长(可能是由于两级 for 循环)。我想知道是否有更好的方法来编写此代码,以便它可以更快地编译。

【问题讨论】:

  • 重要的问题是,你将在 func 中执行什么样的计算?可以矢量化吗?
  • 这将是一些基于 NumPy 的计算。是的,应该可以对其进行矢量化。我想到的特定函数是 A,B,计算 M = A.T @ B,然后返回 M 的最大奇异值。我想可以向量化。我在上面的代码中保留了一个二维数组。但我的实际用例是一个矩阵数组。
  • 在我看来,我正在寻找某种方法来形成 n(n-1)/2 对 A 的行,在每对上运行 func,然后将结果重新格式化为三角矩阵,然后最后以某种方式对称三角矩阵。想知道这样的机制是否存在。
  • 请提供一个更好的例子(输入和预期输出)
  • 需要澄清的一点:python 定义的函数,包括那些调用 numpy 例程的函数,不能单独由 numpy 向量化。函数 func 将由 CPython 以每个元素的方式运行。 Numpy 无法即时编译。您可以使用 Numba numba.pydata.org 实现显着的加速 - 这是 Python/Numpy 的 JIT - 如果您愿意,也可以使用 Cython。

标签: python performance numpy vectorization


【解决方案1】:

我开始将您的代码复制到 ipython 会话,得到一个 (5,8,4) 形状的 bases。但后来意识到func 是未定义的。因此,通过将其注释掉,我得到:

In [6]: def principal_angles(bases):
   ...:     k = bases.shape[0]
   ...:     r = np.zeros((k, k))
   ...:     for i in range(k):
   ...:         x = bases[i]
   ...:         # r[i, i] = func(x, x)
   ...:         for j in range(i):
   ...:             y = bases[j]
   ...:             r[i, j] = subspace_angle(x, y)
   ...:             #r[j, i] = r[i, j]
   ...:     return r
   ...:     #r = np.minimum(1, r)
   ...:     #return np.rad2deg(np.arccos(r))
   ...: 
In [7]: r=principal_angles(bases)
In [8]: r.shape
Out[8]: (5, 5)

由于matmulsvd 都可以处理更高的维度,即批次,我想知道是否可以使用所有bases 调用subspace_angle,而不是迭代。

我们必须仔细考虑我们通过什么形状,以及它们如何演变。

def subspace_angle(A, B):
    M = A.T @ B
    s = np.linalg.svd(M, compute_uv=False)
    return s[0]

(糟糕,我的操作系统刚刚使终端崩溃,所以我稍后再讨论。)

所以AB 是 (8,4),A.T 是 (4,8),A.T@B 是 (4,4)

如果它们是 (5,8,4),A.transpose(0,2,1) 将是 (5,4,8),M 将是 (5,4,4)。

我相信 np.linalg.svd 接受 M,返回 (5,4,4)

In [29]: r=principal_angles(bases)
In [30]: r
Out[30]: 
array([[0.        , 0.        , 0.        , 0.        , 0.        ],
       [0.99902153, 0.        , 0.        , 0.        , 0.        ],
       [0.99734371, 0.95318936, 0.        , 0.        , 0.        ],
       [0.99894054, 0.99790422, 0.87577343, 0.        , 0.        ],
       [0.99840093, 0.92809283, 0.99896121, 0.98286429, 0.        ]])

让我们对整个基地进行尝试。使用广播获取第一维的“外部”产品:

In [31]: M=bases[:,None,:,:].transpose(0,1,3,2)@bases
In [32]: r1=np.linalg.svd(M, compute_uv=False)
In [33]: M.shape
Out[33]: (5, 5, 4, 4)
In [34]: r1.shape
Out[34]: (5, 5, 4)

为了匹配您的s[0],我必须使用(需要查看svd 文档):

In [35]: r1[:,:,0]
Out[35]: 
array([[1.        , 0.99902153, 0.99734371, 0.99894054, 0.99840093],
       [0.99902153, 1.        , 0.95318936, 0.99790422, 0.92809283],
       [0.99734371, 0.95318936, 1.        , 0.87577343, 0.99896121],
       [0.99894054, 0.99790422, 0.87577343, 1.        , 0.98286429],
       [0.99840093, 0.92809283, 0.99896121, 0.98286429, 1.        ]])

节省的时间不多,但如果第一个维度大于 5,可能会更好:

In [36]: timeit r=principal_angles(bases)
320 µs ± 554 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each)
In [37]: %%timeit
    ...: M=bases[:,None,:,:].transpose(0,1,3,2)@bases
    ...: r1=np.linalg.svd(M, compute_uv=False)[:,:,0]
    ...: 
    ...: 
190 µs ± 450 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

这可能足以让您开始使用更精细的“矢量化”。

【讨论】:

  • 我的错。当我在 Jupyter 会话中修改示例代码时,我错误地留下了对 func() 的引用。我已经在问题中纠正了这一点。
  • 感谢您的解决方案。我用 google Jax 写了同样的东西,它按预期工作。通过避免 2 个 for 循环,JIT 编译时间从 22 秒显着减少到不到 100 毫秒。但是,经过 JIT 编译后,代码的速度比原来的要慢一些(大约 20%)。我想主要原因是我们正在进行不必要的计算。由于 r[i,j] = r[j,i],因此在循环版本中可以避免一半的计算。是否可以在矢量化版本中避免它?
  • 其实对于这个问题,已知r[i,i] = 1。所以对角元素甚至不需要计算。只需计算 r 的(严格)上三角或下三角部分。
  • 在自定义编译代码中,可以细化迭代。但是像我一样使用库存构建块,你不能。在特殊情况下,例如A.T@A matmul 可以检测到对称情况,调用更快的 BLAS 例程,但我认为这与这里无关。一般来说,在 numpy 中避免多余的计算似乎并不值得。
  • 我没有试图弄清楚为什么你似乎用s[0] 步骤丢弃了一些结果。可以以某种方式调用svd 来消除该步骤吗?
【解决方案2】:

在对np.triu_indices函数进行了更多思考和试验后,我提出了以下解决方案,避免了额外不必要的计算。

def vectorized_principal_angles(subspaces):
    # number of subspaces
    n = subspaces.shape[0]
    # Indices for upper triangular matrix
    i, j = np.triu_indices(n, k=1)
    # prepare all the possible pairs of A and B
    A = subspaces[i]
    B = subspaces[j]
    # Compute the Hermitian transpose of each matrix in A array
    AH = np.conjugate(np.transpose(A, axes=(0,2,1)))
    # Compute M = A^H B for each matrix pair
    M = np.matmul(AH, B)
    # Compute the SVD for each matrix in M
    s = np.linalg.svd(M, compute_uv=False)
    # keep only the first singular value for each M
    s = s[:, 0]
    # prepare the result matrix
    # It is known in advance that diagonal elements will be 1.
    r = 0.5 * np.eye(n)
    r[i, j] = s
    # Symmetrize the matrix
    r = r + r.T
    # Final result
    return r  

这是怎么回事:

  • np.triu_indices(k, k=1) 为我提供了 n(n-1)/2 对可能的矩阵组合的索引。
  • 所有剩余的计算仅限于 n(n-1)/2 对。
  • 最后,将标量值数组放回到一个正方形对称结果矩阵中

感谢@hpaulj 的解决方案。它帮助我找到了正确的方向。

【讨论】:

    猜你喜欢
    • 2011-02-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-10-09
    • 2016-05-26
    • 2021-04-06
    相关资源
    最近更新 更多