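【Posted】: 2014-02-06 17:15:29
【Question】:
How do I write a Python multiprocessing program that uses two queues like these?
- one as a working queue, which starts with some data and, depending on conditions in the function being parallelized, receives more tasks on the fly,
- another that collects the results and is used to write them down once processing has finished.
I basically need to put more tasks into the working queue depending on what I find in its initial items. The example I post below is silly (I could transform the item however I like and put it straight into the output queue), but its mechanics are clear and reflect part of the concept I need to develop.
Here is my attempt: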
import multiprocessing as mp

def worker(working_queue, output_queue):
    item = working_queue.get()  # I take an item from the working queue
    if item % 2 == 0:
        output_queue.put(item**2)  # If I like it, I do something with it and conserve the result.
    else:
        working_queue.put(item+1)  # If there is something missing, I do something with it and leave the result in the working queue

if __name__ == '__main__':
    static_input = range(100)
    working_q = mp.Queue()
    output_q = mp.Queue()
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker, args=(working_q, output_q)) for i in range(mp.cpu_count())]  # I am running as many processes as CPUs my machine has (is this wise?).
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    for result in iter(output_q.get, None):
        print result  # alternatively, I would like to (c)pickle.dump this, but I am not sure if it is possible.
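This never finishes and never prints any result.
At the end of the whole process, I want to be sure that the working queue is empty and that all the parallel functions have finished writing to the output queue before the latter is iterated over to take out the results. Do you have any suggestions on how to make this work?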
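For reference, here is a minimal sketch of one way the behavior described above might be obtained. It is not taken from the post; it rests on a few assumptions. The working queue is swapped for an `mp.JoinableQueue` so the parent can wait until every task, including those added on the fly, has been marked done. `None` is assumed never to be a real task or result, so it can serve as a stop marker. The names `STOP` and `run` are hypothetical and introduced only for illustration.

```python
import multiprocessing as mp

STOP = None  # assumed sentinel: None is never a real task or result


def worker(working_q, output_q):
    """Process tasks in a loop until a STOP sentinel arrives."""
    while True:
        item = working_q.get()
        if item is STOP:
            working_q.task_done()
            output_q.put(STOP)  # signals that this worker has written its last result
            break
        if item % 2 == 0:
            output_q.put(item ** 2)
        else:
            # Queue the follow-up task before task_done(), so the count of
            # unfinished tasks cannot reach zero while work is still pending.
            working_q.put(item + 1)
        working_q.task_done()


def run(items, n_workers):
    """Hypothetical helper: run the workers and return the collected results."""
    working_q = mp.JoinableQueue()
    output_q = mp.Queue()
    for item in items:
        working_q.put(item)

    processes = [mp.Process(target=worker, args=(working_q, output_q))
                 for _ in range(n_workers)]
    for proc in processes:
        proc.start()

    working_q.join()  # returns once every task ever queued has been marked done
    for _ in processes:
        working_q.put(STOP)  # one stop marker per worker

    # Drain the output queue before joining the processes: keep reading
    # until each worker's STOP marker has been seen.
    results = []
    finished = 0
    while finished < n_workers:
        result = output_q.get()
        if result is STOP:
            finished += 1
        else:
            results.append(result)

    for proc in processes:
        proc.join()
    return results


if __name__ == "__main__":
    for result in sorted(run(range(100), mp.cpu_count())):
        print(result)
```

The results are read before `proc.join()` on purpose: a process that has put items on a `multiprocessing.Queue` waits for its buffered items to be flushed before it exits, so joining first can deadlock. The returned list could also be passed to `pickle.dump` instead of being printed, since it is an ordinary list of integers.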
【Comments】: