【问题标题】:Cython parallel loop problemsCython 并行循环问题
【发布时间】:2016-11-06 15:43:48
【问题描述】:

我正在使用 cython 计算成对距离矩阵,使用自定义指标作为scipy.spatial.distance.pdist 的更快替代方案。

我的动机

我的指标格式为

def mymetric(u,v,w):
     np.sum(w * (1 - np.abs(np.abs(u - v) / np.pi - 1))**2)

使用 scipy 的成对距离可以计算为

x = sp.spatial.distance.pdist(r, metric=lambda u, v: mymetric(u, v, w))

这里,r 是一个 m-by-n 矩阵,由维度为 nm 向量组成,w 是维度为 n 的“权重”因子。

由于我的问题m 相当高,因此计算速度非常慢。对于m = 2000n = 10,这大约需要 20 秒。

使用 Cython 的初始解决方案

我在 cython 中实现了一个计算成对距离的简单函数,并立即得到了非常有希望的结果——加速超过 500 倍。

import numpy as np
cimport numpy as np
import cython

from libc.math cimport fabs, M_PI

@cython.wraparound(False)
@cython.boundscheck(False)
def pairwise_distance(np.ndarray[np.double_t, ndim=2] r, np.ndarray[np.double_t, ndim=1] w):
    cdef int i, j, k, c, size
    cdef np.ndarray[np.double_t, ndim=1] ans
    size = r.shape[0] * (r.shape[0] - 1) / 2
    ans = np.zeros(size, dtype=r.dtype)
    c = -1
    for i in range(r.shape[0]):
        for j in range(i + 1, r.shape[0]):
            c += 1
            for k in range(r.shape[1]):
                ans[c] += w[k] * (1.0 - fabs(fabs(r[i, k] - r[j, k]) / M_PI - 1.0))**2.0

    return ans

使用 OpenMP 的问题

我想使用 OpenMP 进一步加快计算速度,但是,以下解决方案比串行版本慢了大约 3 倍。

import numpy as np
cimport numpy as np

import cython
from cython.parallel import prange, parallel

cimport openmp

from libc.math cimport fabs, M_PI

@cython.wraparound(False)
@cython.boundscheck(False)
def pairwise_distance_omp(np.ndarray[np.double_t, ndim=2] r, np.ndarray[np.double_t, ndim=1] w):
    cdef int i, j, k, c, size, m, n
    cdef np.double_t a
    cdef np.ndarray[np.double_t, ndim=1] ans
    m = r.shape[0]
    n = r.shape[1]
    size = m * (m - 1) / 2
    ans = np.zeros(size, dtype=r.dtype)
    with nogil, parallel(num_threads=8):
        for i in prange(m, schedule='dynamic'):
            for j in range(i + 1, m):
                c = i * (m - 1) - i * (i + 1) / 2 + j - 1
                for k in range(n):
                    ans[c] += w[k] * (1.0 - fabs(fabs(r[i, k] - r[j, k]) / M_PI - 1.0))**2.0

    return ans

我不知道为什么它实际上更慢,但我尝试引入以下更改。 这不仅导致性能稍差,而且结果距离ans 仅在数组的开头正确计算,其余部分为零。 通过此实现的加速可以忽略不计。

import numpy as np
cimport numpy as np

import cython
from cython.parallel import prange, parallel

cimport openmp

from libc.math cimport fabs, M_PI
from libc.stdlib cimport malloc, free

@cython.wraparound(False)
@cython.boundscheck(False)
def pairwise_distance_omp_2(np.ndarray[np.double_t, ndim=2] r, np.ndarray[np.double_t, ndim=1] w):
    cdef int k, l, c, m, n
    cdef Py_ssize_t i, j, d
    cdef size_t size
    cdef int *ci, *cj

    cdef np.ndarray[np.double_t, ndim=1, mode="c"] ans

    cdef np.ndarray[np.double_t, ndim=2, mode="c"] data
    cdef np.ndarray[np.double_t, ndim=1, mode="c"] weight

    data = np.ascontiguousarray(r, dtype=np.float64)
    weight = np.ascontiguousarray(w, dtype=np.float64)

    m = r.shape[0]
    n = r.shape[1]
    size = m * (m - 1) / 2
    ans = np.zeros(size, dtype=r.dtype)

    cj = <int*> malloc(size * sizeof(int))
    ci = <int*> malloc(size * sizeof(int))

    c = -1
    for i in range(m):
        for j in range(i + 1, m):
            c += 1
            ci[c] = i
            cj[c] = j

    with nogil, parallel(num_threads=8):
        for d in prange(size, schedule='guided'):
            for k in range(n):
                ans[d] += weight[k] * (1.0 - fabs(fabs(data[ci[d], k] - data[cj[d], k]) / M_PI - 1.0))**2.0

    return ans

