【发布时间】:2015-09-01 03:02:05
【问题描述】:
我想将二维数组 $A$ 的不同区域与较小尺寸的给定数组 $b$ 进行比较。因为我必须做很多次,所以必须非常快速地执行此操作。我有一个运行良好的解决方案,但我希望它可以做得更好更快。
详细说明:
假设我们有一个大数组和一个小数组。我遍历大数组中与小数组大小相同的所有可能的“补丁”,并将这些补丁与给定的小数组进行比较。
def get_best_fit(big_array, small_array):
# we assume the small array is square
patch_size = small_array.shape[0]
min_value = np.inf
for x in range(patch_size, big_array.shape[0] - patch_size):
for y in range(patch_size, big_array.shape[1] - patch_size):
p = get_patch_term(x, y, patch_size, big_array)
tmp = some_metric(p, small_array)
if min_value > tmp:
min_value = tmp
min_patch = p
return min_patch, min_value
为了获得补丁,我得到了这个直接数组访问实现:
def get_patch_term(x, y, patch_size, data):
"""
a patch has the size (patch_size)^^2
"""
patch = data[(x - (patch_size-1)/2): (x + (patch_size-1)/2 + 1),
(y - (patch_size-1)/2): (y + (patch_size-1)/2 + 1)]
return patch
我想这是最关键的任务,可以更快地执行,但我不确定。
我查看了 Cython,但也许我做错了,我对它不是很熟悉。
我的第一次尝试是直接翻译成 cython:
def get_patch_term_fast(Py_ssize_t x_i, Py_ssize_t y_i,
Py_ssize_t patch_size,
np.ndarray[DTYPE_t, ndim=2] big_array):
assert big_array.dtype == DTYPE
patch_size = (patch_size - 1)/2
cdef np.ndarray[DTYPE_t, ndim=2] patch = <np.ndarray[DTYPE_t, ndim=2]>big_array[(x_i - patch_size):(x_i + patch_size + 1), (y_i - patch_size): (y_i + patch_size + 1)]
return patch
这似乎更快(见下文),但我认为并行方法应该更好,所以我想出了这个
def get_patch_term_fast_parallel(Py_ssize_t x_i, Py_ssize_t y_i,
Py_ssize_t patch_size,
np.ndarray[DTYPE_t, ndim=2] big_array):
assert big_array.dtype == DTYPE
patch_size = (patch_size - 1)/2
assert big_array.dtype == DTYPE
cdef Py_ssize_t x
cdef Py_ssize_t y
cdef np.ndarray[DTYPE_t, ndim=1] patch = np.empty(np.power((2 * patch_size) + 1, 2))
with nogil, parallel():
for x in prange(x_i - patch_size, x_i + patch_size + 1):
for y in prange(y_i - patch_size, y_i + patch_size + 1):
patch[((x - (x_i - patch_size)) * (2 * patch_size + 1)) + (y - (y_i - patch_size))] = big_array[x, y]
#cdef np.ndarray[DTYPE_t, ndim=2] patch = <np.ndarray[DTYPE_t, ndim=2]>big_array[(x_i - patch_size):(x_i + patch_size + 1), (y_i - patch_size): (y_i + patch_size + 1)]
return patch
不幸的是,这并没有更快。对于我使用的测试:
A = np.array(range(1200), dtype=np.float).reshape(30, 40)
b = np.array([41, 42, 81, 84]).reshape(2, 2)
x = 7
y = 7
print(timeit.timeit(lambda:get_patch_term_fast(x, y, 3, A), number=300))
print(timeit.timeit(lambda:get_patch_term_fast_parallel(x, y, 3, A).reshape(3,3), number=300))
print(timeit.timeit(lambda:get_patch_term(x, y, 3, A), number=300))
这给了
0.0008792859734967351
0.0029909340664744377
0.0029337930027395487
所以,我的第一个问题是,是否可以更快地做到这一点?第二个问题是,为什么并行方法不比原来的 numpy 实现快?
编辑:
我尝试进一步并行化 get_best_fit 函数,但不幸的是,我收到很多错误,指出我无法在没有 gil 的情况下分配 Python 对象。
代码如下:
def get_best_fit_fast(np.ndarray[DTYPE_t, ndim=2] big_array,
np.ndarray[DTYPE_t, ndim=2] small_array):
# we assume the small array is square
cdef Py_ssize_t patch_size = small_array.shape[0]
cdef Py_ssize_t x
cdef Py_ssize_t y
cdef Py_ssize_t x_range = big_array.shape[0] - patch_size
cdef Py_ssize_t y_range = big_array.shape[1] - patch_size
cdef np.ndarray[DTYPE_t, ndim=2] p
cdef np.ndarray[DTYPE_t, ndim=2] weights = np.empty((x_range - patch_size)*(y_range - patch_size)).reshape((x_range - patch_size), (y_range - patch_size))
with nogil, parallel():
for x in prange(patch_size, x_range):
for y in prange(patch_size, y_range):
p = get_patch_term_fast(x, y, patch_size, big_array)
weights[x - patch_size, y - patch_size] = np.linalg.norm(np.abs(p - small_array))
return np.min(weights)
PS:我省略了返回最小补丁的部分...
【问题讨论】:
-
取决于
some_metric互相关(或类似的,取决于度量)使用快速傅里叶变换可能会更快。 -
无论如何,在 numpy 中进行简单切片确实非常有效(它甚至不涉及复制任何内容),因此您不太可能在 Cython 中击败它。将 Cython 应用于
get_best_fit中的循环可能会更幸运。 -
不幸的是,get_best_fit 函数到 Cython 的一对一翻译没有速度优势。而且我没有让它并行工作。尽管理论上它应该可以工作,但并行循环中的对象分配给我带来了麻烦。
-
对于并行循环,您可以尝试将
min_value和min_patch制作为长度由线程数指定的numpy 数组,然后在循环内分配给索引cython.parallel.threadid()。 (然后你必须在循环之后选择最好的)。除非您的分配问题与其他问题有关... -
@DavidW BTW,
np.linalg.norm只是相应 BLAS 代码的包装器,因此您可以使用nogil在纯 C 中调用它。为方便起见,Cython API for BLAS and LAPACK 将在 scipy 的下一个版本中提供。但我同意,在这种情况下,让所有这些都成为 nogil 需要付出一些努力,而且可能不值得。
标签: python arrays performance numpy cython