【问题标题】:multiprocessing.Queue : Broken pipe errormultiprocessing.Queue:管道损坏错误
【发布时间】:2017-10-24 03:15:40
【问题描述】:

注1: 我想在多进程中使用 mutiprocessing.Queue,当我发现这个问题发生在单进程情况下。所以,下面的代码是使用单一进程来简化问题。

有一个类似的问题:Broken pipe error with multiprocessing.Queue

该帖子中的答案证明此问题是因为主线程在队列线程完成其工作之前退出。他修复它的方法是在他的代码中添加sleep(0.1)

import multiprocessing
import time
def main():
    q = multiprocessing.Queue()
    for i in range(10):
        print i
        q.put(i)
    time.sleep(0.1) # Just enough to let the Queue finish

if __name__ == "__main__":
    main()

但是,我认为 sleep 对于生产代码来说不是一种稳定的方法,所以我尝试使用join 来做到这一点。您可以在下面看到我的代码,但不幸的是,它不起作用。有没有人知道如何在不睡觉的情况下做到这一点?

import multiprocessing
import time


def main():
    q = multiprocessing.Queue()
    for i in range(10):
        q.put(i)
    # time.sleep(4)
    q.close()
    q.join_thread()

if __name__ == "__main__":
    main() 

【问题讨论】:

  • 你想做什么?在一个进程中使用 multiprocessing.Queue?
  • 请查看我编辑的问题。简而言之,我想将它与多进程一起使用,但是当我提出这个问题时,我选择用最少的代码简化问题。
  • 你几乎不会在多进程中遇到这个问题。因为您将拥有一个可以阻塞主进程的消费者进程。
  • 是的,我用多进程试试,效果很好。但我仍然很好奇为什么这会发生在单个进程中。这个问题的来源(我项目中的代码)有问题,即使是多处理,我也会尝试给出另一个简化的代码。

标签: python multithreading


【解决方案1】:

让我们先描述multiprocessing.Queue的一些细节。

当一个对象被放入队列时,该对象被腌制,然后后台线程将腌制的数据刷新到底层管道

管道是通过reader, writer = socket.socketpair()创建的。

queue.close() 是为多进程设计的,它做了两件事

  1. 关闭reader(重要!)
  2. 向后台线程queue.buffer发送一个sentinel值 遇到这样的值会退出

在单进程情况下,queue.close()不起作用的原因是步骤1,如果buffer中还有数据,后台线程会继续往一个已经关闭套接字,导致Broken pipe 错误。

一个简单的例子来演示错误

import socket

reader, writer = socket.socketpair()
writer.send("1")

# queue.close() will internally call reader.close()
reader.close()

# got a Broken pipe error
writer.send("2")

在多进程情况下,在主进程中关闭reader只是减少底层套接字的引用计数(主进程和子进程共享套接字),并没有真正关闭(或关闭)套接字。

【讨论】:

    【解决方案2】:

    程序:

    import multiprocessing
    
    def main():
        q = multiprocessing.Queue()
        q.put(0)
    
    if __name__ == '__main__':
        main()
    

    输出:

    Traceback (most recent call last):
      File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/queues.py", line 251, in _feed
        send_bytes(obj)
      File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py", line 205, in send_bytes 
        self._send_bytes(m[offset:offset + size])
      File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py", line 416, in _send_bytes 
        self._send(header + buf)
      File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py", line 373, in _send
        n = write(self._handle, buf)
    BrokenPipeError: [Errno 32] Broken pipe
    

    multiprocessing.Queue 的队列线程仍将入队的项目发送到队列管道的写入端队列管道的读取端已经是@987654321 之后,将引发BrokenPipeError 异常@跟随队列的垃圾回收(队列管道的写端没有被垃圾回收,因为它也被队列线程引用)。

    我认为这是一个错误,所以我在 GitHub 上打开了pull request

    一种解决方法是确保队列被垃圾回收时队​​列线程不会发送任何已入队的项目,方法是先将所有已入队的项目出队:

    import multiprocessing
    
    def main():
        q = multiprocessing.Queue()
        q.put(0)
        q.get()
    
    if __name__ == '__main__':
        main()
    

    【讨论】:

      猜你喜欢
      • 2016-07-21
      • 1970-01-01
      • 2014-04-28
      • 2011-02-12
      • 1970-01-01
      • 2015-07-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多