【问题标题】:How to use Values in a multiprocessing pool with Python如何使用 Python 在多处理池中使用值
【发布时间】:2015-08-15 21:42:20
【问题描述】:

我希望能够使用多处理库中的值模块来跟踪数据。据我所知,当涉及到 Python 中的多处理时,每个进程都有自己的副本,所以我无法编辑全局变量。我希望能够使用 Values 来解决这个问题。有谁知道我如何将 Values 数据传递到池化函数中?

from multiprocessing import Pool, Value
import itertools

arr = [2,6,8,7,4,2,5,6,2,4,7,8,5,2,7,4,2,5,6,2,4,7,8,5,2,9,3,2,0,1,5,7,2,8,9,3,2,]

def hello(g, data):
    data.value += 1

if __name__ == '__main__':
    data = Value('i', 0)
    func = partial(hello, data)
    p = Pool(processes=1)
    p.map(hello,itertools.izip(arr,itertools.repeat(data)))

    print data.value

这是我得到的运行时错误:

RuntimeError: Synchronized objects should only be shared between processes through inheritance

有谁知道我做错了什么?

【问题讨论】:

  • 我认为您需要将 data 变量传递给所有进程。
  • @TomDalton 我刚刚使用 itertools 更新了代码以将数据变量传递给 hello 函数,我现在收到一个错误,我不确定它为什么会发生。
  • 你为什么不从hello()返回数据呢?这就是map 的全部意义所在。

标签: python python-multiprocessing


【解决方案1】:

我不知道为什么,但是使用Pool 似乎存在一些问题,如果手动创建子进程则不会出现这些问题。例如。以下作品:

from multiprocessing import Process, Value

arr = [1,2,3,4,5,6,7,8,9]


def hello(data, g):
    with data.get_lock():
        data.value += 1
    print id(data), g, data.value

if __name__ == '__main__':
    data = Value('i')
    print id(data)

    processes =  []
    for n in arr:
        p = Process(target=hello, args=(data, n))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print "sub process tasks completed"
    print data.value

但是,如果您使用Pool 进行基本相同的思考,则会收到错误“RuntimeError:同步对象只能通过继承在进程之间共享”。我以前在使用池时看到过该错误,但从未完全了解它。

似乎可以与Pool 一起使用的Value 的替代方法是使用管理器为您提供“共享”列表:

from multiprocessing import Pool, Manager
from functools import partial


arr = [1,2,3,4,5,6,7,8,9]


def hello(data, g):
    data[0] += 1


if __name__ == '__main__':
    m = Manager()
    data = m.list([0])
    hello_data = partial(hello, data)
    p = Pool(processes=5)
    p.map(hello_data, arr)

    print data[0]

【讨论】:

  • 完美!这正是我想要的!看来我最终不得不改用 Manager 了!非常感谢汤姆!
  • Ref 共享值,这个答案 (stackoverflow.com/a/9931389/2372812) 似乎对池初始化程序做了一些解决方法以使其工作,但这似乎是一个相当糟糕的解决方案。请注意,与“真实”共享内存相比,使用管理器可能会导致 IPC 变慢。
【解决方案2】:

几乎不需要将ValuesPool.map() 一起使用。

map 的中心思想是将函数应用于列表或其他迭代器中的每个项目,将返回值收集到列表中。

Pool.map 背后的想法基本相同,但随后分散到多个进程中。在每个工作进程中,映射函数都会被迭代器中的项目调用。 来自工作进程中调用的函数的返回值被传输回父进程并收集在最终返回的列表中。


或者,您可以使用Pool.imap_unordered,它会在结果可用时立即开始返回结果,而不是等到一切都完成。因此,您可以统计返回结果的数量并使用它来更新进度条。

【讨论】:

  • 如果我想拥有例如进度条怎么办?我需要一个所有工作进程一起递增的计数器。
  • @BarafuAlbino 您可以为此使用multiprocessing.Value。请注意(来自链接的文档),您必须在增加值之前获取锁!
  • 当然。正如问题所述,除了 Value 不适用于 multiprocessing.Pool.map。
  • @BarafuAlbino 您可能应该创建Value 之前 if __name__ == "__main__",以便子进程继承它。它可能取决于操作系统。 MS-windows 有点奇怪,因为它没有fork,这使得multiprocessing 在类UNIX 系统上的实现更容易。
  • @BarafuAlbino 但imap_unordered 可能更简单。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-06-09
  • 2018-06-11
  • 2023-02-22
  • 2019-07-26
  • 1970-01-01
  • 2020-03-03
  • 1970-01-01
相关资源
最近更新 更多