【问题标题】:Cython numpy array indexer speed improvementCython numpy 数组索引器速度提升
【发布时间】:2016-10-16 16:27:38
【问题描述】:

我用纯python编写了以下代码,它的作用在文档字符串中的描述:

import numpy as np
from scipy.ndimage.measurements import find_objects
import itertools

def alt_indexer(arr):

    """
    Returns a dictionary with the elements of arr as key
    and the corresponding slice as value.

    Note:

        This function assumes arr is sorted.

    Example:

        >>> arr = [0,0,3,2,1,2,3]
        >>> loc = _indexer(arr)
        >>> loc
        {0: (slice(0L, 2L, None),),
        1: (slice(2L, 3L, None),),
        2: (slice(3L, 5L, None),),
        3: (slice(5L, 7L, None),)}
        >>> arr = sorted(arr)
        >>> arr[loc[3][0]]
        [3, 3]
        >>> arr[loc[2][0]]
        [2, 2]

    """

    unique, counts = np.unique(arr, return_counts=True)
    labels = np.arange(1,len(unique)+1)
    labels = np.repeat(labels,counts)

    slicearr = find_objects(labels)
    index_dict = dict(itertools.izip(unique,slicearr))

    return index_dict

由于我将索引非常大的数组,我想通过使用 cython 来加速操作,这是等效的实现:

import numpy as np
cimport numpy as np

def _indexer(arr):

    cdef tuple unique_counts = np.unique(arr, return_counts=True)
    cdef np.ndarray[np.int32_t,ndim=1] unique = unique_counts[0]
    cdef np.ndarray[np.int32_t,ndim=1] counts = unique_counts[1].astype(int)

    cdef int start=0
    cdef int end
    cdef int i
    cdef dict d ={}

    for i in xrange(len(counts)):
        if i>0:
            start = counts[i-1]+start
        end=counts[i]+start
        d[unique[i]]=slice(start,end)
    return d

基准测试

我比较了完成这两个操作所需的时间:

In [26]: import numpy as np

In [27]: rr=np.random.randint(0,1000,1000000)

In [28]: %timeit _indexer(rr)
10 loops, best of 3: 40.5 ms per loop

In [29]: %timeit alt_indexer(rr) #pure python
10 loops, best of 3: 51.4 ms per loop

如您所见,速度提升很小。我确实意识到,自从我使用 numpy 以来,我的代码已经部分优化了。

是否存在我不知道的瓶颈? 我不应该使用np.unique 而是编写自己的实现吗?

谢谢。

【问题讨论】:

  • cython 循环如果可以转换为纯C 会更快。在您的情况下,循环仍然使用 numpy.unique 和 Python 字典和切片对象。

标签: python performance numpy cython


【解决方案1】:

arr 具有非负数、不是很大并且有许多重复的int 数字,这是使用np.bincount 模拟与np.unique(arr, return_counts=True) 相同的行为的另一种方法-

def unique_counts(arr):
    counts = np.bincount(arr)
    mask = counts!=0
    unique = np.nonzero(mask)[0]
    return unique, counts[mask] 

运行时测试

案例#1:

In [83]: arr = np.random.randint(0,100,(1000)) # Input array

In [84]: unique, counts = np.unique(arr, return_counts=True)
    ...: unique1, counts1 = unique_counts(arr)
    ...: 

In [85]: np.allclose(unique,unique1)
Out[85]: True

In [86]: np.allclose(counts,counts1)
Out[86]: True

In [87]: %timeit np.unique(arr, return_counts=True)
10000 loops, best of 3: 53.2 µs per loop

In [88]: %timeit unique_counts(arr)
100000 loops, best of 3: 10.2 µs per loop

案例#2:

In [89]: arr = np.random.randint(0,1000,(10000)) # Input array

In [90]: %timeit np.unique(arr, return_counts=True)
1000 loops, best of 3: 713 µs per loop

In [91]: %timeit unique_counts(arr)
10000 loops, best of 3: 39.1 µs per loop

案例#3:让我们运行一个 unique 在最小到最大范围内缺少一些数字的案例,并根据 np.unique 版本验证结果作为健全性检查。在这种情况下,我们不会有很多重复的数字,因此预计性能不会更好。

In [98]: arr = np.random.randint(0,10000,(1000)) # Input array

In [99]: unique, counts = np.unique(arr, return_counts=True)
    ...: unique1, counts1 = unique_counts(arr)
    ...: 

In [100]: np.allclose(unique,unique1)
Out[100]: True

In [101]: np.allclose(counts,counts1)
Out[101]: True

In [102]: %timeit np.unique(arr, return_counts=True)
10000 loops, best of 3: 61.9 µs per loop

In [103]: %timeit unique_counts(arr)
10000 loops, best of 3: 71.8 µs per loop

【讨论】:

  • 不错的解决方案!不幸的是,整数从 1000000 开始。在某些情况下,数组将包含浮点数。
  • @snowleopard 是的,那我们应该寻找其他的优化方式。
  • 但你是对的,似乎 np.unique 需要被替换。我真正需要的是切片字典,似乎有一些冗余操作。
猜你喜欢
  • 1970-01-01
  • 2013-05-22
  • 2011-11-18
  • 2011-05-14
  • 2011-12-09
  • 2010-11-15
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多