【问题标题】:multiprocessing.Queue as arg to pool worker aborts execution of workermultiprocessing.Queue 作为 arg 池工人中止工人的执行
【发布时间】:2017-12-24 00:14:48
【问题描述】:

我实际上很难相信我遇到了我遇到的问题,这似乎是 python 多处理模块中的一个大错误......无论如何我遇到的问题是每当我将 multiprocessing.Queue 作为参数传递给 multiprocessing.Pool 工作者时,池工作者永远不会执行其代码。即使是在 python docs 中找到的示例代码的略微修改版本的非常简单的测试中,我也能够重现此错误。

这是队列示例代码的原始版本:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])


if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()

这是我对队列示例代码的修改版本:

from multiprocessing import Queue, Pool

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Pool(1)
    p.apply_async(f,args=(q,))
    print(q.get()) # prints "[42, None, 'hello']"
    p.close()
    p.join()

我所做的只是让 p 成为大小为 1 的进程池,而不是 multiprocessing.Process 对象,结果是代码永远挂在 print 语句上,因为没有任何东西写入队列!当然,我以原始形式对此进行了测试,并且效果很好。我的操作系统是 windows 10,我的 python 版本是 3.5.x,有人知道为什么会这样吗?

更新:仍然不知道为什么此示例代码适用于 multiprocessing.Process 而不是 multiprocessing.Pool 但我找到了 work around 我很满意(Alex Martelli 的回答)。显然,您可以只创建一个 multiprocessing.Queues 的全局列表并传递每个进程和索引以供使用,我将避免使用托管队列,因为它们速度较慢。感谢客人向我展示链接。

【问题讨论】:

  • 您可能想看看#1#2#3#4#5。原因似乎是 Queue 实例不能被腌制。但是我不明白为什么这会使进程的底层队列陷入僵局。使用池大小为 2 的Ctrl+C 表明它被困在task = inqueue.get() 处,它将请求目标函数。这有点令人费解。
  • 请注意,使用异步编程您不需要手动处理结果队列 - apply_async 返回一个 AsyncResult 实例,可用于获取结果:result.get()。这使用了一个底层的结果(out-)队列,所以你只需要在你的目标函数中return。此外,如果您使用result.get() 并将Queue 实例作为参数传递给目标函数,它将引发RuntimeError。但是我很好奇为什么您的示例没有发生这种情况。
  • 查看我对您的回答的评论。我的目标不是“结果队列”,这只是一个简单的例子。我需要一个不断写入和处理的队列。

标签: python python-3.x python-multiprocessing


【解决方案1】:

问题

当您调用apply_async 时,它会返回一个AsyncResult 对象并将工作负载分配给单独的线程(另请参见this answer)。该线程遇到Queue对象不能为pickled的问题,因此请求的工作无法分发(并最终执行)。我们可以通过调用AsyncResult.get看到这一点:

r = p.apply_async(f,args=(q,))
r.get()

引发RuntimeError:

RuntimeError: Queue objects should only be shared between processes through inheritance

但是,这个RuntimeError 仅在您请求结果后才会在主线程中引发,因为它实际上发生在不同的线程中(因此需要一种传输方式)。

那么当你这样做时会发生什么

p.apply_async(f,args=(q,))

是目标函数f 永远不会被调用,因为它的一个参数(q)不能被腌制。因此q 永远不会收到一个项目并保持为空,因此在主线程中对q.get 的调用将永远阻塞。

解决方案

使用apply_async,您不必手动管理结果队列,但它们很容易以AsyncResult 对象的形式提供给您。所以你可以修改代码以简单地从目标函数返回:

from multiprocessing import Queue, Pool

def f():
    return [42, None, 'hello']

if __name__ == '__main__':
    q = Queue()
    p = Pool(1)
    result = p.apply_async(f)
    print(result.get())

【讨论】:

  • 很有趣,但是当您仅使用 multiprocessing.Process 而不是 multiprocessing.Pool 时,我看不到代码是如何工作的,它们都创建了新进程,因此不需要为两者腌制队列方法?此外,使用 AsyncResult 解决方法对我来说并不可行,因为我需要一堆工作进程不断写入队列,然后由另一个工作进程读取和处理。
  • @profPlum 对于这些问题,我想参考this answer。本质是Pool 立即启动进程,而Process 在收到Queue 实例后启动(这里不需要酸洗,进程尚未运行)。您可以使用Managers 在进程之间共享对象或使用Poolinitializerinitargs 关键字参数。
  • 啊,好吧,我不太确定为什么进程已经在运行这一事实需要酸洗,但有了关于初始化程序和 initargs 的解释和信息,我会接受你的回答
  • @profPlum 这是由于进程之间共享信息的方式。在类 Unix 系统上,他们为此目的使用套接字,并且为了通过套接字发送(Python)对象,它会被腌制。 Python 中的Process 实例在启动时会产生一个系统进程,因此在此之前,您可以像往常一样与该实例共享 Python 对象。但是为了与正在运行的进程共享,他们必须使用操作系统的方式进行进程间通信。
  • 我明白了,谢谢你更详细的解释现在更有意义了
猜你喜欢
  • 2012-04-14
  • 2012-04-10
  • 1970-01-01
  • 2023-03-25
  • 1970-01-01
  • 2012-06-19
  • 1970-01-01
  • 1970-01-01
  • 2011-07-02
相关资源
最近更新 更多