【问题标题】:Cython - efficiently filtering a typed memoryviewCython - 有效过滤类型化的内存视图
【发布时间】:2018-07-19 16:33:22
【问题描述】:

此 Cython 函数返回 numpy 数组的元素中的一个随机元素,该元素在一定范围内:

cdef int search(np.ndarray[int] pool):
  cdef np.ndarray[int] limited
  limited = pool[(pool >= lower_limit) & (pool <= upper_limit)]
  return np.random.choice(limited)

这很好用。但是,此功能对我的代码的性能非常关键。类型化的 memoryview 显然比 numpy 数组快得多,但它们不能像上面一样被过滤。

如何使用类型化的内存视图编写一个与上述功能相同的函数?或者有没有其他方法可以提高函数的性能?

【问题讨论】:

  • 这个表达式:pool[(pool &gt;= lower_limit) &amp; (pool &lt;= upper_limit)] 迭代了pool 三次 的长度(实现三个单独的布尔数组),这是在索引它之前!尽量使用numexpr,避免不必要的复制。
  • 我认为这个问题是基于一个错误的前提 - 类型化的内存视图比 np.ndarray 语法稍微更现代和更通用,但 do not 给出了显着不同速度。 (如果可能的话,我想我会尝试缓存limited(如果每次都相同))
  • 如果假设limited在数组中或多或少均匀分布,并且对所选元素的分布不太挑剔,则可以在数组中选择一个随机索引并返回第一个遇到的有限元素(如果在找到有限元素之前到达末尾,则从列表的开头开始)。

标签: python performance cython typed-memory-views


【解决方案1】:

好的,让我们先让代码更通用一些,稍后我会谈到性能方面。

我一般不使用:

import numpy as np
cimport numpy as np

我个人喜欢为 cimported 包使用不同的名称,因为它有助于将 C 端和 NumPy-Python 端分开。所以对于这个答案,我将使用

import numpy as np
cimport numpy as cnp

我还将为函数设置lower_limitupper_limit 参数。也许这些是在您的情况下静态(或全局)定义的,但它使示例更加独立。因此,起点是您的代码稍作修改的版本:

cpdef int search_1(cnp.ndarray[int] pool, int lower_limit, int upper_limit):
    cdef cnp.ndarray[int] limited
    limited = pool[(pool >= lower_limit) & (pool <= upper_limit)]
    return np.random.choice(limited)

Cython 中一个非常好的功能是fused types,因此您可以轻松地将这个功能推广到不同的类型。您的方法仅适用于 32 位整数数组(至少如果您的计算机上的 int 是 32 位)。支持更多的数组类型非常容易:

ctypedef fused int_or_float:
    cnp.int32_t
    cnp.int64_t
    cnp.float32_t
    cnp.float64_t

cpdef int_or_float search_2(cnp.ndarray[int_or_float] pool, int_or_float lower_limit, int_or_float upper_limit):
    cdef cnp.ndarray[int_or_float] limited
    limited = pool[(pool >= lower_limit) & (pool <= upper_limit)]
    return np.random.choice(limited)

当然,您可以根据需要添加更多类型。优点是新版本可以在旧版本失败的地方工作:

>>> search_1(np.arange(100, dtype=np.float_), 10, 20)
ValueError: Buffer dtype mismatch, expected 'int' but got 'double'
>>> search_2(np.arange(100, dtype=np.float_), 10, 20)
19.0

现在它更通用了,让我们看看你的函数实际上做了什么:

  • 您创建一个布尔数组,其中元素高于下限
  • 您创建一个布尔数组,其中元素低于上限
  • 您通过按位和两个布尔数组创建一个布尔数组。
  • 您创建了一个新数组,其中仅包含布尔掩码为 true 的元素
  • 您只能从最后一个数组中提取一个元素

为什么要创建这么多数组?我的意思是你可以简单地计算有多少元素在限制内,取一个介于 0 和限制内元素数之间的随机整数,然后在结果数组中的该索引处取任何元素 .

cimport cython

@cython.boundscheck(False)
@cython.wraparound(False)
cpdef int_or_float search_3(cnp.ndarray[int_or_float] arr, int_or_float lower_bound, int_or_float upper_bound):
    cdef int_or_float element

    # Count the number of elements that are within the limits
    cdef Py_ssize_t num_valid = 0
    for index in range(arr.shape[0]):
        element = arr[index]
        if lower_bound <= element <= upper_bound:
            num_valid += 1

    # Take a random index
    cdef Py_ssize_t random_index = np.random.randint(0, num_valid)

    # Go through the array again and take the element at the random index that
    # is within the bounds
    cdef Py_ssize_t clamped_index = 0
    for index in range(arr.shape[0]):
        element = arr[index]
        if lower_bound <= element <= upper_bound:
            if clamped_index == random_index:
                return element
            clamped_index += 1

它不会更快,但会节省大量内存。而且因为您没有中间数组,所以您根本不需要内存视图 - 但如果您愿意,您可以将参数列表中的 cnp.ndarray[int_or_float] arr 替换为 int_or_float[:] 甚至 int_or_float[::1] arr 并在内存视图上操作(它可能不会更快,但也不会更慢)。

我通常更喜欢 numba 而不是 Cython(至少如果我正在使用它)所以让我们将它与该代码的 numba 版本进行比较:

import numba as nb
import numpy as np

@nb.njit
def search_numba(arr, lower, upper):
    num_valids = 0
    for item in arr:
        if item >= lower and item <= upper:
            num_valids += 1

    random_index = np.random.randint(0, num_valids)

    valid_index = 0
    for item in arr:
        if item >= lower and item <= upper:
            if valid_index == random_index:
                return item
            valid_index += 1