对于所有功能,我使用的是以下.pyxbld 文件

def make_ext(modname, pyxfilename):
    from distutils.extension import Extension
    return Extension(name=modname,
                     sources=[pyxfilename],
                     extra_compile_args=['-O3', '-march=native', '-ffast-math', '-fopenmp'],
                     extra_link_args=['-fopenmp'],
                     )

总结

我对 cython 的经验为零,并且只了解 C 的基础知识。我会很感激任何关于可能导致这种意外行为的原因的建议,甚至是如何更好地改写我的问题。


最佳串行解决方案(比原始串行快 10 %)

@cython.cdivision(True)
@cython.wraparound(False)
@cython.boundscheck(False)
def pairwise_distance_2(np.ndarray[np.double_t, ndim=2] r, np.ndarray[np.double_t, ndim=1] w):
    cdef int i, j, k, c, size
    cdef np.ndarray[np.double_t, ndim=1] ans
    cdef np.double_t accumulator, tmp
    size = r.shape[0] * (r.shape[0] - 1) / 2
    ans = np.zeros(size, dtype=r.dtype)
    c = -1
    for i in range(r.shape[0]):
        for j in range(i + 1, r.shape[0]):
            c += 1
            accumulator = 0
            for k in range(r.shape[1]):
                tmp = (1.0 - fabs(fabs(r[i, k] - r[j, k]) / M_PI - 1.0))
                accumulator += w[k] * (tmp*tmp)
            ans[c] = accumulator

    return ans

最佳并行解决方案(比原始并行快 1%,比使用 8 个线程的最佳串行快 6 倍)

@cython.cdivision(True)
@cython.wraparound(False)
@cython.boundscheck(False)
def pairwise_distance_omp_2d(np.ndarray[np.double_t, ndim=2] r, np.ndarray[np.double_t, ndim=1] w):
    cdef int i, j, k, c, size, m, n
    cdef np.ndarray[np.double_t, ndim=1] ans
    cdef np.double_t accumulator, tmp
    m = r.shape[0]
    n = r.shape[1]
    size = m * (m - 1) / 2
    ans = np.zeros(size, dtype=r.dtype)
    with nogil, parallel(num_threads=8):
        for i in prange(m, schedule='dynamic'):
            for j in range(i + 1, m):
                c = i * (m - 1) - i * (i + 1) / 2 + j - 1
                accumulator = 0
                for k in range(n):
                    tmp = (1.0 - fabs(fabs(r[i, k] - r[j, k]) / M_PI - 1.0))
                    ans[c] += w[k] * (tmp*tmp)

    return ans

未解决的问题:

当我尝试应用答案中提出的accumulator 解决方案时,我收到以下错误:

Error compiling Cython file:
------------------------------------------------------------
...
                c = i * (m - 1) - i * (i + 1) / 2 + j - 1
                accumulator = 0
                for k in range(n):
                    tmp = (1.0 - fabs(fabs(r[i, k] - r[j, k]) / M_PI - 1.0))
                    accumulator += w[k] * (tmp*tmp)
                ans[c] = accumulator
                                   ^
------------------------------------------------------------
pdist.pyx:207:36: Cannot read reduction variable in loop body

完整代码:

