【问题标题】:How to use gsl integration in a cython prange block?如何在 cython prange 块中使用 gsl 集成?
【发布时间】:2016-07-20 02:12:54
【问题描述】:

为了使事情更笼统,以gsl_integration_qag 为例(因为它需要一个工作区来管理间隔)。

这是一个特意设计的简单示例(problem.pyx):

from libc.math cimport cos
from cython.parallel import prange
from cython_gsl cimport *

cdef double mean(double[:] arr):
    cdef Py_ssize_t i
    cdef double sum_ = 0.0
    for i in range(arr.shape[0]):
        sum_ += arr[i]
    return sum_ / arr.shape[0]

cdef double integrand(double x, void *params) nogil:
    cdef double a = (<double*>params)[0]
    cdef double b = (<double*>params)[1]
    return a*x + b

def use_gsl_integration_sequential():
    cdef double result, error
    cdef Py_ssize_t i, j
    cdef double result_sum=0.0
    cdef double results[3]
    cdef double a=0.0, b=0.0
    cdef double da = 0.1
    cdef size_t space_size = 1000

    cdef gsl_function gsl_func
    cdef double params[2]
    gsl_func.function = &integrand

    cdef gsl_integration_workspace *ws
    ws= gsl_integration_workspace_alloc(space_size)

    for i in range(3):
        a += da
        result_sum = 0.0 # reset the sum(thank J.J.Hakala for pointing out this)
        for j in range(4): # here may be range(100000) in practice!
            b = a + j
            params[0] = a; params[1] = b
            gsl_func.params = params
            gsl_integration_qag(&gsl_func, 0.0, 1.0, 1e-8, 1e-8, space_size, GSL_INTEG_GAUSS15, ws, &result, &error)
            result_sum += result
        results[i] = result_sum

    gsl_integration_workspace_free(ws)
    # do some other stuff with the 'results', for example:
    return mean(results)


# Now, I want to parallel the inner loop
def use_gsl_integration_parallel():
    cdef double result, error
    cdef Py_ssize_t i, j
    cdef double result_sum=0.0
    cdef double results[3]
    cdef double a, b
    cdef double da = 0.1
    cdef size_t space_size = 1000

    cdef gsl_function gsl_func
    cdef double params[2]
    gsl_func.function = &integrand

    cdef gsl_integration_workspace *ws
    ws= gsl_integration_workspace_alloc(space_size)

    for i in range(3):
        # a should be read-only in the follow prange block.
        a += da 
        # here may be prange(100000,...) in practice!
        for j in prange(4, nogil=True): 
            # b should be local private.
            b = a + j

            # params also should be local private
            params[0] = a; params[1] = b

            # gsl_func is shared, only its params differ.
            # According to the gsl manual(see follow link), workspaces should be allocated on a per-thread basis, but how?
            gsl_func.params = params
            gsl_integration_qag(&gsl_func, 0.0, 1.0, 1e-8, 1e-8, space_size, GSL_INTEG_GAUSS15, ws, &result, &error)

            result_sum += result

        results[i] = result_sum
        gsl_integration_workspace_free(ws)
    return mean(results)

代码有点长,只是为了完整(复制),但我觉得比较简单(阅读)(๑•ᴗ•๑)

然后编译链接:

cython -a -2 problem.pyx

gcc -O3 -fopenmp -c problem.c -o problem.o -IC:\gsl2.1x86\include -IC:\python-2.7.10\include

gcc -shared -fPIC -fopenmp -o problem.pyd -LC:\gsl2.1x86\libs -LC:\python-2.7.10\libs problem.o -l:libgsl.a -l:libgslcblas.dll.a -l:libpython27.dll.a

在 IPython 中:

In [1]: from problem import *
In [2]: use_gsl_integration_sequential()
Out[2]: 7.2

