【问题标题】:Broken pipe error with multiprocessing.Queuemultiprocessing.Queue 的管道损坏错误
【发布时间】:2016-07-21 10:46:41
【问题描述】:

在 python2.7 中,multiprocessing.Queue 在从函数内部初始化时会引发错误。我提供了一个重现问题的最小示例。

#!/usr/bin/python
# -*- coding: utf-8 -*-

import multiprocessing

def main():
    q = multiprocessing.Queue()
    for i in range(10):
        q.put(i)

if __name__ == "__main__":
    main()

抛出下面的断管错误

Traceback (most recent call last):
File "/usr/lib64/python2.7/multiprocessing/queues.py", line 268, in _feed
send(obj)
IOError: [Errno 32] Broken pipe

Process finished with exit code 0

我无法解释原因。我们不能从函数内部填充 Queue 对象肯定会很奇怪。

【问题讨论】:

    标签: python python-2.7 parallel-processing multiprocessing


    【解决方案1】:

    这里发生的是,当您调用 main() 时,它会创建 Queue,将 10 个对象放入其中并结束函数,垃圾收集其所有内部变量和对象,包括 Queue。 但是您收到此错误是因为您仍在尝试发送Queue 中的最后一个号码。

    来自文档documentation

    "当一个进程第一次将一个项目放入队列时,一个 feeder 线程是 开始将对象从缓冲区传输到管道中。”

    由于put()是在另一个线程中创建的,它不会阻塞脚本的执行,并允许在完成队列操作之前结束main()函数。

    试试这个:

    #!/usr/bin/python
    # -*- coding: utf-8 -*-
    
    import multiprocessing
    import time
    def main():
        q = multiprocessing.Queue()
        for i in range(10):
            print i
            q.put(i)
        time.sleep(0.1) # Just enough to let the Queue finish
    
    if __name__ == "__main__":
        main()
    

    应该有一种方法可以join Queue 或阻止执行,直到将对象放入Queue,您应该查看文档。

    【讨论】:

    • 精彩的答案。我将给出另一个说明,在 python3 中不会发生这种情况。
    【解决方案2】:

    按照@HarryPotFleur 的建议延迟使用time.sleep(0.1),这个问题就解决了。但是,我用 python3 测试了代码,并且在 python3 中根本没有发生断管问题。我认为这是作为错误报告的,后来得到了修复。

    【讨论】:

    • 不是真的,它根本不会发生在 python3 中。更何况time.sleep(0.1) 没有解决!只是为了理解!
    • 使用睡眠不是一个解决方案,它是一个 hacky 解决方法。这确实发生在python3中。 bugs.python.org/issue35844
    【解决方案3】:

    当您启动 Queue.put() 时,会启动隐式线程将数据传递到队列。同时,主应用程序结束,数据没有终点站(队列对象被垃圾回收)。

    我会试试这个:

    from multiprocessing import Queue
    
    def main():
        q = Queue()
        for i in range(10):
            print i
            q.put(i)
        q.close()
        q.join_thread()
    
    if __name__ == "__main__":
        main()
    

    join_thread() 确保缓冲区中的所有数据都已刷新。 close() 必须在 join_thread() 之前调用

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2014-04-28
      • 2011-02-12
      • 2015-07-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多