【问题标题】:Python multiprocessing with an updating queue and an output queue具有更新队列和输出队列的 Python 多处理
【发布时间】:2014-02-06 17:15:29
【问题描述】:

如何编写一个使用两个队列作为这些队列的 Python 多进程?:

  1. 一个作为工作队列,从一些数据开始,根据要并行化的函数的条件,动态接收更多任务,
  2. 另一个收集结果并用于在处理完成后记下结果。

我基本上需要根据我在其初始项目中找到的内容将更多任务放入工作队列中。我在下面发布的示例很愚蠢(我可以随意转换项目并将其直接放入输出队列),但它的机制很清楚,反映了我需要开发的部分概念。

这是我的尝试:

import multiprocessing as mp

def worker(working_queue, output_queue):
    item = working_queue.get() #I take an item from the working queue
    if item % 2 == 0:
        output_queue.put(item**2) # If I like it, I do something with it and conserve the result.
    else:
        working_queue.put(item+1) # If there is something missing, I do something with it and leave the result in the working queue 

if __name__ == '__main__':
    static_input = range(100)    
    working_q = mp.Queue()
    output_q = mp.Queue()
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())] #I am running as many processes as CPU my machine has (is this wise?).
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    for result in iter(output_q.get, None):
        print result #alternatively, I would like to (c)pickle.dump this, but I am not sure if it is possible.

这不会结束,也不会打印任何结果。

在整个过程结束时,我想确保工作队列是空的,并且所有并行函数在后面的迭代取出结果之前已经完成了对输出队列的写入。您对如何使其发挥作用有什么建议吗?

【问题讨论】:

    标签: python multiprocessing


    【解决方案1】:

    下面的代码达到了预期的效果。它遵循@tawmas 提出的建议。

    此代码允许在一个进程中使用多个核心,该进程要求向工作人员提供数据的队列可以在处理期间由他们更新:

    import multiprocessing as mp
    def worker(working_queue, output_queue):
        while True:
            if working_queue.empty() == True:
                break #this is the so-called 'poison pill'    
            else:
                picked = working_queue.get()
                if picked % 2 == 0: 
                        output_queue.put(picked)
                else:
                    working_queue.put(picked+1)
        return
    
    if __name__ == '__main__':
        static_input = xrange(100)    
        working_q = mp.Queue()
        output_q = mp.Queue()
        results_bank = []
        for i in static_input:
            working_q.put(i)
        processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())]
        for proc in processes:
            proc.start()
        for proc in processes:
            proc.join()
        results_bank = []
        while True:
           if output_q.empty() == True:
               break
           results_bank.append(output_q.get_nowait())
        print len(results_bank) # length of this list should be equal to static_input, which is the range used to populate the input queue. In other words, this tells whether all the items placed for processing were actually processed.
        results_bank.sort()
        print results_bank
    

    【讨论】:

    • 一旦结果队列为空,您的打印循环就会一直等待。您应该使用 get_nowait 并显式捕获 Empty 异常以彻底退出。
    • 再次感谢您的帮助。我正在做 try: \n print result \n except Empty: \n break \n 这是打印预期的总结果,但控制台输出仍在抱怨异常。我认为我没有妥善处理它。
    • 你需要从你的try里面的队列中获取。
    • 亲爱的@tawmas,您的帮助非常有用。再次感谢。我无法指定异常。相反,我使用了一个 while True 循环,并检查队列何时为空。这应该是可靠的,因为只有一个 cpu-core 可以完成这样的任务。我使用您帮助找到的解决方案编辑了上面答案中的代码。
    【解决方案2】:

    您在创建流程的行中有错字。它应该是mp.Process,而不是mp.process。这就是导致您出现异常的原因。

    此外,您并没有在您的工作人员中循环,因此他们实际上只消耗队列中的单个项目然后退出。在不了解所需逻辑的情况下,很难给出具体建议,但您可能希望将 worker 函数的主体包含在 while True 循环中,并在主体中添加条件以在工作完成时退出完成。

    请注意,如果您不添加明确退出循环的条件,则当队列为空时,您的工作人员将永远停止。您可以考虑使用所谓的毒丸技术来向工人发出他们可能退出的信号。您可以在 Communication Between processes 上的 PyMOTW 文章中找到一个示例和一些有用的讨论。

    至于要使用的进程数量,您需要进行一些基准测试以找到适合您的方法,但一般来说,当您的工作负载受 CPU 限制时,每个内核一个进程是一个很好的起点。如果您的工作负载受 IO 限制,则使用更多工作人员可能会获得更好的结果。

    【讨论】:

    • 不客气!请不要在您这样做时,我编辑了我的回复以开始解决您的问题的其余部分。
    • 我刚刚阅读了您回答的剩余部分。我将尝试应用一个 while True 循环。我想知道如果队列中没有更多项目可以处理,该过程是否完成。我想使用队列长度之类的东西,但文档指出这不可靠。
    • 为您提供更多详细信息!
    • 我试图将所谓的“毒丸”放在 JoinableQueue 的初始任务的末尾。但是随着这个队列在多处理中获得更多任务,药丸在一些必须处理的任务之前获得。我是否应该更好地检查“锁定”,即使这会影响性能?
    • 确实,我的员工正在排队等待更多任务,具体取决于他们在工作中发现的内容。我试图实施你关于让每个工人在没有更多工作时发布毒丸的建议。我只是使用了不可靠的 .empty() 方法;在最坏的情况下,我最终会得到一个核心来处理其他工作人员没有意识到的待处理任务。我打印了我的结果,但控制台中的“In”行不再出现。你知道为什么吗?我更新了以下分析器中的代码并将您的代码设置为正确答案。非常感谢您的帮助。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2010-10-29
    • 1970-01-01
    • 2012-02-23
    • 2016-04-18
    • 2018-06-30
    • 2012-07-11
    相关资源
    最近更新 更多