【问题标题】:Python multiprocessing - How to create a function that parallelizes a for loopPython 多处理 - 如何创建一个并行化 for 循环的函数
【发布时间】:2017-01-28 06:20:54
【问题描述】:

如果您打开 Jupyter Notebook 并运行以下命令:

import multiprocessing
def f(x):
    a = 3 * x
    pool = multiprocessing.Pool(processes=1)
    global g
    def g(j):
        return a * j
    return pool.map(g, range(5))
f(1)

你会得到以下错误

Process ForkPoolWorker-1:
Traceback (most recent call last):
  File "/Users/me/anaconda3/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/Users/me/anaconda3/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/me/anaconda3/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/Users/me/anaconda3/lib/python3.5/multiprocessing/queues.py", line 345, in get
    return ForkingPickler.loads(res)
AttributeError: Can't get attribute 'g' on <module '__main__'>

我正在尝试了解这是错误还是功能。

我正在尝试让它工作,因为在我的真实案例中 f 基本上是一个易于并行化的 for 循环(每次迭代只需更改一个参数)但每次迭代都需要大量时间!我是正确地解决问题还是有更简单的方法? (注意:在整个 notebook 中,f 本身会被调用多次,参数不同)

【问题讨论】:

    标签: python-3.x multiprocessing jupyter-notebook python-multiprocessing


    【解决方案1】:

    如果您想将g 应用于更多参数,而不仅仅是pool.map 传递的迭代器元素,您可以像这样使用functools.partial

    import multiprocessing
    import functools
    
    def g(a, j):
        return a * j
    
    def f(x):
        a = 3 * x
        pool = multiprocessing.Pool(processes=1)
        g_with_a = functools.partial(g, a)
        return pool.map(g_with_a, range(5))
    
    f(1)
    

    functools.partial 所做的是获取一个函数和任意数量的参数(通过位置和关键字)并返回一个新函数,该函数的行为类似于您传递给它的函数,但只接受您没有的参数'传给partial

    partial 返回的函数可以毫无问题地被腌制 i。 e.传递给pool.map,只要你使用的是python3。

    这与 Darth Kotik 在他的回答中描述的基本相同,但您不必自己实现 Calculator 类,因为 partial 已经做了您想要的。

    【讨论】:

      【解决方案2】:

      如果您在f 之外定义g,它就可以正常工作。

      import multiprocessing
      
      def g(j):
          return 4 * j
      
      def f():
          pool = multiprocessing.Pool(processes=1)
          return pool.map(g, range(5))
      
      f()
      

      编辑: 例如,您输入问题的可调用对象看起来有点像这样:

      class Calculator():
          def __init__(self, j):
              self.j = j
      
          def __call__(self, x):
              return self.j*x
      

      你的函数f 变成这样:

      def f(j):
          calculator = Calculator(j) 
          pool = multiprocessing.Pool(processes=1)
          return pool.map(calculator, range(5))
      

      在这种情况下,它工作得很好。希望它有所帮助。

      【讨论】:

      • 但问题是g 使用在f 的主体中计算的变量值,因此它不能在外部定义!请参阅我编辑的反映这种情况的问题
      • 在这种情况下:我相信这个会有所帮助stackoverflow.com/questions/4827432/…
      • 我不明白!我什至不明白这是否意味着这是一个错误或一个功能?
      • multiprocessing.pool 使用 pickle 传递参数。模块级函数是可挑选的,但 lambdas 和本地函数不是,所以你不能传递 lambda 函数。如果您创建自定义对象,它是可腌制的,因此您可以传递它。这更像是一个错误而不是一个功能,但它是设计使然,所以你必须接受它。
      • 我明白.. 但我该怎么做呢?我对如何创建一个可以解决这个问题的函数对象一无所知
      猜你喜欢
      • 1970-01-01
      • 2018-08-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-12-10
      • 2022-01-03
      • 2021-09-17
      • 2020-12-09
      相关资源
      最近更新 更多