use_gsl_integration_parallel()的结果是未定义的,我试了好几次,最多的时候会崩溃,运气好,得到了未定义的值,所以肯定有问题!我只是找不到这样一个平行的例子。有人能帮帮我吗?

环境:

win10-64bit、gcc (tdm-1) 5.1.0 32bit、python-2.7.10 32bit、cython0.24、gsl-2.1

一些有用的参考资料?:

https://github.com/twiecki/CythonGSL

https://www.gnu.org/software/gsl/manual/html_node/Thread_002dsafety.html#Thread_002dsafety

(对不起,无法发布更多信息,两个链接是我的限制......

【问题讨论】:

  • 我认为gsl_func 也需要是本地的。我也不是 100% 确定它会在这里将 result 视为本地(但我不知道它不是!)
  • 确实,让我很困惑的是在哪里“分配”这些东西(gsl_function、params、workspace)。乍一看,这似乎很容易——容易并行,但我错了。此外,omp's cython support 相当弱且隐含(违反“显式优于隐式”)。我开始怀疑这个问题在 cython 中是否可行。事实上,任何 C 代码示例都会有所帮助(尽管我不太熟悉在 C 中使用 omp。)
  • 我认为你使用了一个并行块,在那里分配它。然后在并行块内使用一个 prange。不过,我现在实际上无法测试它,所以我不能保证它是正确的。
  • @DavidW 出现了更奇怪的现象:在我原来的函数中,我添加了一个int num_threads参数,并将其传递给prange的num_threads kwarg,翻译成C时,cython会报错:optimization.c: In function '__pyx_f_12optimization_target_func0': error: '__pyx_v_num_threads' undeclared (first use in this function) #pragma omp parallel reduction(+:__pyx_v_integrate_sum) num_threads(__pyx_v_num_threads) .但我无法在上面的简单示例中重现它。这是由于您提到的嵌套块吗?我完全糊涂了,我会做更多的研究。
  • ps:如果我在函数级别添加一个局部变量,比如num_therads = num_therads,那么 cython 不会抱怨...

标签: cython gsl


【解决方案1】:

当我测试该代码时,我遇到了分段错误(linux,gcc 4.9.2)。

我认为原代码存在以下问题

  • 四个线程共享多个变量,每个线程都应该有自己的变量
  • 这些线程中使用的工作空间相同

以下版本给出 7.2。顺序版本实际上是否正确,因为它没有设置result_sum = 0.0 in

for i in range(3):
    a += da
    result_sum = 0.0
from openmp cimport omp_get_max_threads, omp_get_thread_num

def use_gsl_integration_parallel():
    cdef double result
    cdef Py_ssize_t i, j
    cdef double result_sum
    cdef double results[3]
    cdef double results2[4]
    cdef double a
    cdef double b[4]
    cdef double error[4]
    cdef double da = 0.1
    cdef size_t space_size = 1000

    cdef gsl_function gsl_func[4]
    cdef double params[4][2]
    cdef int tid

    cdef gsl_integration_workspace ** ws = <gsl_integration_workspace **>malloc(omp_get_max_threads() * sizeof(gsl_integration_workspace *))

    for j in range(omp_get_max_threads()):
        ws[j] = gsl_integration_workspace_alloc(space_size)

    for i in range(3):
        a += da

        for j in prange(4, nogil=True):
            tid = omp_get_thread_num()
            b[tid] = a + j

            params[tid][0] = a;
            params[tid][1] = b[tid]

            gsl_func[tid].function = &integrand
            gsl_func[tid].params = params[tid]
            gsl_integration_qag(&gsl_func[tid], 0.0, 1.0, 1e-8, 1e-8,
                                space_size, GSL_INTEG_GAUSS15, ws[tid],
                                &results2[j], &error[j])
            # printf("tid %d\n", tid)
        results[i] = sum(results2)

    for j in range(omp_get_max_threads()):
        gsl_integration_workspace_free(ws[j])
    free(ws)

    return mean(results)

【讨论】:

  • 酷!我被 cython prange 示例误导了,我认为它们(params,gsl_func,...)将自动设为本地私有,然后由每个线程“复制”(原谅​​这种愚蠢的想法)。我着手改造我的代码,现在我面临一个新问题:这种集成将被大量频繁地调用,因此无法在本地 malloc 和释放这些巨大的数组(结果、参数),我的想法是分配它们在其他地方一次,然后只是在集成中检查这些缓冲区,最后将它们适当地释放到某个地方。有更好的建议吗?@J.J.哈卡拉
  • 即上例中,将循环改为:for i in range(1000): ... for j in range(1000000), ...,如何正确处理这些数组(╹◡╹)
  • @oz1 我将gsl_integration_workspace_alloc 移出use_gsl_integration_parallel() 的循环,因为它们是可重用的,并且反复分配/释放它们不是一个好主意。数组的其他空间分配是堆栈分配,因此它们并不昂贵。该工作区分配可能只针对最大数量的线程openmp.omp_get_max_threads(),然后在循环内ws[openmp.omp_get_thread_num()] 而不是`ws[j]'。
  • 非常感谢,现在我明白了workspaces should be allocated on a per-thread basis,工作区是每个线程共享的!但最重要的是实践中的“其他空间分配”,results2error... 有时可能有数万长度,因此它们不能留在堆栈上,而use_gsl_integration_parallel 将在一个巨大的迭代循环中被调用——每个循环都可能被调用几十次......哦,这可能有点跑题了,那我应该重构我的代码。再次感谢!
【解决方案2】:

感谢@J.J.Hakala,我终于弄清楚了如何让这些东西发挥作用。我改造了这个例子,并做了一些简单的配置文件。我只是在这里发布它,希望有一天它对科学计算的一些新手(和我一样)有所帮助。

如有错误,请指正。谢谢!

# cython: cdivision=True
# cython: boundscheck=False
# cython: wraparound=False

from libc.stdlib cimport malloc, free
from libc.math cimport sin
from cython.parallel import prange
from cython_gsl cimport *
cimport openmp

cdef double arr_sum(double *arr, size_t arr_len):
    cdef size_t i
    cdef double sum_ = 0.0
    for i in range(arr_len):
        sum_ += arr[i]
    return sum_

cdef double arr_mean(double *arr, size_t arr_len):
    return arr_sum(arr, arr_len) / arr_len

# A random chosen integrand function for demo.
cdef double integrand(double x, void *params) nogil:
    cdef double a = (<double*>params)[0]
    cdef double b = (<double*>params)[1]
    return sin(a*x + b)

def sequential_solution(int outer_loops, int inner_loops):
    cdef double result, error
    cdef size_t i, j
    cdef double result_sum=0.0, mean_val
    cdef double *results = <double*>malloc(sizeof(double) * outer_loops)
    cdef double a=0.0, da = 0.1
    cdef size_t space_size = 1000
    cdef gsl_function gsl_func
    cdef double params[2]

    gsl_func.function = &integrand
    cdef gsl_integration_workspace *ws = gsl_integration_workspace_alloc(space_size)

    for i in range(outer_loops):
        a += da
        result_sum = 0.0
        for j in range(inner_loops):
            params[0] = a; params[1] = a + j
            gsl_func.params = params
            gsl_integration_qag(&gsl_func, 0.0, 1.0, 1e-8, 1e-8, space_size, GSL_INTEG_GAUSS15, ws, &result, &error)
            result_sum += result

        results[i] = result_sum

    mean_val = arr_mean(results, outer_loops)
    gsl_integration_workspace_free(ws)
    free(results)
    return mean_val


def parallel_solution(int outer_loops, int inner_loops, int num_threads):
    cdef size_t i, j
    cdef double a = 0.0
    cdef double da = 0.1
    cdef double mean_val = 0.0
    cdef size_t space_size = 1000

    # Heavy malloc!
    cdef double *outer_results = <double*>malloc(sizeof(double) * outer_loops)
    cdef double *inner_results = <double*>malloc(sizeof(double) * inner_loops)
    #cdef double *error = <double*>malloc(sizeof(double) * inner_loops)
    cdef double error # Ignore the integration abserror in parallel
    cdef gsl_function *gsl_func = <gsl_function*>malloc(sizeof(gsl_function) * inner_loops)
    cdef double *params = <double*>malloc(sizeof(double) * inner_loops * 2)
    cdef gsl_integration_workspace **ws = <gsl_integration_workspace**>malloc(sizeof(gsl_integration_workspace*) * num_threads)
    for i in range(num_threads):
        ws[i] = gsl_integration_workspace_alloc(space_size)

    for i in range(outer_loops):
        a += da
        for j in prange(inner_loops, nogil=True, num_threads=num_threads):
            error = 0 # local private.
            params[2*j] = a; params[2*j+1] = a+j
            gsl_func[j].function = &integrand
            gsl_func[j].params = params + 2*j

            # ws[openmp.omp_get_thread_num()] guarantees each thread shares it's own workspace,
            # since workspace is resuable in a thread. If two thread access the same workspace, data will corrupt.
            gsl_integration_qag(&gsl_func[j], 0.0, 1.0, 1e-8, 1e-8,
                                space_size, GSL_INTEG_GAUSS15, ws[openmp.omp_get_thread_num()],
                                &inner_results[j], &error)

        outer_results[i] = arr_sum(inner_results, inner_loops)

    mean_val = arr_mean(outer_results, outer_loops)

    # Now, free
    for i in range(num_threads):
        gsl_integration_workspace_free(ws[i])
    free(ws)
    free(params)
    free(gsl_func)
    #free(error)
    free(inner_results)
    free(outer_results)
    return mean_val

def parallel_solution_ver(int outer_loops, int inner_loops, int num_threads):
    cdef size_t i, j
    cdef double a = 0.0
    cdef double da = 0.1
    cdef double mean_val = 0.0
    cdef size_t space_size = 1000

    cdef double *outer_results = <double*>malloc(sizeof(double) * outer_loops)
    #cdef double *inner_results = <double*>malloc(sizeof(double) * inner_loops) 
    cdef double inner_result, inner_result_sum # no need to malloc a inner_results 
    #cdef double *error = <double*>malloc(sizeof(double) * inner_loops)
    cdef double error  # Ignore the integration abserror in parallel
    cdef gsl_function *gsl_func = <gsl_function*>malloc(sizeof(gsl_function) * inner_loops)
    cdef double *params = <double*>malloc(sizeof(double) * inner_loops * 2)
    cdef gsl_integration_workspace **ws = <gsl_integration_workspace**>malloc(sizeof(gsl_integration_workspace*) * num_threads)
    for i in range(num_threads):
        ws[i] = gsl_integration_workspace_alloc(space_size)

    for i in range(outer_loops):
        a += da
        inner_result_sum = 0.0  # reduction(+: inner_result_sum)
        for j in prange(inner_loops, nogil=True, num_threads=num_threads):
            inner_result = 0.0  # local private.
            error = 0  # local private.
            params[2*j] = a; params[2*j+1] = a+j
            gsl_func[j].function = &integrand
            gsl_func[j].params = params + 2*j

            # ws[openmp.omp_get_thread_num()] guarantees each thread shares it's own workspace,
            # since workspace is resuable in a thread. If two thread access the same workspace, data will corrupt.
            gsl_integration_qag(&gsl_func[j], 0.0, 1.0, 1e-8, 1e-8,
                                space_size, GSL_INTEG_GAUSS15, ws[openmp.omp_get_thread_num()],
                                &inner_result, &error)
            inner_result_sum += inner_result

        outer_results[i] = inner_result_sum

    mean_val = arr_mean(outer_results, outer_loops)

    # Now, free
    for i in range(num_threads):
        gsl_integration_workspace_free(ws[i])
    free(ws)
    free(params)
    free(gsl_func)
    #free(error)
    #free(inner_results)
    free(outer_results)
    return mean_val

更新:不需要 malloc 一个 results 数组,添加了一个我认为更好的更新解决方案 parallel_solution_ver,尽管它与 parallel_solution 的效率几乎相同

在 IPython 笔记本中:

%matplotlib inline
from problem import *
import matplotlib.pyplot as plt

max_exp = 20

times_seq = []
for loops in [2**n for n in range(max_exp)]:
    res = %timeit -n 1 -r 5 -o sequential_solution(10, loops)
    times_seq.append((loops, res))

times_p = []
for loops in [2**n for n in range(max_exp)]:
    res = %timeit -n 1 -r 5 -o parallel_solution(10, loops, 8)
    times_p.append((loops, res))

times_p_ver = []
for loops in [2**n for n in range(max_exp)]:
    res = %timeit -n 1 -r 5 -o parallel_solution_ver(10, loops, 8)
    times_p_ver.append((loops, res))

time_results = [times_seq, times_p, times_p_ver]
labels = ["sequential solution", "parallel solution", "parallel solution_ver"]
colors = ['r', 'y', 'g']
markers = ['s', '+', 'o']
line_params = [
    {"sequential solution": {"color": "r", "marker": "s", "ms": 6}},
    {"parallel solution": {"color": "g", "marker": "+", "ms": 20, "mew": 1}},
    {"parallel solution_ver": {"color": "b", "marker": "o", "ms": 6}}
]

for results, param in zip(time_results, line_params):
    loops = [res[0] for res in results]
    times = [res[1].best for res in results]
    plt.loglog(loops, times, label=param.keys()[0], **param.values()[0])
plt.xlabel("inner loops")
plt.ylabel("exec time (ms)")
plt.legend(loc=0)

print "loops        sequential  parallel    parallel_ver"
for s, p, p_v in zip(times_seq, times_p, times_p):
    print "n = {:6d}   {:f}s   {:f}s   {:f}s".format(s[0], s[1].best, p[1].best, p_v[1].best)

# Same result? Yes!
print sequential_solution(10, 2**19)  # 0.0616505777102
print parallel_solution(10, 2**19, 8)  # 0.0616505777102
print parallel_solution_ver(10, 2**19, 8)  # 0.0616505777102

loops        sequential  parallel    parallel_ver
n =      1   0.000009s   0.001450s   0.001450s  
n =      2   0.000016s   0.001435s   0.001435s  
n =      4   0.000032s   0.001447s   0.001447s  
n =      8   0.000063s   0.001454s   0.001454s  
n =     16   0.000125s   0.001396s   0.001396s  
n =     32   0.000251s   0.001389s   0.001389s  
n =     64   0.000505s   0.001387s   0.001387s  
n =    128   0.001016s   0.001360s   0.001360s  
n =    256   0.002039s   0.001683s   0.001683s  
n =    512   0.003846s   0.001844s   0.001844s  
n =   1024   0.007717s   0.002416s   0.002416s  
n =   2048   0.015395s   0.003575s   0.003575s  
n =   4096   0.030817s   0.006055s   0.006055s  
n =   8192   0.061689s   0.010820s   0.010820s  
n =  16384   0.123445s   0.020872s   0.020872s  
n =  32768   0.246928s   0.040165s   0.040165s  
n =  65536   0.493903s   0.079077s   0.079077s  
n = 131072   0.987832s   0.156176s   0.156176s  
n = 262144   1.975107s   0.311151s   0.311151s  
n = 524288   3.949421s   0.617181s   0.617181s     

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-09-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-06-03
    • 1970-01-01
    相关资源
    最近更新 更多