【问题标题】:Parallelising a for loop in cython: beyond prange在 cython 中并行化 for 循环:超越 prange
【发布时间】:2018-06-03 03:16:04
【问题描述】:

我正在努力使用cython 正确并行化函数。基本上,问题是对一些数据进行分类。实际代码有点长,但最后它做了这样的事情:

def bin_var(double[:] dist,
            double[:] values,
            double[:] bin_def,
            double[:] varg, long[:] count):

    dbin = (bin_def[1] - bin_def[0]) / bin_def[2]

    for n1 in range(values.size):
            if (dist[n1] < bin_def[0]) or (dist[n1] >= bin_def[1]):
                continue
            else:
                ni = int((dist - bin_def[0]) / dbin)
                count[ni] += 1
                varg[ni] += calc_something(values[ni])

    # compute the mean
    for n1 in range(int(bin_def[2])):
        varg[ni] /= count[ni]

这段代码适用于一些简单的并行化(valuesdist 非常大):需要将第一个 for 循环拆分为不同的进程,每个进程都使用自己的 count 版本和varg 数组。完成后,必须通过在第二个 for 循环之前将 countvarg 的不同版本相加来将所有内容组合在一起(短得多)。

也就是说,这两天我试图了解如何在 cython 中有效地实现这一点,我开始怀疑当前版本的语言不可能。请注意,仅将cython.parallel 中的prange 用于第一个循环并不能提供正确的结果,因为(我假设)来自不同线程的nicountvarg 同时访问。

cython 并行支持真的那么有限吗?我得到了如此好的单线程加速,我只是希望我能继续......

【问题讨论】:

    标签: multithreading multiprocessing cython python-multiprocessing


    【解决方案1】:

    我可以在这里想到三个选项:

    1. 使用 GIL 确保 += 是单线程完成的:

      varg_ni = calc_something(values[ni]) # keep this out 
                     # of the single threaded block...
      with gil:
          count[ni] += 1
          varg[ni] += varg_ni
      

      只要在calc_something 中完成的工作相当大,这很容易并且不会太糟糕

    2. 制作countvarg 二维数组,每个线程写入不同的列。之后沿第二维求和:

      # rough, untested outline....
      
      # might need to go in a `with parallel()` block
      num_threads = openmp.omp_get_num_threads()
      
      cdef double[:,:] count_tmp = np.zeros((count.shape[0],num_threads))
      cdef double[:,:] varg_tmp = np.zeros((varg.shape[0],num_threads))
      
      # then in the loop:
      count_tmp[ni,cython.parallel.threadid()] += 1
      varg_tmp[ni,cython.parallel.threadid()] += calc_something(values[ni])
      
      # after the loop:
      count[:] = np.sum(count_tmp,axis=1)
      varg[:] = np.sum(varg_tmp,axis=1)
      

      您也可以使用local_buf example in the documentation 中的想法做类似的事情。

    3. (注意 - GCC 目前给我一个“内部编译器错误” - 我觉得它应该可以工作,但目前它似乎不起作用,所以尝试选项 3风险自负...)使用openmp atomic directive 以原子方式进行添加。这需要一些工作来规避 Cython,但应该不会太难。使用add_inplace 宏创建一个简短的 C 头文件:

      #define add_inplace(x,y) _Pragma("omp atomic") x+=y
      

      _Pragma 是一个 C99 功能,应该允许您将 pragma 放入预处理器语句中。然后告诉 Cython 那个头文件(好像它是一个函数):

      cdef extern from "header.h":
          void add_inplace(...) nogil # just use varargs to make Cython think it accepts anything
      

      然后在循环中做:

      add_inplace(count[ni], 1)
      add_inplace(varg[ni], calc_something(values[ni]))
      

      因为这使用了宏技巧,所以它可能有点脆弱(即绝对不能与PyObject*s 一起使用,但它应该在使用标准 C 数字类型时生成正确的 C 代码。(检查代码以确保)

    【讨论】:

    • 谢谢,我试过with gil的方式,但我的calc_something太慢了,我什至没有提到它。另一方面,其他选项似乎很有希望。
    • 解决方案 2 对我来说是可行的方法,但仅对大问题(因为我真正感兴趣的问题)值得努力。但是请注意 prange 有一些奇怪的行为,因为在循环之后强制 return 。我没有尝试解决方案 3,因为它对我来说有点遥不可及。
    • 我无法让解决方案 3 正常工作,所以我认为您最好避免使用它 :)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-07-23
    • 1970-01-01
    • 2017-03-17
    相关资源
    最近更新 更多