【问题标题】:Passing multiple parameters to pool.map() function in Python [duplicate]在Python中将多个参数传递给pool.map()函数[重复]
【发布时间】:2014-10-22 14:49:55
【问题描述】:

我需要一些方法来使用 pool.map() 中接受多个参数的函数。根据我的理解, pool.map() 的目标函数只能有一个可迭代的参数,但有没有办法可以传递其他参数?在这种情况下,我需要传入一些配置变量,例如我的 Lock() 和日志信息到目标函数。

我试图做一些研究,我认为我可以使用部分函数来让它工作?但是我不完全理解这些是如何工作的。任何帮助将不胜感激!这是我想做的一个简单示例:

def target(items, lock):
    for item in items:
        # Do cool stuff
        if (... some condition here ...):
            lock.acquire()
            # Write to stdout or logfile, etc.
            lock.release()

def main():
    iterable = [1, 2, 3, 4, 5]
    pool = multiprocessing.Pool()
    pool.map(target(PASS PARAMS HERE), iterable)
    pool.close()
    pool.join()

【问题讨论】:

  • 在这里讨论过:stackoverflow.com/questions/5442910/…(我已经成功使用了 J.F. Sebastien 的“star”方法)
  • 请在使用多处理时使用 try/finally 子句,并在 finally 中使用 close() 和 join() 以确保在发生错误时关闭进程。 stackoverflow.com/questions/30506489/…
  • @zeehio 不应该是自动的吗?
  • @endolith 应该,但通常不是。如果主 python 程序结束,它的所有子程序都被杀死/回收,但如果主程序继续运行(例如,因为并行化组件是整个程序的一小部分),你将需要一些东西(例如 try/finally)来确保所有进程被终止。
  • 不要使用 close(),在上下文中使用池:使用 multiProc.Pool(3) 作为 my_pool:

标签: python multiprocessing pool map-function


【解决方案1】:

您可以为此使用functools.partial(正如您所怀疑的那样):

from functools import partial

def target(lock, iterable_item):
    for item in iterable_item:
        # Do cool stuff
        if (... some condition here ...):
            lock.acquire()
            # Write to stdout or logfile, etc.
            lock.release()

def main():
    iterable = [1, 2, 3, 4, 5]
    pool = multiprocessing.Pool()
    l = multiprocessing.Lock()
    func = partial(target, l)
    pool.map(func, iterable)
    pool.close()
    pool.join()

例子:

def f(a, b, c):
    print("{} {} {}".format(a, b, c))

def main():
    iterable = [1, 2, 3, 4, 5]
    pool = multiprocessing.Pool()
    a = "hi"
    b = "there"
    func = partial(f, a, b)
    pool.map(func, iterable)
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()

输出:

hi there 1
hi there 2
hi there 3
hi there 4
hi there 5

【讨论】:

  • 太棒了,我想我所需要的只是一个像这样的清晰示例。非常感谢!
  • 很好的例子。不过有一个问题:为什么target 的定义中有for item in items:
  • @Jean-FrancoisT。复制/粘贴错误!感谢您指出。
  • 锁有问题的,请查看stackoverflow.com/questions/25557686/…
  • 如果变量在第一位怎么办?比如test(input, p1, p2, p3=None),我有p1, p2, p3固定,input变化?
【解决方案2】:

您可以使用允许多个参数的 map 函数,就像在 pathos 中找到的 multiprocessing 的分支一样。

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> 
>>> def add_and_subtract(x,y):
...   return x+y, x-y
... 
>>> res = Pool().map(add_and_subtract, range(0,20,2), range(-5,5,1))
>>> res
[(-5, 5), (-2, 6), (1, 7), (4, 8), (7, 9), (10, 10), (13, 11), (16, 12), (19, 13), (22, 14)]
>>> Pool().map(add_and_subtract, *zip(*res))
[(0, -10), (4, -8), (8, -6), (12, -4), (16, -2), (20, 0), (24, 2), (28, 4), (32, 6), (36, 8)]

pathos 使您能够轻松嵌套具有多个输入的分层并行映射,因此我们可以扩展我们的示例来演示这一点。

>>> from pathos.multiprocessing import ThreadingPool as TPool
>>> 
>>> res = TPool().amap(add_and_subtract, *zip(*Pool().map(add_and_subtract, range(0,20,2), range(-5,5,1))))
>>> res.get()
[(0, -10), (4, -8), (8, -6), (12, -4), (16, -2), (20, 0), (24, 2), (28, 4), (32, 6), (36, 8)]

更有趣的是,构建一个可以传递到池中的嵌套函数。 这是可能的,因为pathos 使用了dill,它可以序列化python 中的几乎任何东西。

>>> def build_fun_things(f, g):
...   def do_fun_things(x, y):
...     return f(x,y), g(x,y)
...   return do_fun_things
... 
>>> def add(x,y):
...   return x+y
... 
>>> def sub(x,y):
...   return x-y
... 
>>> neato = build_fun_things(add, sub)
>>> 
>>> res = TPool().imap(neato, *zip(*Pool().map(neato, range(0,20,2), range(-5,5,1))))
>>> list(res)
[(0, -10), (4, -8), (8, -6), (12, -4), (16, -2), (20, 0), (24, 2), (28, 4), (32, 6), (36, 8)]

但是,如果您无法脱离标准库,则必须以另一种方式进行。在这种情况下,您最好的选择是使用multiprocessing.starmap,如下所示:Python multiprocessing pool.map for multiple arguments(@Roberto 在 OP 帖子的 cmets 中指出)

在此处获取pathoshttps://github.com/uqfoundation

【讨论】:

    【解决方案3】:

    如果您无权访问functools.partial,您也可以为此使用包装函数。

    def target(lock):
        def wrapped_func(items):
            for item in items:
                # Do cool stuff
                if (... some condition here ...):
                    lock.acquire()
                    # Write to stdout or logfile, etc.
                    lock.release()
        return wrapped_func
    
    def main():
        iterable = [1, 2, 3, 4, 5]
        pool = multiprocessing.Pool()
        lck = multiprocessing.Lock()
        pool.map(target(lck), iterable)
        pool.close()
        pool.join()
    

    这使得target() 成为一个接受锁(或您想要提供的任何参数)的函数,并且它将返回一个只接受可迭代作为输入的函数,但仍然可以使用您的所有其他参数。这就是最终传递给pool.map() 的内容,然后应该可以毫无问题地执行。

    【讨论】:

    • 我在这方面已经很晚了,但是这段代码不起作用,因为嵌套函数不能被腌制。调用target(lck) 返回嵌套的wrapped_func 函数,需要腌制才能传递给工作进程,并且总是会失败。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-07-05
    • 1970-01-01
    • 2018-06-29
    • 2015-06-06
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多