【问题标题】:numpy - efficiently copy values from matrix to matrix using some precalculated mapnumpy - 使用一些预先计算的映射有效地将值从矩阵复制到矩阵
【发布时间】:2018-12-16 11:05:03
【问题描述】:

我有一个大小为 I*J 的输入矩阵 A

还有一个大小为 N*M 的输出矩阵 B

还有一些预先计算好的大小为 N*M*2 的地图,它规定了 B 中的每个坐标,以及要采用 A 中的哪个坐标。该地图没有我可以使用的特定规则或线性。只是一张看似随机的地图。

矩阵非常大(~5000*~3000),所以创建映射矩阵是不可能的(5000*3000*5000*3000)

我设法使用简单的地图和循环来做到这一点:

for i in range(N):
    for j in range(M):
        B[i, j] = A[mapping[i, j, 0], mapping[i, j, 1]]

我设法使用索引来做到这一点:

B[coords_y, coords_x] = A[some_mapping[:, 0], some_mapping[:, 1]]
# Where coords_x, coords_y are defined as all of the coordinates:
# [[0,0],[0,1]..[0,M-1],[1,0],[1,1]...[N-1,M-1]]

这效果要好得多,但还是有点慢。

我有无限的时间提前计算映射或任何其他实用程序计算。但是在这些预先计算之后,这种映射应该尽可能快地发生。

目前,我看到的唯一其他选择就是用 C 或更快的方式重新实现它......

(如果有人好奇,我会用其他一些不同形状和方向的图像创建一个图像,并进行一些编码。但它的映射非常复杂,而不是简单或线性的东西使用)

【问题讨论】:

  • 你试过了吗:B = A[mapping[:,:, 0], mapping[:,:,1]]
  • 如何选择这个计算的输出形状?
  • 这不会自动完成,因为mappingB 在它们的前两个轴上具有相同的形状?仅供参考:我正在关注您的代码:B[i, j] = A[mapping[i, j, 0], mapping[i, j, 1]].
  • 你是对的。它奏效了,给了我近 50% 的改进。谢谢!

标签: python python-3.x numpy matrix optimization


【解决方案1】:

一种选择是使用numba,它通常可以为这种简单的算法代码提供实质性的改进。

import numpy as np
from numba import njit

I, J = 5000, 5000
N, M = 3000, 3000

A = np.random.randint(0, 10, [I, J])
B = np.random.randint(0, 10, [N, M])
mapping = np.dstack([np.random.randint(0, I - 1, (N, M)),
                     np.random.randint(0, J - 1, (N, M))])

B0 = B.copy()

def orig(A, B, mapping):
    for i in range(N):
        for j in range(M):
            B[i, j] = A[mapping[i, j, 0], mapping[i, j, 1]]

new = njit(orig)

这给了我们匹配的结果:

In [313]: Bold = B0.copy()

In [314]: orig(A, Bold, mapping)

In [315]: Bnew = B0.copy()

In [316]: new(A, Bnew, mapping)

In [317]: (Bold == Bnew).all()
Out[317]: True

而且速度更快:

In [320]: %time orig(A, B0.copy(), mapping)
Wall time: 6.11 s

In [321]: %time new(A, B0.copy(), mapping)
Wall time: 257 ms

在第一次调用之后,当它必须做它的 jit 工作时,速度更快:

In [322]: %time new(A, B0.copy(), mapping)
Wall time: 171 ms

In [323]: %time new(A, B0.copy(), mapping)
Wall time: 163 ms

添加两行代码的性能提高了 30 倍。

