【问题标题】:What is the fastest way to compare patches of an array?比较数组补丁的最快方法是什么?
【发布时间】:2015-09-01 03:02:05
【问题描述】:

我想将二维数组 $A$ 的不同区域与较小尺寸的给定数组 $b$ 进行比较。因为我必须做很多次,所以必须非常快速地执行此操作。我有一个运行良好的解决方案,但我希望它可以做得更好更快。

详细说明:

假设我们有一个大数组和一个小数组。我遍历大数组中与小数组大小相同的所有可能的“补丁”,并将这些补丁与给定的小数组进行比较。

def get_best_fit(big_array, small_array):

    # we assume the small array is square
    patch_size = small_array.shape[0]
    min_value = np.inf
    for x in range(patch_size, big_array.shape[0] - patch_size):
        for y in range(patch_size, big_array.shape[1] - patch_size):
            p = get_patch_term(x, y, patch_size, big_array)
            tmp = some_metric(p, small_array)
            if min_value > tmp:
                min_value = tmp
                min_patch = p

    return min_patch, min_value

为了获得补丁,我得到了这个直接数组访问实现:

def get_patch_term(x, y, patch_size, data):
    """
    a patch has the size (patch_size)^^2
    """
    patch = data[(x - (patch_size-1)/2): (x + (patch_size-1)/2 + 1),
                 (y - (patch_size-1)/2): (y + (patch_size-1)/2 + 1)]
    return patch

我想这是最关键的任务,可以更快地执行,但我不确定。

我查看了 Cython,但也许我做错了,我对它不是很熟悉。

我的第一次尝试是直接翻译成 cython:

def get_patch_term_fast(Py_ssize_t x_i, Py_ssize_t y_i,
                        Py_ssize_t patch_size,
                        np.ndarray[DTYPE_t, ndim=2] big_array):

    assert big_array.dtype == DTYPE
    patch_size = (patch_size - 1)/2

    cdef np.ndarray[DTYPE_t, ndim=2] patch = <np.ndarray[DTYPE_t, ndim=2]>big_array[(x_i - patch_size):(x_i + patch_size + 1), (y_i - patch_size): (y_i + patch_size + 1)]
    return patch

这似乎更快(见下文),但我认为并行方法应该更好,所以我想出了这个

def get_patch_term_fast_parallel(Py_ssize_t x_i, Py_ssize_t y_i,
                                 Py_ssize_t patch_size,
                                 np.ndarray[DTYPE_t, ndim=2] big_array):

    assert big_array.dtype == DTYPE
    patch_size = (patch_size - 1)/2

    assert big_array.dtype == DTYPE
    cdef Py_ssize_t x
    cdef Py_ssize_t y


    cdef np.ndarray[DTYPE_t, ndim=1] patch = np.empty(np.power((2 * patch_size) + 1, 2))
    with nogil, parallel():
        for x in prange(x_i - patch_size, x_i + patch_size + 1):
            for y in prange(y_i - patch_size, y_i + patch_size + 1):
                patch[((x - (x_i - patch_size)) * (2 * patch_size + 1)) + (y - (y_i - patch_size))] = big_array[x, y]
    #cdef np.ndarray[DTYPE_t, ndim=2] patch = <np.ndarray[DTYPE_t, ndim=2]>big_array[(x_i - patch_size):(x_i + patch_size + 1), (y_i - patch_size): (y_i + patch_size + 1)]
    return patch

不幸的是,这并没有更快。对于我使用的测试:

A = np.array(range(1200), dtype=np.float).reshape(30, 40)
b = np.array([41, 42, 81, 84]).reshape(2, 2)

x = 7
y = 7
print(timeit.timeit(lambda:get_patch_term_fast(x, y, 3, A), number=300))
print(timeit.timeit(lambda:get_patch_term_fast_parallel(x, y, 3, A).reshape(3,3), number=300))
print(timeit.timeit(lambda:get_patch_term(x, y, 3, A), number=300))

这给了

0.0008792859734967351
0.0029909340664744377
0.0029337930027395487

所以,我的第一个问题是,是否可以更快地做到这一点?第二个问题是,为什么并行方法不比原来的 numpy 实现快?

编辑:

我尝试进一步并行化 get_best_fit 函数,但不幸的是,我收到很多错误,指出我无法在没有 gil 的情况下分配 Python 对象。

代码如下:

def get_best_fit_fast(np.ndarray[DTYPE_t, ndim=2] big_array,
                      np.ndarray[DTYPE_t, ndim=2] small_array):

    # we assume the small array is square
    cdef Py_ssize_t patch_size = small_array.shape[0]

    cdef Py_ssize_t x
    cdef Py_ssize_t y

    cdef Py_ssize_t x_range = big_array.shape[0] - patch_size
    cdef Py_ssize_t y_range = big_array.shape[1] - patch_size

    cdef np.ndarray[DTYPE_t, ndim=2] p
    cdef np.ndarray[DTYPE_t, ndim=2] weights = np.empty((x_range - patch_size)*(y_range - patch_size)).reshape((x_range - patch_size), (y_range - patch_size))

    with nogil, parallel():
        for x in prange(patch_size, x_range):
            for y in prange(patch_size, y_range):
                p = get_patch_term_fast(x, y, patch_size, big_array)
                weights[x - patch_size, y - patch_size] = np.linalg.norm(np.abs(p - small_array))

    return np.min(weights)

