【问题标题】:efficient loop over numpy arraynumpy数组上的有效循环
【发布时间】:2017-01-15 05:03:35
【问题描述】:

已经问过这个问题的不同版本,但我没有找到满意的答案。

问题:给定一个大的 numpy 向量,找到重复的向量元素的索引(其变化可以与容差进行比较)。

所以问题是〜O(N ^ 2)和内存限制(至少从当前算法的角度来看)。我想知道为什么我尝试过的 Python 比等效的 C 代码慢 100 倍或更多。

import numpy as np
N = 10000
vect = np.arange(float(N))
vect[N/2] = 1
vect[N/4] = 1
dupl = []
print("init done")
counter = 0
for i in range(N):
    for j in range(i+1, N):
        if vect[i] == vect[j]:
            dupl.append(j)
            counter += 1

print("counter =", counter)
print(dupl)
# For simplicity, this code ignores repeated indices 
# which can be trimmed later. Ref output is
# counter = 3
# [2500, 5000, 5000]

我尝试使用 numpy 迭代器,但它们更糟糕(~ x4-5) http://docs.scipy.org/doc/numpy/reference/arrays.nditer.html

使用 N=10,000 我在 C 中得到 0.1 秒,在 Python 中得到 12 秒(上面的代码),在 Python 中使用 np.nditer 得到 40 秒,在 Python 中使用 np.ndindex 得到 50 秒。我把它推到 N=160,000 并且时间按预期缩放为 N^2。

【问题讨论】:

  • 因为 Python 很慢?
  • numpy 数组在使用内置 numpy 函数(在 C 中实现)时非常有效。无论您是否使用 numpy,Python 循环都很慢。尝试仅使用 numpy 函数来实现您的算法。使用内置 Python 函数和/或推导式还应该提高性能(低于 numpy 但高于普通循环)。
  • 因此,Python 中的循环也不错。无论如何,循环有什么困难。我怀疑它的嵌套循环正在杀死这段代码(创建另一个上下文?)
  • 我不想回答我自己的问题,但我最终通过求助于 Numba 解决了。一开始还记得,后来忘记了。时间几乎就是 C 编译代码给我的时间,而且还有一个 Python 循环。所以外循环不是问题。我感谢所有关于使用库的 cmets。实际上,必须使用它们才能获得最佳性能。但是,我发现很难记住所有这些调用,并且从代码而不是库的角度思考对我来说更容易。
  • nditer 页面以 cython 示例结尾。那是你获得了一些速度。否则nditer 只是处理多输入和输出广播的一种方式。

标签: python arrays loops numpy optimization


【解决方案1】:

方法#1

您可以使用triangular matrix 模拟矢量化解决方案的迭代器依赖条件。这是基于处理涉及iterator dependency 的乘法的this post。为了执行vect 中每个元素对其所有元素的元素相等性,我们可以使用NumPy broadcasting。最后,我们可以使用np.count_nonzero 来获取计数,因为它应该在对布尔数组求和时非常有效。

所以,我们会有这样的解决方案 -

mask = np.triu(vect[:,None] == vect,1)
counter = np.count_nonzero(mask)
dupl = np.where(mask)[1]

如果您只关心counter 的计数,我们可以提供以下两种方法。

方法 #2

我们可以避免使用三角矩阵,简单地获取整个计数,然后从对角线元素中减去贡献,并通过将剩余计数减半来仅考虑上三角区域的下一个区域,因为任何一个的贡献都会完全相同。

所以,我们会有一个修改后的解决方案,像这样 -

counter = (np.count_nonzero(vect[:,None] == vect) - vect.size)//2

方法#3

这是一种完全不同的方法,它利用每个唯一元素的计数对最终总数的累积贡献这一事实。

因此,考虑到这个想法,我们将采用第三种方法 -

count = np.bincount(vect) # OR np.unique(vect,return_counts=True)[1]
idx = count[count>1]
id_arr = np.ones(idx.sum(),dtype=int)
id_arr[0] = 0
id_arr[idx[:-1].cumsum()] = -idx[:-1]+1
counter = np.sum(id_arr.cumsum())