@cython.cdivision(True)
@cython.wraparound(False)
@cython.boundscheck(False)
def pairwise_distance_omp(np.ndarray[np.double_t, ndim=2] r, np.ndarray[np.double_t, ndim=1] w):
    cdef int i, j, k, c, size, m, n
    cdef np.ndarray[np.double_t, ndim=1] ans
    cdef np.double_t accumulator, tmp
    m = r.shape[0]
    n = r.shape[1]
    size = m * (m - 1) / 2
    ans = np.zeros(size, dtype=r.dtype)
    with nogil, parallel(num_threads=8):
        for i in prange(m, schedule='dynamic'):
            for j in range(i + 1, m):
                c = i * (m - 1) - i * (i + 1) / 2 + j - 1
                accumulator = 0
                for k in range(n):
                    tmp = (1.0 - fabs(fabs(r[i, k] - r[j, k]) / M_PI - 1.0))
                    accumulator += w[k] * (tmp*tmp)
                ans[c] = accumulator

    return ans

【问题讨论】:

  • 更新:发现我的代码有错误,目前的问题只是并行代码的性能问题。
  • Ondrian - 当我尝试使用累加器对矩阵列进行简单求和时,我得到相同的“无法读取循环体中的归约变量”错误 - 导致此错误的错误是什么?
  • @aph 如果我没记错的话,问题是使用accumulator += something 语法而不是accumulator = accumulator + something
  • 谢谢@Ondrian - 这正是我的问题!

标签: python performance openmp cython


【解决方案1】:

我自己没有计时,所以这可能不会有太大帮助,但是:

如果您运行 cython -a 来获取您最初尝试的注释版本 (pairwise_distance_omp),您会发现 ans[c] += ... 行是黄色的,这表明它有 Python 开销。查看与该行对应的 C 表明它正在检查除以零。它的一个关键部分开始了:

if (unlikely(M_PI == 0)) {

你知道这永远不会是真的(无论如何你可能会接受 NaN 值,而不是一个例外)。您可以通过向函数添加以下额外装饰器来避免此检查:

@cython.cdivision(True)
# other decorators
def pairwise_distance_omp # etc...

这减少了相当多的 C 代码,包括必须在单个线程中运行的位。另一方面是大多数代码永远不应该运行,编译器应该能够解决这个问题,所以不清楚会有多大的不同。


第二个建议:

# at the top
cdef np.double_t accumulator, tmp

    # further down later in the loop:
    c = i * (m - 1) - i * (i + 1) / 2 + j - 1
    accumulator = 0
    for k in range(r.shape[1]):
        tmp = (1.0 - fabs(fabs(r[i, k] - r[j, k]) / M_PI - 1.0))
        accumulator = accumulator + w[k] * (tmp*tmp)
    ans[c] = accumulator

希望这有两个优点:1) tmp*tmp 应该比浮点指数的 2 次幂更快。2) 你避免从 ans 数组中读取,这可能有点慢,因为编译器总是必须小心其他线程没有改变它(即使你知道它不应该改变它)。

【讨论】:

  • 这当然有帮助,谢谢,但是问题仍然存在。两个 OpenMP 版本现在都比串行版本慢大约 2 倍(在 8 个物理内核中使用 8 个线程)。另外,我仍然不知道,为什么第二次尝试只计算部分结果。
  • @Ondrian 我添加了第二个建议,我认为这可能会有所帮助(尽管其中一项改进也可以应用于非并行版本)。我真的没有看过第二次尝试。
  • 第二条建议将序列码加速10%,真的很好。我也设法在并行代码中应用了tmp*tmp 部分,但是,accumulator 部分仍然出现错误。顺便说一句,奇怪的初始时序(串行代码比并行代码快)实际上是我测量的结果!显然time.process_time() 不是一个好方法。对此感到尴尬。 :) 我现在接受答案,但如果您知道如何处理最后一个错误,我将不胜感激。如果你不这样做,我可能会创建一个新问题。
  • accumulator = accumulator + w[k] * (tmp*tmp) 为我工作。它变得很困惑,并认为它被用作 OpenMP 缩减(例如msdn.microsoft.com/en-us/library/88b1k8y5.aspx),但以不同的方式编写它会有所帮助。我应该在真正发布之前对其进行测试......
  • 这行得通!大约比 8 线程的等效串行快 6.5 倍。非常感谢! :)
猜你喜欢
  • 1970-01-01
  • 2018-06-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-02-18
  • 1970-01-01
  • 2018-05-06
相关资源
最近更新 更多