【问题标题】:Subprocess completes but still doesn't terminate, causing deadlock子进程完成但仍未终止,导致死锁
【发布时间】:2011-12-22 23:41:03
【问题描述】:

好的,因为目前没有答案,所以我不会觉得这样做太糟糕了。 虽然我仍然对导致此问题的幕后实际发生的事情感兴趣,但我最紧迫的问题是更新 2 中指定的问题。那些是,

JoinableQueueManager().Queue() 之间有什么区别(什么时候应该使用其中一个?)。重要的是,在这个例子中,用一个替换另一个是否安全?


在下面的代码中,我有一个简单的进程池。每个进程都通过进程队列 (pq) 来提取要处理的数据,并通过返回值队列 (rq) 将处理的返回值传递回主线程。如果我不附加到返回值队列它可以工作,但是一旦我这样做,由于某种原因进程被阻止停止。在这两种情况下,进程run 方法都会返回,因此在返回队列阻塞时不是put,但在第二种情况下,进程本身不会终止,因此当我在进程上join 时程序会死锁。为什么会这样?

更新:

  1. 似乎与队列中的项目数量有关。

    至少在我的机器上,队列中最多可以有 6570 个项目,它确实可以工作,但除此之外,它就会死锁。

  2. 它似乎适用于 Manager().Queue()

    无论是 JoinableQueue 的限制还是只是我误解了这两个对象之间的差异,我已经发现如果我用Manager().Queue() 替换返回队列,它会按预期工作。它们之间有什么区别,什么时候应该使用一个而不是另一个?

  3. 如果我从 rq 消费,则不会发生错误

    哎呀。这里有片刻的答案,当我评论它时,它消失了。无论如何,它所说的其中一件事是质疑如果我添加消费者,这个错误是否仍然会发生。我试过了,答案是,不行。

    它提到的另一件事是来自the multiprocessing docs 的这句话可能是问题的关键。提到JoinableQueue,它说:

    ...用于计算未完成任务数量的信号量可能 最终溢出引发异常。


import multiprocessing

class _ProcSTOP:
    pass

class Proc(multiprocessing.Process):

    def __init__(self, pq, rq):
        self._pq = pq
        self._rq = rq
        super().__init__()
        print('++', self.name)

    def run(self):
        dat = self._pq.get()

        while not dat is _ProcSTOP:
#            self._rq.put(dat)        # uncomment me for deadlock
            self._pq.task_done()
            dat = self._pq.get()

        self._pq.task_done() 
        print('==', self.name)

    def __del__(self):
        print('--', self.name)

if __name__ == '__main__':

    pq = multiprocessing.JoinableQueue()
    rq = multiprocessing.JoinableQueue()
    pool = []

    for i in range(4):
        p = Proc(pq, rq) 
        p.start()
        pool.append(p)

    for i in range(10000):
        pq.put(i)

    pq.join()

    for i in range(4):
       pq.put(_ProcSTOP)

    pq.join()

    while len(pool) > 0:
        print('??', pool)
        pool.pop().join()    # hangs here (if using rq)

    print('** complete')

示例输出,不使用返回队列:

++ Proc-1
++ Proc-2
++ Proc-3
++ Proc-4
== Proc-4
== Proc-3
== Proc-1
?? [<Proc(Proc-1, started)>, <Proc(Proc-2, started)>, <Proc(Proc-3, started)>, <Proc(Proc-4, started)>]
== Proc-2
?? [<Proc(Proc-1, stopped)>, <Proc(Proc-2, started)>, <Proc(Proc-3, stopped)>]
-- Proc-3
?? [<Proc(Proc-1, stopped)>, <Proc(Proc-2, started)>]
-- Proc-2
?? [<Proc(Proc-1, stopped)>]
-- Proc-1
** complete
-- Proc-4

示例输出,使用返回队列:

++ Proc-1
++ Proc-2
++ Proc-3
++ Proc-4
== Proc-2
== Proc-4
== Proc-1
?? [<Proc(Proc-1, started)>, <Proc(Proc-2, started)>, <Proc(Proc-3, started)>, <Proc(Proc-4, started)>]
== Proc-3
# here it hangs

【问题讨论】:

  • 可能相关:bugs.python.org/issue8237
  • @J.F.塞巴斯蒂安。可能是这样,但这似乎是说它阻塞了put,我的所有run's return`在块之前和put 只发生在run 内所以我的put 不能被阻止。

标签: python debugging python-3.x deadlock multiprocessing


【解决方案1】:

来自documentation

警告

如上所述,如果子进程已将项目放入队列(并且它没有使用 JoinableQueue.cancel_join_thread()),那么该进程将不会终止,直到所有缓冲的项目都被刷新到管道中。

这意味着如果您尝试加入该进程,您可能会遇到死锁,除非您确定已放入队列的所有项目都已被消耗。同样,如果子进程是非守护进程,则父进程在尝试加入其所有非守护子进程时可能会在退出时挂起。

请注意,使用管理器创建的队列不存在此问题。请参阅编程指南。

所以 JoinableQueue() 使用管道并且会等到它可以在关闭之前刷新所有数据。

另一方面,Manager.Queue() 对象使用完全不同的方法。 管理器正在运行一个单独的进程,该进程会立即接收所有数据(并将其存储在其内存中)。

管理器提供了一种方法来创建可以在不同进程之间共享的数据。管理器对象控制管理共享对象的服务器进程。其他进程可以通过代理访问共享对象。

...

队列([最大尺寸]) 创建一个共享的 Queue.Queue 对象并为其返回一个代理。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2011-04-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-09-19
    • 2017-07-27
    • 2012-04-04
    • 2012-02-24
    相关资源
    最近更新 更多