还有numexpr 变体:

import numexpr

np.random.choice(arr[numexpr.evaluate('(arr >= l) & (arr <= u)')])

好的,我们来做一个基准测试:

from simple_benchmark import benchmark, MultiArgument

arguments = {2**i: MultiArgument([np.random.randint(0, 100, size=2**i, dtype=np.int_), 5, 50]) for i in range(2, 22)}
funcs = [search_1, search_2, search_3, search_numba, search_numexpr]

b = benchmark(funcs, arguments, argument_name='array size')

因此,通过不使用中间数组,您可以快大约 5 倍,如果您使用 numba,您可以获得另一个因子 5(似乎我错过了一些可能的 Cython 优化,numba 通常快约 2 倍或像 Cython 一样快)。因此,使用 numba 解决方案可以将速度提高约 20 倍。

numexpr 在这里无法比较,主要是因为您不能在那里使用布尔数组索引。

差异将取决于数组的内容和限制。您还必须衡量应用程序的性能。


顺便说一句:如果下限和上限一般不改变,最快的解决方案是过滤一次数组,然后多次调用np.random.choice。这可能会快几个数量级

lower_limit = ...
upper_limit = ...
filtered_array = pool[(pool >= lower_limit) & (pool <= upper_limit)]

def search_cached():
    return np.random.choice(filtered_array)

%timeit search_cached()
2.05 µs ± 122 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

几乎快 1000 倍,并且根本不需要 Cython 或 numba。但这是一种特殊情况,可能对您没有用处。


如果您想自己做的话,基准设置在这里(基于 Jupyter Notebook/Lab,因此是 %-symbols):

%load_ext cython

%%cython

cimport numpy as cnp
import numpy as np

cpdef int search_1(cnp.ndarray[int] pool, int lower_limit, int upper_limit):
    cdef cnp.ndarray[int] limited
    limited = pool[(pool >= lower_limit) & (pool <= upper_limit)]
    return np.random.choice(limited)

ctypedef fused int_or_float:
    cnp.int32_t
    cnp.int64_t
    cnp.float32_t
    cnp.float64_t

cpdef int_or_float search_2(cnp.ndarray[int_or_float] pool, int_or_float lower_limit, int_or_float upper_limit):
    cdef cnp.ndarray[int_or_float] limited
    limited = pool[(pool >= lower_limit) & (pool <= upper_limit)]
    return np.random.choice(limited)

cimport cython

@cython.boundscheck(False)
@cython.wraparound(False)
cpdef int_or_float search_3(cnp.ndarray[int_or_float] arr, int_or_float lower_bound, int_or_float upper_bound):
    cdef int_or_float element
    cdef Py_ssize_t num_valid = 0
    for index in range(arr.shape[0]):
        element = arr[index]
        if lower_bound <= element <= upper_bound:
            num_valid += 1

    cdef Py_ssize_t random_index = np.random.randint(0, num_valid)

    cdef Py_ssize_t clamped_index = 0
    for index in range(arr.shape[0]):
        element = arr[index]
        if lower_bound <= element <= upper_bound:
            if clamped_index == random_index:
                return element
            clamped_index += 1

import numexpr
import numba as nb
import numpy as np

def search_numexpr(arr, l, u):
    return np.random.choice(arr[numexpr.evaluate('(arr >= l) & (arr <= u)')])

@nb.njit
def search_numba(arr, lower, upper):
    num_valids = 0
    for item in arr:
        if item >= lower and item <= upper:
            num_valids += 1

    random_index = np.random.randint(0, num_valids)

    valid_index = 0
    for item in arr:
        if item >= lower and item <= upper:
            if valid_index == random_index:
                return item
            valid_index += 1

from simple_benchmark import benchmark, MultiArgument

arguments = {2**i: MultiArgument([np.random.randint(0, 100, size=2**i, dtype=np.int_), 5, 50]) for i in range(2, 22)}
funcs = [search_1, search_2, search_3, search_numba, search_numexpr]

b = benchmark(funcs, arguments, argument_name='array size')

%matplotlib widget

import matplotlib.pyplot as plt

plt.style.use('ggplot')
b.plot()

【讨论】:

  • 我认为,根据“有限元素/所有元素”的比率,将找到的有限元素保存在列表中并从列表中选择一个而不需要再次搜索可能是一个优势。跨度>
  • @ead 实际上我最初有这样的方法,但至少在基准测试中它比较慢(即使是好的比率)。这可能是因为整个函数本质上是带宽受限的,因此使用中间阵列可能会严重损害性能。而且它总是需要一个中间数组(更多内存)。您认为我应该将其包含在答案中吗?我认为这会分散注意力,但您的评论让我意识到,提及废弃的方法可能很有用。
  • 至少我很惊讶,因为使用您的方法,您必须传输数据 1.5 次(平均)。保存结果后,您只需传输 1+ratio 倍的数据。
  • 感谢您的详细解答。除非出现其他答案,否则我会在几天内奖励你赏金。我不知道 Numba 可以表现得这么好。我曾尝试循环遍历数组但结果不佳,我没想过只存储匹配数而不是匹配索引,如果我理解正确,您的解决方案会更快,因为它不需要在循环时修改任何数组。
  • @ead 我猜如果比率非常低,“条件”(lower_bound &lt;= element &lt;= upper_bound) 的分支预测器几乎总是正确的,这意味着两个循环的开销都非常低(第二个阵列会增加带宽和操作的开销)。但这只是猜测。我已经注意到但可能应该更多强调的一件事是基准可能不具有代表性。限制、数组内容和抽取的随机数会显着影响性能。
猜你喜欢
  • 2018-11-14
  • 2019-05-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-07-15
相关资源
最近更新 更多