【讨论】:

    【解决方案2】:

    Python 本身是一种高度动态、缓慢的语言。 numpy 中的想法是使用vectorization,并避免显式循环。在这种情况下,您可以使用np.equal.outer。你可以从

    a = np.equal.outer(vect, vect)
    

    现在,例如,求和:

     >>> np.sum(a)
     10006
    

    要找到相等的 i 索引,您可以这样做

    np.fill_diagonal(a, 0)
    
    >>> np.nonzero(np.any(a, axis=0))[0]
    array([   1, 2500, 5000])
    

    时机

    def find_vec():
        a = np.equal.outer(vect, vect)
        s = np.sum(a)
        np.fill_diagonal(a, 0)
        return np.sum(a), np.nonzero(np.any(a, axis=0))[0]
    
    >>> %timeit find_vec()
    1 loops, best of 3: 214 ms per loop
    
    def find_loop():
        dupl = []
        counter = 0
        for i in range(N):
            for j in range(i+1, N):
                 if vect[i] == vect[j]:
                     dupl.append(j)
                     counter += 1
        return dupl
    
    >>> % timeit find_loop()
    1 loops, best of 3: 8.51 s per loop
    

    【讨论】:

    • 使用这种方法的加速增益是多少?
    • @vz0 相对于循环?
    • OP 在他的问题中发布了运行纯 python 版本需要 12 秒的问题。运行您的版本需要多长时间?
    • @vz0 查看编辑 - 我的循环大约需要 8.5 秒,而矢量化版本需要 214 毫秒。
    • 矢量化版本还是 O(N**2) 吗?
    【解决方案3】:

    我想知道为什么我尝试过的 Python 比等效的 C 代码慢 100 倍或更多。

    因为 Python 程序通常比 C 程序慢 100 倍。

    您可以在 C 中实现关键代码路径并提供 Python-C 绑定,或者更改算法。您可以使用 dict 将数组从值反转为索引来编写 O(N) 版本。

    import numpy as np
    N = 10000
    vect = np.arange(float(N))
    vect[N/2] = 1
    vect[N/4] = 1
    dupl = {}
    print("init done")
    counter = 0
    for i in range(N):
        e = dupl.get(vect[i], None)
        if e is None:
            dupl[vect[i]] = [i]
        else:
            e.append(i)
            counter += 1
    
    print("counter =", counter)
    print([(k, v) for k, v in dupl.items() if len(v) > 1])
    

    编辑:

    如果您需要使用 abs(vect[i] - vect[j])

    abs(vect[i] - vect[j]) < eps ->
    abs(vect[i] - vect[j]) / eps < (eps / eps) ->
    abs(vect[i]/eps - vect[j]/eps) < 1
    int(abs(vect[i]/eps - vect[j]/eps)) = 0
    

    像这样:

    import numpy as np
    N = 10000
    vect = np.arange(float(N))
    vect[N/2] = 1
    vect[N/4] = 1
    dupl = {}
    print("init done")
    counter = 0
    eps = 0.01
    for i in range(N):
        k = int(vect[i] / eps)
        e = dupl.get(k, None)
        if e is None:
            dupl[k] = [i]
        else:
            e.append(i)
            counter += 1
    
    print("counter =", counter)
    print([(k, v) for k, v in dupl.items() if len(v) > 1])
    

    【讨论】:

    • 不幸的是,字典方法对我不起作用,因为实际上我需要在一定的公差范围内比较数组元素 abs(vect[i] - vect[j])
    • 然后您可以将 dict 中的值标准化为 eps。 abs(vect[i] - vect[j]) abs(vect[i] - vect[j]) / eps abs(vect[i]/eps - vect[j]/每股收益)
    【解决方案4】:

    这个使用numpy_indexed包的解决方案复杂度为n Log n,并且是完全向量化的;所以很可能与 C 的性能没有太大区别。

    import numpy_indexed as npi
    dpl = np.flatnonzero(npi.multiplicity(vect) > 1)
    

    【讨论】:

      【解决方案5】:

      显而易见的问题是您为什么要以这种方式执行此操作。 NumPy 数组旨在成为不透明的数据结构——我的意思是 NumPy 数组旨在在 NumPy 系统内部创建,然后将操作发送到 NumPy 子系统以传递结果。即 NumPy 应该是一个黑盒子,您可以向其中抛出请求并输出结果。

      所以鉴于上面的代码,我一点也不惊讶 NumPy 的性能比可怕的还要糟糕。

      我相信,以下内容应该是您想要的,但使用 NumPy 方式完成:

      import numpy as np
      
      N = 10000
      vect = np.arange(float(N))
      vect[N/2] = 1
      vect[N/4] = 1
      
      print([np.where(a == vect)[0] for a in vect][1])
      
      # Delivers [1, 2500, 5000]
      

      【讨论】:

      • 不错的一个。我想到了 np.where 并尝试了它。问题是,我不需要答案中的 1,只需要 2500 和 5000,即重复。其次,近似相似的泛化不是那么整齐而且有点慢。但是,是的,你的一个班轮和 C 一样快。
      【解决方案6】:

      作为 Ami Tavory 答案的替代方案,您可以使用 collections 包中的 Counter 来检测重复项。在我的电脑上,它似乎更快。请参阅下面的函数,该函数也可以找到不同的重复项。

      import collections
      import numpy as np
      
      def find_duplicates_original(x):
          d = []
          for i in range(len(x)):
              for j in range(i + 1, len(x)):
                  if x[i] == x[j]:
                      d.append(j)
          return d
      
      def find_duplicates_outer(x):
          a = np.equal.outer(x, x)
          np.fill_diagonal(a, 0)
          return np.flatnonzero(np.any(a, axis=0))
      
      def find_duplicates_counter(x):
          counter = collections.Counter(x)
          values = (v for v, c in counter.items() if c > 1)
          return {v: np.flatnonzero(x == v) for v in values}
      
      
      n = 10000
      x = np.arange(float(n))
      x[n // 2] = 1
      x[n // 4] = 1
      
      >>>> find_duplicates_counter(x)
      {1.0: array([   1, 2500, 5000], dtype=int64)}
      
      >>>> %timeit find_duplicates_original(x)
      1 loop, best of 3: 12 s per loop
      
      >>>> %timeit find_duplicates_outer(x)
      10 loops, best of 3: 84.3 ms per loop
      
      >>>> %timeit find_duplicates_counter(x)
      1000 loops, best of 3: 1.63 ms per loop
      

      【讨论】:

      • 不幸的是,这与 Ami Tavory 的建议不适合。试试 N=160000
      【解决方案7】:

      与您的代码 18 秒相比,这在 8 毫秒内运行,并且不使用任何奇怪的库。它类似于@vs0 的方法,但我更喜欢defaultdict。它应该是大约 O(N)。

      from collections import defaultdict
      dupl = []
      counter = 0
      indexes = defaultdict(list)
      for i, e in enumerate(vect):
          indexes[e].append(i)
          if len(indexes[e]) > 1:
              dupl.append(i)
              counter += 1
      

      【讨论】:

        【解决方案8】:

        由于答案已经停止出现并且没有一个是完全令人满意的,为了记录,我发布了我自己的解决方案。

        我的理解是,在这种情况下,让 Python 变慢的是赋值,而不是我最初认为的嵌套循环。使用库或编译代码消除了分配的需要,性能显着提高。

        from __future__ import print_function
        import numpy as np
        from numba import jit
        
        N = 10000
        vect = np.arange(N, dtype=np.float32)
        
        vect[N/2] = 1
        vect[N/4] = 1
        dupl = np.zeros(N, dtype=np.int32)
        
        print("init done")
        # uncomment to enable compiled function
        #@jit
        def duplicates(i, counter, dupl, vect):
            eps = 0.01
            ns = len(vect)
            for j in range(i+1, ns):
                # replace if to use approx comparison
                #if abs(vect[i] - vect[j]) < eps:
                if vect[i] == vect[j]:
                    dupl[counter] = j
                    counter += 1
            return counter
        
        counter = 0
        for i in xrange(N):
            counter = duplicates(i, counter, dupl, vect)
        
        print("counter =", counter)
        print(dupl[0:counter])
        

        测试

        # no jit
        $ time python array-test-numba.py
        init done
        counter = 3
        [2500 5000 5000]
        
        elapsed 10.135 s
        
        # with jit
        $ time python array-test-numba.py
        init done
        counter = 3
        [2500 5000 5000]
        
        elapsed 0.480 s
        

        编译版本(@jit 未注释)的性能接近 C 代码性能 ~0.1 - 0.2 秒。也许消除最后一个循环可以进一步提高性能。使用 eps 进行近似比较时,性能差异甚至更大,而编译版本的差异很小。

        # no jit
        $ time python array-test-numba.py
        init done
        counter = 3
        [2500 5000 5000]
        
        elapsed 109.218 s
        
        # with jit
        $ time python array-test-numba.py
        init done
        counter = 3
        [2500 5000 5000]
        
        elapsed 0.506 s
        

        这是 ~ 200 倍的差异。在实际代码中,我必须将两个循环都放在函数中,并使用具有变量类型的函数模板,所以它有点复杂但不是很复杂。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 2023-01-09
          • 1970-01-01
          • 2015-08-06
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2012-11-28
          相关资源
          最近更新 更多