【问题标题】:Strange Queue.PriorityQueue behaviour with multiprocessing in Python 2.7.6Python 2.7.6 中多处理的奇怪 Queue.PriorityQueue 行为
【发布时间】:2014-10-09 01:54:24
【问题描述】:

正如您从标题中知道的那样,我正在尝试将 PriorityQueue 与多处理一起使用。更准确地说,我想制作共享的 PriorityQueue,写了一些代码,但它没有按我的预期运行。

看代码:

import time
from multiprocessing import Process, Lock
from Queue import PriorityQueue


def worker(queue):
    lock = Lock()
    with lock:
        for i in range(100):
            queue.put(i)

    print "worker", queue.qsize()


pr_queue = PriorityQueue()
worker_process = Process(target = worker, args = (pr_queue,))
worker_process.start()

time.sleep(5)    # nope, race condition, you shall not pass (probably)
print "main", pr_queue.qsize()

得到以下输出:

worker 100
main 0

发生了什么以及如何以正确的方式做我想做的事? 谢谢。

【问题讨论】:

  • multiprocessing 只能传输pickle-able 对象,而PriorityQueue 不能。在此处查看 unutbu 和 rockportrocker 的答案:stackoverflow.com/questions/8804830/…
  • @jojo 在 Unix 上,PriorityQueue 在这种情况下不会被腌制,它通过fork 继承。但它仍然不起作用,因为PriorityQueue 并不意味着在进程之间共享。 worker 和 parent 最终会得到对象的不同副本,因此一个进程更改它不会影响另一个进程。

标签: python queue multiprocessing priority-queue


【解决方案1】:

问题不在于它在这种情况下不可腌制 - 如果您使用的是类 Unix 平台,则可以将队列传递给子进程而不进行腌制。 (不过,在 Windows 上,我认为您会在这里遇到酸洗错误)。根本问题是您没有使用进程安全队列。唯一可以在进程之间使用的队列是the Queue objects,它位于multiprocessing 模块中。不幸的是,没有可用的PriorityQueue 实现。但是,您可以通过使用 multiprocessing.Manager 类注册 PriorityQueue 轻松创建一个,如下所示:

import time
from multiprocessing import Process
from multiprocessing.managers import SyncManager
from Queue import PriorityQueue


class MyManager(SyncManager):
    pass
MyManager.register("PriorityQueue", PriorityQueue)  # Register a shared PriorityQueue

def Manager():
    m = MyManager()
    m.start()
    return m

def worker(queue):
    print(queue)
    for i in range(100):
        queue.put(i)
    print "worker", queue.qsize()


m = Manager()
pr_queue = m.PriorityQueue()  # This is process-safe
worker_process = Process(target = worker, args = (pr_queue,))
worker_process.start()

time.sleep(5)    # nope, race condition, you shall not pass (probably)
print "main", pr_queue.qsize()

输出:

worker 100
main 100

请注意,如果它是标准的multiprocessing.Queue 子类,它的性能可能不会那么好。基于ManagerPriorityQueue 是通过创建一个实际上包含常规PriorityQueueManager 服务器进程来实现的,然后为您的主进程和工作进程提供Proxy 对象,这些对象使用IPC 读取/写入在服务器进程中排队。常规的multiprocessing.Queues 只是向Pipe 写入/读取数据。如果这是一个问题,您可以尝试通过从multiprocessing.Queue 继承或委托来实现您自己的multiprocessing.PriorityQueue。不过,这可能不值得。

【讨论】:

  • 非常感谢。最后,我实现了自己的“优先队列”类,其中包含一些 multiprocessing.Queue。一种分层的优先事项。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-06-30
  • 1970-01-01
  • 2021-06-22
  • 1970-01-01
  • 1970-01-01
  • 2016-08-10
相关资源
最近更新 更多