【问题标题】:How to implement a single queue with multiple consumers如何实现具有多个消费者的单个队列
【发布时间】:2022-01-14 15:34:01
【问题描述】:

我有一个列表list_of_params 和一个函数run_train(),它接收来自list_of_params 的项目(例如run_train(list_of_params[0]))。我可以一次将run_train() 发送到多个 GPU。所以我想知道是否有任何可以并行化的单个队列的实现。

如果这还不够清楚,请想象以下场景:

“超市有一个顾客队列,但有5个收银员。一旦一个收银员空闲,它就会处理队列中下一个顾客的产品。这与每个收银员都有自己的线路相反。”

如果需要,我可以提供更多详细信息。

谢谢!

【问题讨论】:

    标签: python parallel-processing multiprocessing queue


    【解决方案1】:

    试试queue 模块。 '线程安全的多生产者、多消费者队列'。

    【讨论】:

    • 我不理解“线程安全的多生产者、多消费者队列”。这是一种设计模式吗?
    • 我的错,“线程安全的多生产者、多消费者队列”是指为您完成锁定和同步,并通过简单的 API(基本上是文档的第一段)公开。将它与生产者 - 消费者流程一起使用,或者您想要的任何方式。您可以使用任务数据预填充队列,然后让线程一一挑选任务。 IDK 您正在使用什么框架,以及它如何将任务分派到 GPU,也许它内部有某种队列,您可以使用。 glfh
    • 如果你使用多处理,那么你需要使用multiprocessing.Queue
    【解决方案2】:

    下面是一个使用带有几个生产者和消费者进程的队列的示例:

    from multiprocessing import Process, Queue, Event
    #if you use threading instead of multiprocessing, use queue.Queue rather than multiprocessing.Queue
    from queue import Empty
    from time import sleep
    from random import random
    
    def producer(stopflag, out_queue):
        while True:
            if stopflag.is_set():
                break
            sleep(random()) #sleep anywhere from 0-1 sec avg 0.5 (producer will produce on average a maximum of 2 outputs / sec)
            out_queue.put(random()) #may wait if the queue is currently full, thereby reducing production rate.
            
    def consumer(stopflag, in_queue):
        while True:
            if stopflag.is_set():
                break
            x = in_queue.get()
            sleep(x*2) #consumers work twice slower than producers averaging 1 item per sec
    
    def main():
        stopflag = Event()
        
        shared_q = Queue(maxsize=100) # if the queue fills up, it will force the producers
                                      #   to wait for the consumers to catch up. Otherwise, it
                                      #   may grow infinitely (until the computer runs out of memory)
        
        #2 producers
        producers = [Process(target=producer, args=(stopflag, shared_q)) for _ in range(2)]
        #4 consumers
        consumers = [Process(target=consumer, args=(stopflag, shared_q)) for _ in range(4)]
        
        for process in producers + consumers:
            process.start()
            
        sleep(20) #let them work a while
        
        stopflag.set() #tell them to stop
        
        for process in producers + consumers: #wait for them to stop
            process.join()
            
        #empty unfinished tasks from the queue so its thread can exit normally
        #(it's good practice to clean up resources kind of like closing files when done with them)
        try:
            while True:
                shared_q.get_nowait()
        except Empty:
            pass
        
    if __name__ == '__main__':
        main()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2023-03-13
      • 2011-10-23
      • 1970-01-01
      • 1970-01-01
      • 2012-07-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多