【问题标题】:Python multiprocessing Queues reliability, Queue vs SimpleQueue vs JoinableQueuePython 多处理队列可靠性,Queue vs SimpleQueue vs JoinableQueue
【发布时间】:2013-01-23 03:34:58
【问题描述】:

直接来自 Python docs:

类 multiprocessing.Queue([maxsize])

...

qsize() 返回队列的大致大小。由于多线程/多处理语义,这个数字是不可靠的。

空() 如果队列为空,则返回 True,否则返回 False。由于多线程/多处理语义,这是不可靠的。

我凭经验发现这对于Queue 来说是完全正确的,尤其是对于empty()

在我的代码中,我有一堆进程(每个进程都是同一个主进程的子进程),每个进程的 run 方法中都有以下内容:

while self.active:
    if(self.exclusive_queue.empty() and self.exclusive_queue.qsize() == 0):
        try:
            self.exclusive_queue.put(self.general_queue.get(timeout=self.queue_timeout))
        except Queue.Empty as empty_queue:
            continue
    else:
        task = self.exclusive_queue.get()
        self.compute(task)

基本上,该进程等待general_queue 工作,但首先检查它的exclusive_queue。主进程可以将任务放入进程的通用队列或独占队列中。现在,在if(self.exclusive_queue.empty() and self.exclusive_queue.qsize() == 0) 中,我首先使用了self.exclusive_queue.empty(),这导致了相当奇怪的行为(qsize() 30+ 和empty() = True)。

所以我要说的是 - 文档中的 multiprocessing.queues.SimpleQueue 是这样写的:

空() 如果队列为空,则返回 True,否则返回 False。

根本没有提到可靠性。 SimpleQueue.empty() 可靠吗?

第二个是multiprocessing.JoinableQueue 可靠还是比Queue“更”可靠,因为task_done() 机制?

这种方法可以被认为是正确的,还是带有回调的方法(通过子级之间的共享管道端点)更合适?

【问题讨论】:

  • 在我的情况下 SimpleQueue 只能存储太少数量的元素 [大约 360 (int, str, int)-tuples)]。出于这个原因,我决定在队列中使用STOP 元素(如Midnighters 答案中所述)。对于队列,即使在相同结构的 100 000 个元素之后,我也没有达到大小限制。

标签: python process multiprocessing


【解决方案1】:

不是一个直接的答案,但我已经开始越来越依赖于使用保护条件迭代输入队列。多处理模块的文档中有一个例子:

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

因此,当您对队列的输入完成后,您只需将 put 与您启动进程时一样多的 STOP 字符串或您选择的任何保护器添加到队列中。

【讨论】:

  • 是的,我几乎在我的服务器中使用相同的方法。如果compute 得到None,它会将active 更改为False。但是,正如您在我的情况下看到的那样,我有两个队列要等待 - “独占”和“通用”队列,其想法是“如果独占为空,则从通用获取”,因为我希望能够停止即使常规队列中有条目,我也会将Nones 放在独占队列中。
猜你喜欢
  • 2011-09-14
  • 2014-08-13
  • 1970-01-01
  • 1970-01-01
  • 2015-09-22
  • 2017-01-02
  • 1970-01-01
  • 2011-01-15
  • 2020-09-25
相关资源
最近更新 更多