【问题标题】:Sharing a result queue among several processes在多个进程之间共享结果队列
【发布时间】:2012-04-12 02:53:11
【问题描述】:

multiprocessing 模块的文档展示了如何将队列传递给以multiprocessing.Process 启动的进程。但是如何与以apply_async 开头的异步工作进程共享队列?我不需要动态加入或其他任何东西,只是让工作人员(反复)将他们的结果报告回基地的一种方式。

import multiprocessing
def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    q = multiprocessing.Queue()
    workers = pool.apply_async(worker, (33, q))

这失败了: RuntimeError: Queue objects should only be shared between processes through inheritance。 我理解这意味着什么,并且我理解继承而不是要求酸洗/解酸(以及所有特殊的 Windows 限制)的建议。但是如何我如何以一种有效的方式传递队列?我找不到一个例子,我尝试了几种以各种方式失败的替代方案。请帮忙?

【问题讨论】:

    标签: python parallel-processing queue multiprocessing python-multiprocessing


    【解决方案1】:

    multiprocessing.Pool 已经有一个共享的结果队列,不需要额外涉及一个Manager.QueueManager.Queue 是底层的 queue.Queue(多线程队列),位于单独的服务器进程上并通过代理公开。与 Pool 的内部队列相比,这增加了额外的开销。与依赖 Pool 的原生结果处理相反,Manager.Queue 中的结果也不能保证是有序的。

    工作进程不是.apply_async() 启动的,这在您实例化Pool 时已经发生。什么开始的 当你打电话给pool.apply_async() 是一个新的“工作”。 Pool 的工作进程在后台运行multiprocessing.pool.worker-function。该函数负责处理通过池的内部Pool._inqueue 传输的新“任务”,并通过Pool._outqueue 将结果发送回父级。您指定的func 将在multiprocessing.pool.worker 内执行。 func 只需 return 一些东西,结果将自动发送回父级。

    .apply_async() 立即(异步)返回一个AsyncResult 对象(ApplyResult 的别名)。您需要在该对象上调用.get()(正在阻塞)以接收实际结果。另一种选择是注册一个callback 函数,该函数会在结果就绪后立即触发。

    from multiprocessing import Pool
    
    def busy_foo(i):
        """Dummy function simulating cpu-bound work."""
        for _ in range(int(10e6)):  # do stuff
            pass
        return i
    
    if __name__ == '__main__':
    
        with Pool(4) as pool:
            print(pool._outqueue)  # DEMO
            results = [pool.apply_async(busy_foo, (i,)) for i in range(10)]
            # `.apply_async()` immediately returns AsyncResult (ApplyResult) object
            print(results[0])  # DEMO
            results = [res.get() for res in results]
            print(f'result: {results}')       
    

    示例输出:

    <multiprocessing.queues.SimpleQueue object at 0x7fa124fd67f0>
    <multiprocessing.pool.ApplyResult object at 0x7fa12586da20>
    result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    

    注意:为.get() 指定timeout 参数不会停止工作进程中任务的实际处理,它只会通过引发multiprocessing.TimeoutError 来解除等待的父进程。

    【讨论】:

    • 有趣,有机会我会尝试一下。 2012 年肯定不是这样。
    • @alexis Python 2.7 (2010) 在这里只缺少上下文管理器和error_callback 参数apply_async,所以它没有太大变化。
    • 我发现回调函数是最有用的,尤其是当与部分函数结合使用时,允许使用常规列表收集异步结果,如此处所述; gist.github.com/Glench/5789879
    【解决方案2】:

    尝试使用multiprocessing.Manager 管理您的队列并使其可供不同的工作人员访问。

    import multiprocessing
    def worker(name, que):
        que.put("%d is done" % name)
    
    if __name__ == '__main__':
        pool = multiprocessing.Pool(processes=3)
        m = multiprocessing.Manager()
        q = m.Queue()
        workers = pool.apply_async(worker, (33, q))
    

    【讨论】:

    • 成功了,谢谢!我的原始代码中的异步调用存在不相关的问题,因此我也将修复复制到您的答案中。
    • 任何解释为什么queue.Queue()不适合这个?
    • @mrgloom: queue.Queue 是为线程而构建的,使用内存锁。在多进程环境中,每个子进程都会在自己的内存空间中获得自己的 queue.Queue() 实例副本,因为子进程(大多数情况下)不共享内​​存。
    • @alexis 多个worker插入数据后如何从Manager().Queue()中获取元素?
    猜你喜欢
    • 1970-01-01
    • 2018-06-25
    • 2012-07-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-01-22
    • 1970-01-01
    • 2021-06-15
    相关资源
    最近更新 更多