【讨论】:

    【解决方案2】:

    解决这些类型的性能关键问题的一个非常好的解决方案是保持简单并使用其中一种高性能软件包。最简单的可能是Numba,它提供了jit 装饰器,可以将数组和循环繁重的代码编译到优化的LLVM。下面是一个完整的例子:

    from time import time
    import numpy as np
    from numba import jit
    
    # Function doing the computation
    def normal(A, B, mapping):
        N, M = B.shape
        for i in range(N):
            for j in range(M):
                B[i, j] = A[mapping[i, j, 0], mapping[i, j, 1]]
        return B
    
    # The same exact function, but with the Numba jit decorator
    @jit
    def fast(A, B, mapping):
        N, M = B.shape
        for i in range(N):
            for j in range(M):
                B[i, j] = A[mapping[i, j, 0], mapping[i, j, 1]]
        return B
    
    # Create sample data
    def create_sample_data(I, J, N, M):
        A = np.random.random((I, J))
        B = np.empty((N, M))
        mapping = np.asarray(np.stack((
            np.random.random((N, M))*I,
            np.random.random((N, M))*J,
            ), axis=2), dtype=int)
        return A, B, mapping
    A, B, mapping = create_sample_data(500, 600, 700, 800)
    
    # Run normally
    t0 = time()
    B = normal(A, B, mapping)
    t1 = time()
    print('normal took', t1 - t0, 'seconds')
    
    # Run using Numba.
    # First we should run the function with smaller arrays,
    # just to compile the code.
    fast(*create_sample_data(5, 6, 7, 8))
    # Now, run with real data
    t0 = time()
    B = fast(A, B, mapping)
    t1 = time()
    print('fast took', t1 - t0, 'seconds')
    

    这使用您自己的循环解决方案,使用标准 Python 本身就很慢,但使用 Numba 时与 C 一样快。在我的机器上,normal 函数在 0.270 秒内执行,而fast 函数在 0.00248 秒内执行。也就是说,Numba 几乎免费为我们提供了 109 倍的加速 (!)。

    请注意,fast Numba 函数被调用了两次,第一次使用小输入数组,然后才使用真实数据。这是一个经常被忽视的关键步骤。没有它,您会发现性能提升不如第一次调用用于编译代码那么好。在这个初始调用中,输入数组的类型和维度应该相同,但每个维度的大小并不重要。

    我在函数外部创建 B 并将其作为参数传递(“填充值”)。你也可以在函数内部分配B,Numba 不在乎。

    获取 Numba 的最简单方法是正确地通过 Anaconda 分发。

    【讨论】:

      【解决方案3】:

      如果您有无限时间进行预计算,则可以通过使用平面索引来稍微加快速度:

      map_f = np.ravel_multi_index((*np.moveaxis(mapping, 2, 0),), A.shape)
      

      然后简单地做:

      A.ravel()[map_f]
      

      请注意,这个加速是在我们从花哨的索引中获得的巨大加速之上的。例如:

      >>> A = np.random.random((5000, 3000))
      >>> mapping = np.random.randint(0, 15000, (5000, 3000, 2)) % [5000, 3000]
      >>> 
      >>> map_f = np.ravel_multi_index((*np.moveaxis(mapping, 2, 0),), A.shape)
      >>> 
      >>> np.all(A.ravel()[map_f] == A[mapping[..., 0], mapping[..., 1]])
      True
      >>> 
      >>> timeit('A[mapping[:, :, 0], mappping[:, :, 1]]', globals=globals(), number=10)
      4.101239089999581
      >>> timeit('A.ravel()[map_f]', globals=globals(), number=10)
      2.7831342950012186
      

      如果我们要与原始的循环代码进行比较,加速会更像是 ~40 倍。

      最后,请注意,这个解决方案不仅避免了额外的依赖和潜在的安装噩梦,而且更简单、更短、更快:

      numba:
      precomp:    132.957 ms
      main        238.359 ms
      
      flat indexing:
      precomp:     76.223 ms
      main:       219.910 ms
      

      代码:

      import numpy as np
      from numba import jit
      
      @jit
      def fast(A, B, mapping):
          N, M = B.shape
          for i in range(N):
              for j in range(M):
                  B[i, j] = A[mapping[i, j, 0], mapping[i, j, 1]]
          return B
      
      from timeit import timeit
      
      A = np.random.random((5000, 3000))
      mapping = np.random.randint(0, 15000, (5000, 3000, 2)) % [5000, 3000]
      
      a = np.random.random((5, 3))
      m = np.random.randint(0, 15, (5, 3, 2)) % [5, 3]
      
      print('numba:')
      print(f"precomp: {timeit('b = fast(a, np.empty_like(a), m)', globals=globals(), number=1)*1000:10.3f} ms")
      print(f"main     {timeit('B = fast(A, np.empty_like(A), mapping)', globals=globals(), number=10)*100:10.3f} ms")
      
      print('\nflat indexing:')
      print(f"precomp: {timeit('map_f = np.ravel_multi_index((*np.moveaxis(mapping, 2, 0),), A.shape)', globals=globals(), number=10)*100:10.3f} ms")
      map_f = np.ravel_multi_index((*np.moveaxis(mapping, 2, 0),), A.shape)
      print(f"main:    {timeit('B = A.ravel()[map_f]', globals=globals(), number=10)*100:10.3f} ms")
      

      【讨论】:

        【解决方案4】:

        您可以做的最直接的优化是放弃本机 python 循环并使用花哨的 numpy 索引。你已经有数组可以做到这一点:

        import numpy as np
        
        A = np.random.rand(2000,3000)
        B = np.empty((2500,3500)) # just for shape, really
        
        # this is the same as your original, but with random indices
        mapping = np.stack([np.random.randint(0, A.shape[0] - 1, B.shape),
                            np.random.randint(0, A.shape[1] - 1, B.shape)],
                           axis=-1)
        
        # your loopy original
        def loopy(A, B, mapping):
            B = B.copy()
            for i in range(B.shape[0]):
                for j in range(B.shape[1]):
                    B[i, j] = A[mapping[i, j, 0], mapping[i, j, 1]]
            return B
        
        # vectorization with fancy indexing
        def fancy(A, mapping):
            return A[mapping[...,0], mapping[...,1]]
        

        请注意,fancy 高级索引函数不需要预先分配 B 数组,因为新数组是由索引操作构造的。

        花哨的索引版本略有不同,可能稍微更有效:将mapping的最后一个维度放在第一位,这样两个索引数组都是连续的内存块。从我的计时测试中可以看出,在上述设置中这恰好是较慢的。无论如何:

        mapping_T = mapping.transpose(2, 0, 1).copy() # but it's actually `mapping` without axis=-1 kwarg
        # has shape (2, N, M)
        
        def fancy_T(A, mapping_T):
            return A[tuple(mapping_T)]
        

        作为Paul Panzer noted in a comment,仅在mapping 上调用.transpose 不会创建副本,而是使用步幅技巧实现转置。为了得到一个连续的数组(这是优化的重点),我们需要强制创建一个副本。

        我在 ipython 中得到以下时间:

        # loopy(A, B, mapping)
        6.63 s ± 141 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
        # fancy(A, mapping)
        250 ms ± 3.79 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
        # fancy_T(A, mapping_T)
        277 ms ± 1.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
        

        说实话,我不明白为什么原始数组的顺序比转置的要快,但就是这样。

        【讨论】:

        • 由于 numpy 转置是惰性的 mapping_T不会为您提供连续索引数组,请查看 mapping_T[0].flags。您必须为此强制执行连续副本。
        • @PaulPanzer 谢谢,我一直忘记 numpy 避免复制的一些技巧。有趣的是,正确地做它会变慢。我现在无法弄清楚为什么会发生这种情况,即使我定义了子数组的单独副本并使用 A[mapping1, mapping2] 它似乎更慢。
        • 击败我。也许是因为需要一起处理的索引在内存中彼此相邻?只是一个疯狂的猜测。
        猜你喜欢
        • 2011-05-27
        • 2013-05-16
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多