【问题标题】:Queue vs JoinableQueue in PythonPython中的队列与JoinableQueue
【发布时间】:2015-09-22 16:17:17
【问题描述】:

在 Python 中使用多处理模块时有两种队列:

  • 队列
  • JoinableQueue.

它们有什么区别?

队列

from multiprocessing import Queue
q = Queue()
q.put(item) # Put an item on the queue
item = q.get() # Get an item from the queue

可加入队列

from multiprocessing import JoinableQueue
q = JoinableQueue()
q.task_done() # Signal task completion
q.join() # Wait for completion

【问题讨论】:

    标签: python queue multiprocessing


    【解决方案1】:

    根据文档,很难确定 Queue 实际上是空的。使用JoinableQueue,您可以通过调用q.join() 等待队列清空。如果您想分批完成工作,在每批结束时做一些离散的事情,这可能会有所帮助。

    例如,您可能一次通过队列处理 1000 个项目,然后向用户发送您已完成另一批次的推送通知。使用普通的Queue 来实现这将是一项挑战。

    它可能看起来像:

    import multiprocessing as mp
    
    BATCH_SIZE = 1000
    STOP_VALUE = 'STOP'
    
    def consume(q):
      for item in iter(q.get, STOP_VALUE):
        try:
          process(item)
        # Be very defensive about errors since they can corrupt pipes.
        except Exception as e:
          logger.error(e)
        finally:
          q.task_done()
    
    q = mp.JoinableQueue()
    with mp.Pool() as pool:
      # Pull items off queue as fast as we can whenever they're ready.
      for _ in range(mp.cpu_count()):
        pool.apply_async(consume, q)
      for i in range(0, len(URLS), BATCH_SIZE):
        # Put `BATCH_SIZE` items in queue asynchronously.
        pool.map_async(expensive_func, URLS[i:i+BATCH_SIZE], callback=q.put)
        # Wait for the queue to empty.
        q.join()
        notify_users()
      # Stop the consumers so we can exit cleanly.
      for _ in range(mp.cpu_count()):
        q.put(STOP_VALUE)
    

    注意:我实际上并没有运行此代码。如果您从队列中取出物品的速度比放入物品的速度快,那么您可能会提前完成。在这种情况下,此代码至少每 1000 个项目发送一次更新,而且可能更频繁。对于进度更新,这可能没问题。如果精确到 1000 很重要,您可以使用 mp.Value('i', 0) 并在您的 join 发布时检查它是否为 1000。

    【讨论】:

      【解决方案2】:

      JoinableQueue 有方法 join()task_done(),而 Queue 没有。


      类 multiprocessing.Queue( [maxsize] )

      返回使用管道和一些锁/信号量实现的进程共享队列。当一个进程第一次将一个项目放入队列时,会启动一个馈送线程,它将对象从缓冲区传输到管道中。

      标准库的 Queue 模块中通常的 Queue.Empty 和 Queue.Full 异常会引发超时信号。

      Queue实现了Queue.Queue的所有方法,除了task_done()和join()。


      类 multiprocessing.JoinableQueue( [maxsize] )

      JoinableQueue 是 Queue 的一个子类,是一个另外有 task_done() 和 join() 方法的队列。

      task_done()

      表示以前排队的任务已完成。由队列消费者线程使用。对于用于获取任务的每个 get(),对 task_done() 的后续调用会告诉队列该任务的处理已完成。

      如果 join() 当前处于阻塞状态,它将在处理完所有项目后恢复(意味着对于已将 put() 放入队列的每个项目都收到了 task_done() 调用)。

      如果调用次数超过队列中放置的项目数,则引发 ValueError。

      加入()

      阻塞直到队列中的所有项目都被获取并处理完毕。

      每当将项目添加到队列中时,未完成任务的计数就会增加。每当消费者线程调用 task_done() 以指示该项目已被检索并且所有工作都已完成时,计数就会下降。当未完成任务的计数降至零时,join() 会解除阻塞。


      如果您使用JoinableQueue,那么您必须为从队列中删除的每个任务调用JoinableQueue.task_done(),否则用于计算未完成任务数量的信号量最终可能会溢出,从而引发异常。

      【讨论】:

      • 你知道溢出值是什么,或者如何找到它吗?文档没有说。
      • 除了引用文档 - 这个答案带来了什么?一个例子和一些解释会很好
      • 请您添加示例代码。这将改善答案。目前它对我帮助不大。对不起。
      猜你喜欢
      • 2023-04-09
      • 2013-01-23
      • 2021-12-14
      • 2014-08-13
      • 1970-01-01
      • 2016-05-24
      • 1970-01-01
      • 2013-10-11
      • 2018-01-09
      相关资源
      最近更新 更多