PS:我省略了返回最小补丁的部分...

【问题讨论】:

  • 取决于some_metric 互相关(或类似的,取决于度量)使用快速傅里叶变换可能会更快。
  • 无论如何,在 numpy 中进行简单切片确实非常有效(它甚至不涉及复制任何内容),因此您不太可能在 Cython 中击败它。将 Cython 应用于 get_best_fit 中的循环可能会更幸运。
  • 不幸的是,get_best_fit 函数到 Cython 的一对一翻译没有速度优势。而且我没有让它并行工作。尽管理论上它应该可以工作,但并行循环中的对象分配给我带来了麻烦。
  • 对于并行循环,您可以尝试将min_valuemin_patch 制作为长度由线程数指定的numpy 数组,然后在循环内分配给索引cython.parallel.threadid()。 (然后你必须在循环之后选择最好的)。除非您的分配问题与其他问题有关...
  • @DavidW BTW,np.linalg.norm 只是相应 BLAS 代码的包装器,因此您可以使用 nogil 在纯 C 中调用它。为方便起见,Cython API for BLAS and LAPACK 将在 scipy 的下一个版本中提供。但我同意,在这种情况下,让所有这些都成为 nog​​il 需要付出一些努力,而且可能不值得。

标签: python arrays performance numpy cython


【解决方案1】:

并行函数的时间测量的另一个问题是,在运行并行函数后,您在数组对象上调用 reshape。可能是并行函数更快,但是 reshape 会增加额外的时间(虽然reshape 应该很快,但我不确定有多快)。

另一个问题是我们不知道您的 some_metric 术语是什么,而且您似乎没有同时使用它。我看到您的并行代码工作的方式是您并行获取补丁,然后将它们放入队列中以由 some_metric 函数分析,因此破坏了代码并行化的目的。

正如 John Greenhall 所说,使用 FFT 和卷积可以非常快。但是,要利用并行处理,您仍然需要并行分析补丁和小阵列。

数组有多大?这里的内存也是一个问题吗?

【讨论】:

    【解决方案2】:

    我认为取决于您的 some_metric 函数的作用,可能已经有一个高度优化的实现。例如,如果它只是一个卷积,那么看看Theano,它甚至可以让你很容易地利用 GPU。即使您的函数不像简单的卷积那么简单,您也可能会在 Theano 中使用构建块,而不是尝试使用 Cython 进行非常低级的操作。

    【讨论】:

    • 谢谢。会看看theano。
    【解决方案3】:

    最初根据模式匹配发布另一个答案(被标题带走),发布另一个答案

    使用一个循环遍历两个数组(大小)并应用部分相关度量(或其他),而无需一直对列表进行切片:

    作为旁注,观察一个事实(当然取决于度量),一旦一个补丁完成,下一个补丁(向下一行或右侧一列)与前一个补丁共享很多,只有它的初始和最终行(或列)发生变化,因此可以通过减去前一行并添加新行(反之亦然)从先前的相关性中更快地计算出新的相关性。由于没有给出度量函数,因此添加了观察。

    def get_best_fit(big_array, small_array):
    
        # we assume the small array is square
        patch_size = small_array.shape[0]
        x = 0 
        y = 0
        x2 = 0 
        y2 = 0
        W = big_array.shape[0]
        H = big_array.shape[1]
        W2 = patch_size
        H2 = patch_size
        min_value = np.inf
        tmp = 0
        min_patch = None
        start = 0 
        end = H*W
        start2 = 0
        end2 = W2*H2
        while start < end:
    
            tmp = 0
            start2 = 0
            x2 = 0 
            y2 = 0
            valid = True
            while start2 < end2:
                if x+x2 >= W or y+y2 >= H: 
                    valid = False
                    break
                # !!compute partial metric!!
                # no need to slice lists all the time
                tmp += some_metric_partial(x, y, x2, y2, big_array, small_array)
                x2 += 1 
                if x2>=W2: 
                    x2 = 0 
                    y2 += 1
                start2 += 1
    
            # one patch covered
            # try next patch
            if valid and min_value > tmp:
                min_value = tmp
                min_patch = [x, y, W2, H2]
    
            x += 1 
            if x>=W: 
                x = 0 
                y += 1
    
            start += 1
    
        return min_patch, min_value
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2023-03-29
      • 2011-05-08
      • 1970-01-01
      • 2018-04-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多