【问题标题】:Python multiprocessing queue: what to do when the receiving process quits?Python多处理队列:接收进程退出时该怎么办?
【发布时间】:2012-05-15 19:32:36
【问题描述】:

基本上我有以下代码:

import multiprocessing
import time

class MyProcess(multiprocessing.Process):

    def __init__(self, ):
        multiprocessing.Process.__init__(self)
        self.queue = multiprocessing.Queue()

    def run(self):
        print "Subprocess starting!"
        time.sleep(4)
        print "Subprocess exiting!"

    def addToQueue(self):
        starttime = time.time()
        count=0
        print "Adding stuff to queue..."
        while time.time()-starttime  < 4:
            self.queue.put("string")
            count += 1
        print "Added %d objects!" % count

        #self.queue.close()


if __name__ == "__main__":
    process = MyProcess()
    process.start()
    print "Waiting for a while"
    time.sleep(2)
    process.addToQueue()
    time.sleep(1)
    print "Child process state: %d" % process.is_alive()

当主进程完成时,它不会退出。什么都没有发生,它只是阻塞。我发现退出的唯一方法是杀死它(不是 SIGTERM,SIGKILL)。

如果我使用该注释行,它会退出但会发出 IOError。

我查看了 multiprocessing.queue 的代码,它使用了在另一个线程 (threading.Thread) 中生成的 os.pipe()。我怀疑是线程在写入管道时阻塞,当使用 close() 方法时,它会引发 IOError。

所以我的问题是:有没有更清洁的方法来处理这个问题?

我的意思是,我有这种情况,其中一个队列不断被写入。当接收进程退出(干净与否)时,我应该关闭队列并在发送者进程上得到一个 IOError 吗?

编辑:过程的输出

Waiting for a while
Subprocess starting!
Adding stuff to queue...
Subprocess exiting!
Added 1822174 objects!
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed
    send(obj)
IOError: [Errno 32] Broken pipe
Child process state: 0

这部分仅在使用注释的 self.queue.close() 时出现:

Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed
    send(obj)
IOError: [Errno 32] Broken pipe

【问题讨论】:

  • 你得到什么输出?错误的回溯是什么?
  • 如果您只想将工作外包给其他进程,请考虑使用 multiprocessing.Pool。它会为您处理低级的家务。
  • 对不起,应该贴出来的!立即发布!
  • 你真的没有从你的子进程的队列中拉出任何东西,还是只是没有在示例代码中? 180 万个对象可能会堵塞管道的缓冲区,Queue 在进程之间进行通信。
  • 我制作这个示例只是为了用更简单的代码展示行为,在我的应用程序中,队列不断被消耗,尽管它可能会增长很多。

标签: python queue multiprocessing


【解决方案1】:

我正在回答我自己的问题,因为不是每个人都阅读 cmets。在 cmets 中用户 mata 的提示之后,我测试了问题中的示例代码,在将对象添加到队列的循环内添加了对 time.sleep(0.01) 的调用,因此我可以限制将添加到队列中的对象数量:

def addToQueue(self):
        starttime = time.time()
        count=0
        print "Adding stuff to queue..."
        while time.time()-starttime  < 4:
            self.queue.put("string")
            count += 1
            time.sleep(0.01)
        print "Added %d objects!" % count

因此,当对象数量较少(本例中少于 3800)时,进程会正常退出。但是当有很多对象时,进程之间的管道中似乎存在一些锁定。

但这给我带来了另一个问题:这是一个错误吗?我应该报告吗?还是这只是正常的预期行为?

非常感谢用户 mata 指出这种可能性!

【讨论】:

    猜你喜欢
    • 2018-06-20
    • 2013-11-23
    • 2015-04-15
    • 2022-11-17
    • 2022-01-12
    • 2020-03-04
    • 2015-10-17
    • 1970-01-01
    • 2023-03-13
    相关资源
    最近更新 更多