【问题标题】:Parallel processing with Pool in Python在 Python 中使用 Pool 进行并行处理
【发布时间】:2019-05-21 19:54:06
【问题描述】:

我尝试在本地定义的函数上运行并行处理,如下所示:

import multiprocessing as mp                                                                                               
import numpy as np
import pdb


def testFunction():                                                                                                        
  x = np.asarray( range(1,10) )
  y = np.asarray( range(1,10) )

  def myFunc( i ):
    return np.sum(x[0:i]) * y[i]

  p = mp.Pool( mp.cpu_count() )
  out = p.map( myFunc, range(0,x.size) )
  print( out )


if __name__ == '__main__':
  print( 'I got here' )                                                                                                         
  testFunction()

当我这样做时,我收到以下错误:

cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

如何使用多处理来并行处理多个数组,就像我在这里尝试做的那样? x 和 y 必须在函数内部定义;我宁愿不让它们成为全局变量。

感谢所有帮助。

【问题讨论】:

  • 我认为您对多处理的工作方式存在误解 - map 调用的函数将在没有函数本地数据概念的单独进程中执行。您必须将要处理的数据传递给将要处理它的函数,或者在 map 参数中显式传递数据,或者通过例如传递数据。排队。
  • @barny 如何传递数据?请注意,我不需要更改 x 或 y;我只需要使用它们。
  • 您是否阅读过文档,例如docs.python.org/3/library/multiprocessing.html - 你可以试试共享内存,也许吧?否则,您必须将要操作的数据显式发送到每个进程。
  • @barny 我已阅读文档,但我正在努力解决。如何使用共享内存?如何将数据显式发送到每个进程?

标签: python parallel-processing python-multiprocessing


【解决方案1】:

只需将处理函数设为全局并传递数组值对,而不是通过函数中的索引来引用它们:

import multiprocessing as mp

import numpy as np


def process(inputs):
    x, y = inputs

    return x * y


def main():
    x = np.asarray(range(10))
    y = np.asarray(range(10))

    with mp.Pool(mp.cpu_count()) as pool:
        out = pool.map(process, zip(x, y))

    print(out)


if __name__ == '__main__':
    main()

输出:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

更新:根据提供的新细节,您必须在不同进程之间共享数组。这正是multiprocessing.Manager 的用途。

Manager() 返回的管理器对象控制一个服务器进程,该进程 保存 Python 对象并允许其他进程操作它们 使用代理。

因此生成的代码将如下所示:

from functools import partial
import multiprocessing as mp

import numpy as np


def process(i, x, y):
    return np.sum(x[:i]) * y[i]


def main():
    manager = mp.Manager()

    x = manager.Array('i', range(10))
    y = manager.Array('i', range(10))

    func = partial(process, x=x, y=y)

    with mp.Pool(mp.cpu_count()) as pool:
        out = pool.map(func, range(len(x)))

    print(out)


if __name__ == '__main__':
    main()

输出:

[0, 0, 2, 9, 24, 50, 90, 147, 224, 324]

【讨论】:

  • 如果你有多个参数,我更喜欢pool.starmap
  • 感谢您的帮助!我稍微改变了这个问题,因为我需要做的不仅仅是迭代 x。如何调整解决方案来解决它?
  • @user24205 我已经更新了答案,现在不同的进程共享相同的数据数组。
  • @Rightleg 再次感谢您!最后一个问题,如果共享变量不是数组而是更复杂的东西怎么办?就我而言,我试图分享 scipy 的 Voronoi 函数的输出(这很复杂;我不确定它到底是什么)。
  • @user24205,如果你不知道输出是什么,那你打算怎么用呢? :) 无论如何,我建议在工作进程之间共享数据之前将数据转换为数组,在将数据提供给工作进程之前使用某种预处理。
猜你喜欢
  • 2022-08-24
  • 2020-05-16
  • 1970-01-01
  • 2019-01-07
  • 1970-01-01
  • 1970-01-01
  • 2014-01-17
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多