【问题标题】:Two queues: the script doesn't exit两个队列:脚本不退出
【发布时间】:2013-12-30 13:08:47
【问题描述】:

我编写了一个脚本,它使用了 2 个队列和 3 种类型的 worker:生产者、消费者(CPU 密集型任务)、编写者(我需要按顺序写入结果)。

这是我的代码的简化版本:

from queue import Queue
from threading import Thread

def compute_single_score(data):
    #do lots of calculations
    return 0.0

def producer(out_q, data_to_compute):
    while stuff:
        data = data_to_compute.popitem()
        out_q.put(data)
    out_q.put(_sentinel)

def consumer(in_q, out_q):
    while True:
        data = in_q.get()
        if data is _sentinel:
            in_q.put(_sentinel)
            break
        out_q.put([data[0], compute_single_score(*data)])
        in_q.task_done()

def writer(in_q):
    while True:
        data = in_q.get()
        if data is _sentinel:
            in_q.put(_sentinel)
            break
        in_q.task_done()

if __name__ == '__main__':
    _sentinel = object()
    jobs_queue = Queue()
    scores_queue = Queue()

    t1 = Thread(target=producer, args=(jobs_queue, data_to_compute,))
    t2 = Thread(target=consumer, args=(jobs_queue,scores_queue,))
    t3 = Thread(target=consumer, args=(jobs_queue,scores_queue,))
    t4 = Thread(target=consumer, args=(jobs_queue,scores_queue,))
    t5 = Thread(target=consumer, args=(jobs_queue,scores_queue,))
    t6 = Thread(target=consumer, args=(jobs_queue,scores_queue,))
    t7 = Thread(target=consumer, args=(jobs_queue,scores_queue,))
    t8 = Thread(target=consumer, args=(jobs_queue,scores_queue,))
    t9 = Thread(target=writer, args=(scores_queue,))

    t1.start(); t2.start(); t3.start(); t4.start(); t5.start(); t6.start(); t7.start(); t8.start(); t9.start()

    jobs_queue.join()
    scores_queue.join()
    print('File written')

它会立即打印出“已写入文件”,而不是等待队列为空。因此,尽管执行了所有计算,但脚本不会退出。两个线程似乎保持活跃。

非常感谢您的支持。

【问题讨论】:

    标签: multithreading python-3.x queue


    【解决方案1】:

    它确实等待队列为空。但是由于将事物放入队列中发生在线程中,因此它到达.join() 行的速度比.put() 发生得更快。所以当它到达.join() 时,队列是空的。

    现在我不确定你想要实现什么,仅仅是因为生产者有一个while stuff 循环。我假设您要继续处理,直到此条件成立。特别是你必须等到t1 线程退出,即

    t1.start(); t2.start(); t3.start(); t4.start(); t5.start(); t6.start(); t7.start(); t8.start(); t9.start()
    
    t1.join() # <-- this is important
    jobs_queue.join()
    scores_queue.join()
    print('File written')
    

    否则无法同步。

    旁注 1:由于 GIL,创建 CPU 绑定线程没有意义。如果您的线程没有执行任何 IO(并且它们没有),那么它在单线程时会执行得更好。好吧,至少多个consumer 线程是没有意义的。

    旁注 2:不要使用逗号。这不是蟒蛇。而是这样做:

    threads = []
    threads.append(Thread(target=producer, args=(jobs_queue, data_to_compute,)))
    threads.append(Thread(target=writer, args=(scores_queue,)))
    for i in range(10):
        threads.append(Thread(target=consumer, args=(jobs_queue,scores_queue,)))
    
    for t in threads:
        t.start()
    
    threads[0].join()
    

    旁注 3:您应该处理队列为空的情况。 data = in_q.get() 将永远阻塞,这意味着您的脚本不会退出(除非线程被标记为 daemon)。例如,您应该这样做:

    try:
        data = in_q.get(timeout=1)
    except queue.Empty:
        # handle empty queue here, perhaps quit if t1 is not alive
        # otherwise just continue the loop
    
        if not t1.is_alive(): # <-- you have to pass t1 to the thread
            break
        else:
            continue
    

    然后在主线程的末尾加入所有线程(见旁注2):

    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print('File written')
    

    现在您甚至不必加入队列。

    【讨论】:

    • 你有什么建议来解决(旁注1)?流程?在这种情况下,我该如何修改脚本?非常感谢您的帮助,非常感谢。
    • @michele 如果您正在做一些 cpu 繁重的工作,那么我建议您首先不要使用 Python。您可以使用 C 插件,但拥有单独的 C/C++ 服务可能会更好。如果你真的必须坚持使用 Python,那么看看 multiprocessing 模块。产生更多进程是在 Python 中利用多核的唯一方法。
    • 它仍然没有退出。 :(
    【解决方案2】:

    这是我最后使用的代码(根据前面说明的要求):

    from multiprocessing import JoinableQueue
    from multiprocessing import Process
    
    def compute_single_score(data):
        #do lots of calculations
        return 0.0
    
    def producer(out_q, data_to_compute):
        while stuff:
            data = data_to_compute.popitem()
            out_q.put(data)
    
    def consumer(in_q, out_q):
        while True:
            try:
                data = in_q.get(timeout=5)
            except:
                break
            out_q.put([data[0], compute_single_score(*data)])
            in_q.task_done()
    
    def writer(in_q):
        while True:
            try:
                data = in_q.get(timeout=5)
            except:
                break
            #write
            in_q.task_done()
    
    if __name__ == '__main__':
        jobs_queue = JoinableQueue()
        scores_queue = JoinableQueue()
    
        processes = []
        processes.append(Process(target=producer, args=(jobs_queue, data_to_compute,)))
        processes.append(Process(target=writer, args=(scores_queue,)))
        for i in range(10):
            processes.append(Process(target=consumer, args=(jobs_queue,scores_queue,)))
    
        for p in processes:
            p.start()
    
        processes[1].join()
        scores_queue.join()
    
        print('File written')
    

    希望对其他人有所帮助。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-12-10
      • 1970-01-01
      相关资源
      最近更新 更多