【问题标题】:Multiprocesing pool.join() hangs under some circumstancesMultiprocessing pool.join() 在某些情况下挂起
【发布时间】:2016-02-23 23:10:28
【问题描述】:

我正在尝试使用multiprocessing 在 Python 中创建一个简单的生产者/消费者模式。它有效,但它挂在poll.join()

from multiprocessing import Pool, Queue

que = Queue()


def consume():
    while True:
        element = que.get()
        if element is None:
            print('break')
            break
    print('Consumer closing')


def produce(nr):
    que.put([nr] * 1000000)
    print('Producer {} closing'.format(nr))


def main():
    p = Pool(5)
    p.apply_async(consume)
    p.map(produce, range(5))
    que.put(None)
    print('None')
    p.close()
    p.join()


if __name__ == '__main__':
    main()

示例输出:

~/Python/Examples $ ./multip_prod_cons.py 
Producer 1 closing
Producer 3 closing
Producer 0 closing
Producer 2 closing
Producer 4 closing
None
break
Consumer closing

然而,当我改变一行时,它可以完美地工作:

que.put([nr] * 100)

在运行 Python 3.4.3 或 Python 2.7.10 的 Linux 系统上可 100% 重现。我错过了什么吗?

【问题讨论】:

    标签: python multiprocessing producer-consumer pool


    【解决方案1】:

    这里有很多混乱。您正在编写的不是生产者/消费者场景,而是滥用另一种通常称为“工人池”的模式的混乱。

    工人池模式是生产者/消费者的一种应用,其中有一个生产者安排工作,许多消费者使用它。在这种模式下,Pool 的所有者最终成为生产者,而工人将成为消费者。

    在您的示例中,您有一个混合解决方案,其中一名工人最终成为消费者,而其他工人则充当某种中间件。整个设计效率很低,重复了Pool 已经提供的大部分逻辑,更重要的是,非常容易出错。您最终遭受的痛苦是Deadlock

    将对象放入multiprocessing.Queue 是一个异步操作。仅当 Queue 已满且您的 Queue 具有无限大小时,它才会阻塞。

    这意味着您的produce 函数会立即返回,因此对p.map 的调用不会像您期望的那样阻塞。相反,相关的工作进程等待实际消息通过PipeQueue 将其用作通信通道。

    接下来发生的事情是,当您输入Queue None“消息”时,您提前终止了您的消费者,该消息在您的produce 函数创建的所有列表被正确推送到Pipe 之前传递。

    拨打p.join 后您会注意到问题,但实际情况如下。

    • p.join 调用正在等待所有工作进程终止。
    • 工作进程正在等待大列表通过QueuePipe
    • 由于消费者工人早已不复存在,没有人会耗尽显然已满的Pipe

    在您将终止消息实际发送到 consume 函数之前,该问题不会显示您的列表是否足够小。

    【讨论】:

    • 非常感谢您的回答。我同意这里的大部分观点,但我很好奇。在真正的应用程序中,生产者在将结果推送到队列之前进行了大量计算密集型工作,消费者将结果存储在数据库中。您能否再解释一下“整个设计效率非常低,重复了 Pool 已经提供的大部分逻辑”?
    • Pool 已经创建了一个内部Queue,用于将任务分派给工人。 Workers 只是在内部 Queue 上等待并在安排新任务后调用给定函数的简单进程。在您的示例中,您正在使用另一个由其中一个工人控制的 Queue 将工作分派给其他 4 个。您实际上是在完成整个工作两次 :)
    • 正确的设计会让您的工人完成计算密集型工作并将结果返回给调用者,然后调用者将结果存储在数据库中。您的父进程将充当调度程序和结果管理器。如果由于 I/O 繁重而导致数据库存储操作成为问题,您可以将结果传递给线程的 Pool,从而允许您在服务的 CPU 和 I/O 方面进行扩展。
    • 谢谢!我会调查的。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-12-19
    • 2018-11-28
    • 2011-09-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多