【问题标题】:Filling a queue and managing multiprocessing in python在 python 中填充队列和管理多处理
【发布时间】:2013-06-18 23:08:09
【问题描述】:

我在 python 中遇到了这个问题:

  • 我有一个需要不时检查的 URL 队列
  • 如果队列已满,我需要处理队列中的每一项
  • 队列中的每个项目都必须由单个进程处理(多处理)

到目前为止,我设法像这样“手动”实现了这一目标:

while 1:
        self.updateQueue()

        while not self.mainUrlQueue.empty():
            domain = self.mainUrlQueue.get()

            # if we didn't launched any process yet, we need to do so
            if len(self.jobs) < maxprocess:
                self.startJob(domain)
                #time.sleep(1)
            else:
                # If we already have process started we need to clear the old process in our pool and start new ones
                jobdone = 0

                # We circle through each of the process, until we find one free ; only then leave the loop 
                while jobdone == 0:
                    for p in self.jobs :
                        #print "entering loop"
                        # if the process finished
                        if not p.is_alive() and jobdone == 0:
                            #print str(p.pid) + " job dead, starting new one"
                            self.jobs.remove(p)
                            self.startJob(domain)
                            jobdone = 1

但是,这会导致大量问题和错误。我想知道我是否更适合使用进程池。这样做的正确方法是什么?

但是,很多时候我的队列是空的,一秒钟可以填满 300 个项目,所以我不太清楚这里该怎么做。

【问题讨论】:

    标签: python queue multiprocessing pool


    【解决方案1】:

    您可以使用queue 的阻塞功能在启动时生成多个进程(使用multiprocessing.Pool)并让它们休眠,直到队列中有一些数据可供处理。如果您对此不熟悉,可以尝试“玩”这个简单的程序:

    import multiprocessing
    import os
    import time
    
    the_queue = multiprocessing.Queue()
    
    
    def worker_main(queue):
        print os.getpid(),"working"
        while True:
            item = queue.get(True)
            print os.getpid(), "got", item
            time.sleep(1) # simulate a "long" operation
    
    the_pool = multiprocessing.Pool(3, worker_main,(the_queue,))
    #                           don't forget the comma here  ^
    
    for i in range(5):
        the_queue.put("hello")
        the_queue.put("world")
    
    
    time.sleep(10)
    

    在 Linux 上使用 Python 2.7.3 测试

    这将产生 3 个进程(除了父进程)。每个孩子都执行worker_main 函数。这是一个简单的循环,在每次迭代时从队列中获取一个新项目。如果没有准备好处理,worker 将阻塞。

    在启动时,所有 3 个进程都将休眠,直到向队列提供一些数据。当数据可用时,等待的工作人员之一获得该项目并开始处理它。之后,它会尝试从队列中获取其他项目,如果没有可用则再次等待...

    【讨论】:

    • 这在 python 2.7.4 中的 windows 上不起作用,你需要有 if name = 'main' 部分,你应该将the_queue作为第三个参数传入multiprocessing.Pool函数,否则worker_main收不到数据
    • 我也对如何使这段代码工作感兴趣。当我按原样运行它时,它会运行,但它什么也不打印,可能是因为 worker_main 没有接收到数据。但是当我将 the_queue 作为第三个参数传递时,我得到了 TypeError: worker_main() argument after * must be a sequence, not Queue
    • @ziky90 你可能忘记了(queue,) 中的昏迷。我已编辑代码以添加注释,指出可能的错误来源。
    • 谢谢,这是一个问题,第二个是我直接从 Sublime Text2 运行它,由于某种原因它没有打印出进程的输出。当我从命令行运行代码时,它运行良好。
    • 无事可做时如何关闭工人?
    【解决方案2】:

    添加了一些代码(向队列提交“None”)以很好地关闭工作线程,并添加了关闭和加入 the_queue 和 the_pool 的代码:

    import multiprocessing
    import os
    import time
    
    NUM_PROCESSES = 20
    NUM_QUEUE_ITEMS = 20  # so really 40, because hello and world are processed separately
    
    
    def worker_main(queue):
        print(os.getpid(),"working")
        while True:
            item = queue.get(block=True) #block=True means make a blocking call to wait for items in queue
            if item is None:
                break
    
            print(os.getpid(), "got", item)
            time.sleep(1) # simulate a "long" operation
    
    
    def main():
        the_queue = multiprocessing.Queue()
        the_pool = multiprocessing.Pool(NUM_PROCESSES, worker_main,(the_queue,))
                
        for i in range(NUM_QUEUE_ITEMS):
            the_queue.put("hello")
            the_queue.put("world")
        
        for i in range(NUM_PROCESSES):
            the_queue.put(None)
    
        # prevent adding anything more to the queue and wait for queue to empty
        the_queue.close()
        the_queue.join_thread()
    
        # prevent adding anything more to the process pool and wait for all processes to finish
        the_pool.close()
        the_pool.join()
    
    if __name__ == '__main__':
        main()
    

    【讨论】:

    • 回答上述@pedrosaurio 问题:“为什么需要逗号?”逗号和括号将 the_queue 参数变成一个元组
    • 图解:&gt;&gt;&gt; type('blah') &lt;class 'str'&gt; &gt;&gt;&gt; type(('blah')) &lt;class 'str'&gt; &gt;&gt;&gt; type(('blah',)) &lt;class 'tuple'&gt;
    猜你喜欢
    • 1970-01-01
    • 2022-01-21
    • 1970-01-01
    • 2018-06-27
    • 2014-01-11
    • 2012-01-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多