【问题标题】:In Python, how can I wait until all items in multiple Queues are processed?在 Python 中,如何等到处理多个队列中的所有项目?
【发布时间】:2014-09-16 20:23:55
【问题描述】:

在下面的代码中,我有两个队列用于运行不同类型的线程。这些线程递归地添加到彼此的队列中(队列 1 获取一些信息,队列 2 处理它并将更多信息添加到队列 1)。

我想等到两个队列中的所有项目都处理完毕。目前我正在使用此代码

queue.join()
out_queue.join()

问题是,当第一个队列暂时没有事情可做时,它会关闭,因此它永远不会看到队列 2(out_queue)在那之后添加到它的内容。

我添加了 time.sleep() 函数,这是一个非常糟糕的修复,到 30 年代,两个队列都已填满,不会用完。

解决此问题的标准 Python 方法是什么?我是否必须只有一个队列,并在其中标记项目应由哪个线程处理?

queue = Queue.Queue()
out_queue = Queue.Queue()

class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, queue, out_queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue

    def run(self):
        while True:
            row = self.queue.get()
            
            request = urllib2.Request(row[0], None, req_headers)
            
            # ... some processing ...
            
            self.out_queue.put([row, http_status, page])
            
            self.queue.task_done()

class DatamineThread(threading.Thread):
    def __init__(self, out_queue, mysql):
        threading.Thread.__init__(self)
        self.out_queue = out_queue
        self.mysql = mysql

    def run(self):
        while True:
            row = self.out_queue.get()
            
            # ... some processing ...

            queue.put(newrow)
                        
            self.out_queue.task_done()

queue = Queue.Queue()
out_queue = Queue.Queue()

for i in range(URL_THREAD_COUNT):
        t = ThreadUrl(queue, out_queue)
        t.setDaemon(True)
        t.start()

#populate queue with data
for row in rows:
    queue.put(row)

#MySQL Connector
mysql = MySQLConn(host='localhost', user='root', passwd = None, db='db')

#spawn DatamineThread, if you have multiple, make sure each one has it's own mysql connector
dt = DatamineThread(out_queue, mysql)
dt.setDaemon(True)
dt.start()

time.sleep(30)

#wait on the queue until everything has been processed
queue.join()
out_queue.join()

【问题讨论】:

    标签: python multithreading queue


    【解决方案1】:

    更改工作人员,以便他们需要一个 sentinel 值才能退出,而不是在队列中没有更多工作时退出。在以下代码中,howdy 工作人员从输入队列中读取项目。如果值是哨兵(None,但它可以是任何值),worker 退出。

    因此,您不需要弄乱超时,您发现这可能相当狡猾。另一个后果是,如果您有 N 个线程,则必须将 N 个标记附加到输入队列以杀死您的工人。否则你会遇到一个永远等待的工人。一个僵尸工人,如果你愿意的话。

    来源

    import threading, Queue
    
    def howdy(q):
        for msg in iter(q.get, None):
            print 'howdy,',msg
    
    inq = Queue.Queue()
    for word in 'whiskey syrup bitters'.split():
        inq.put(word)
    inq.put( None )        # tell worker to exit
    
    thread = threading.Thread(target=howdy, args=[inq])
    thread.start()
    thread.join()
    

    输出

    howdy, whiskey
    howdy, syrup
    howdy, bitters
    

    【讨论】:

    • 我如何用 2 个线程来实现这个,每个线程都提供彼此的队列?
    • 替代方案:不是附加 N 个标记,而是让每个退出线程附加另一个标记。
    • 虽然我认为您误解了这个问题,但您的解决方案仅在知道所有工作完成时才有效(两个队列都没有待处理的项目)并且 OP 不知道如何等待那个。
    【解决方案2】:

    假设两个队列分别命名为queue_1queue_2

    • 正确的解决方案:分别跟踪待处理作品的总数 (with a lock), 然后等到值为零(使用条件变量)。

    • 正确的解决方案,但不推荐:使用未记录的 API/内部方法...

      while True:
          with queue_1.mutex, queue_2.mutex:
              if queue_1.unfinished_tasks==0 and queue_2.unfinished_tasks==0:
                  break
          queue_1.join()
          queue_2.join()
      
    • 不正确的解决方案:

      while not (queue_1.empty() and queue_2.empty()):
          queue_1.join()
          queue_2.join()
      

      这是不正确的,因为在queue_2.join 和下一个while 之后检查;并且可能两个队列中都没有项目但任务尚未完成(正在处理一个元素)

      例如,在下面的代码中:

      #!/bin/python
      from threading import Thread
      from queue import Queue
      import time
      
      
      queue_1 = Queue()
      queue_2 = Queue()
      
      def debug(): print(queue_1.qsize(), queue_2.qsize())
      def run_debug():
          while True:
              time.sleep(0.2)
              debug()
      Thread(target=run_debug).start()
      
      def run_1():
          while True:
              value=queue_1.get()
              print("get value", value)
              time.sleep(1)
              if value:
                  print("put value", value-1)
                  queue_2.put(value-1)
              time.sleep(0.5)
              queue_1.task_done()
      
      
      def run_2():
          while True:
              value=queue_2.get()
              print("get value", value)
              time.sleep(1)
              if value:
                  print("put value", value-1)
                  queue_1.put(value-1)
              time.sleep(0.5)
              queue_2.task_done()
      
      
      thread_1 = Thread(target=run_1)
      thread_2 = Thread(target=run_2)
      thread_1.start()
      thread_2.start()
      
      queue_1.put(3)
      
      # wait for both queues
      while not (queue_1.empty() and queue_2.empty()):
          queue_1.join()
          queue_2.join()
      
      print("done")
      # (add code to stop the threads properly)
      

      输出是

      get value 3
      get value 2
      get value 1
      done
      get value 0
      

    【讨论】:

      【解决方案3】:

      我最近尝试做这样的事情并想出了这个。我检查每个队列的大小并继续前进,直到它们全部为空。

      inqueue = True
      while inqueue:  
        time.sleep(5)
        q1 = queue.qsize()
        q2 = out_queue.qsize()
        print("queue:%d,out_queue:%d"% (q1,q2))
        inqueue = q1 or q2
      
      queue.join()
      out_queue.join()
      

      【讨论】:

      • 这个解决方案是错误的,因为队列中可能有一个元素正在被处理(那么两个队列仍然是空的);此外,它可能会比必要的时间多睡 5 秒。
      猜你喜欢
      • 2015-05-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多