【问题标题】:Python multiprocessing queue get() timeout despite full queue尽管队列已满,但 Python 多处理队列 get() 超时
【发布时间】:2017-10-21 17:20:51
【问题描述】:

我正在使用 Python 的多处理模块来进行科学并行处理。在我的代码中,我使用了几个执行繁重工作的工作进程和一个将结果保存到磁盘的写入器进程。要写入的数据通过队列从工作进程发送到写入进程。数据本身相当简单,仅由一个包含文件名的元组和一个带有两个浮点数的列表组成。经过几个小时的处理,编写器进程通常会卡住。更准确地说是下面的代码块

while (True):
    try:
        item = queue.get(timeout=60)
        break
    except Exception as error:
        logging.info("Writer: Timeout occurred {}".format(str(error)))

永远不会退出循环,我会收到连续的“超时”消息。

我还实现了一个日志记录过程,它输出队列的状态等,即使我收到上面的超时错误消息,对 qsize() 的调用也会不断返回一个完整的队列(在我的情况下大小 = 48 )。

我已彻底检查了有关队列对象的文档,但找不到可能的解释为什么 get() 在队列满时返回超时。

有什么想法吗?

编辑:

我修改了代码以确保捕获到空队列异常:

while (True):
    try:
        item = queue.get(timeout=60)
        break
    except Empty as error:
        logging.info("Writer: Timeout occurred {}".format(str(error)))

【问题讨论】:

  • 我也有同样的问题。但是,如果队列仍然满了并且它通常可以工作,我通过在超时后重试 .get() 来回避这个问题,但这并不是真正的解决方案。你找到解决办法了吗?

标签: python multiprocessing python-multiprocessing


【解决方案1】:

在多处理队列中用作同步消息队列。您的问题似乎也是这种情况。然而,这不仅仅需要调用get() 方法。处理完每个任务后,您需要调用 task_done() 以便从队列中删除该元素。

来自文档:

Queue.task_done()

表示以前排队的任务已完成。由队列消费者线程使用。 对于用于获取任务的每个 get(),对 task_done() 的后续调用会告诉队列该任务的处理已完成。

如果 join() 当前处于阻塞状态,它将在处理完所有项目后恢复(意味着对于已将 put() 放入队列的每个项目都收到了 task_done() 调用)。

在文档中,您还可以找到正确使用线程队列的代码示例。

如果你的代码应该是这样的

while (True):
    try:
        item = queue.get(timeout=60)
        if item is None:
            break
        # call working fuction here
        queue.task_done()
    except Exception as error:
        logging.info("Writer: Timeout occurred {}".format(str(error)))

【讨论】:

  • 感谢您的回复 Tomasz,但我怀疑 task_done() 是解决问题的方法。根据文档 task_done() 仅针对我不使用的可连接队列(multiprocessing.JoinableQueue)实现。另外,我在网上找到的几乎所有示例都没有使用 task_done 方法。
  • Queue.get() 方法删除从队列中返回的元素。你可以看到区别python-queue-get-task-done-question这可能是正确的答案,但上面一段的解释是误导性的。
  • 好的,但是我使用 multiprocessing.Queue 而不是 multiprocessing.JoinableQueue 并且只有后者提供 task_done() 方法。我还认为文档清楚地解释了 task_done() 方法用于让队列知道任务何时完成,以便 join() 可以解锁。我不使用的功能。
  • 是的,我承认我通常在线程上下文而不是多处理上下文中使用它。我现在一直在寻找 cpython 源代码,以找到可能导致您遇到的问题的 queue 和 multiprocessing.queue 的确切差异。同样对于正常的Queue.get(),项目在被清除之前不会被删除。 qsize() 仅返回近似大小,不计算正在处理的元素,因此理论上它可以提供空队列,同时由于底层缓冲区已满而仍处于阻塞状态。
  • 至于使qsize() 返回最大大小并仍然阻止get() 调用的行为让我在了解队列的情况下感到困惑。根据文档,当get() 超时失败时,它应该引发Queue.Empty 异常。是你的情况,还是这个不同的错误? (错误在标准队列模块中定义,而不是多处理)
【解决方案2】:

切换到基于管理器的队列应该有助于解决此问题。

manager = Manager()
queue   = manager.Queue()

有关更多详细信息,您可以在此处查看多处理文档:https://docs.python.org/2/library/multiprocessing.html

【讨论】:

    【解决方案3】:

    您发现了一个过于笼统的Exception 并假设它是一个超时错误。

    尝试修改逻辑如下:

    from Queue import Empty
    
    while (True):
        try:
            item = queue.get(timeout=60)
            break
        except Empty as error:
            logging.info("Writer: Timeout occurred {}".format(str(error)))
            print(queue.qsize())
    

    并查看日志是否仍然打印。

    【讨论】:

    • 好的,我承认我不能确定它是超时错误。我现在将重新运行它,但 Queue.Empty 错误除外。但是,如果我查看我的日志文件,我会每 60 秒收到“写入器:发生超时..”消息,这符合指定的超时期限。
    • noxdafox,我相应地修改了代码。我肯定会抛出一个空队列错误。它每 60 秒发生一次,同时记录器进程指示已满队列。
    • 我稍微修改了代码,尝试在异常之后添加该行并验证队列中确实有元素。还有一件事:队列中的那些元素有多大? KB?兆?千兆?
    • 队列中元素的大小实际上很小。它们具有以下结构:(filename, [float, float]),其中文件名通常不超过 20 个字符。我总共处理了大约 170000 个项目,队列通常在 50000 到 80000 个元素被发送到输出队列后卡住。好的,我将添加该行,尽管我的记录器进程不断监控队列大小并声称它仍然是满的。
    • 刚刚获得了最新运行的日志。我同时得到一个“写入器:发生超时”和一个“写入器:队列大小 48”。不知道该怎么做,但我认为这是 Queue 实现内部的一个问题,尽管我认为这有点奇怪,以前没有人遇到过这个问题。
    猜你喜欢
    • 2015-10-11
    • 1970-01-01
    • 2019-12-25
    • 1970-01-01
    • 2021-04-29
    • 2012-01-17
    • 2011-06-15
    • 2018-06-27
    • 2013-06-18
    相关资源
    最近更新 更多