【问题标题】:multiprocessing: pool: wait for all results, but process individual results immediately多处理:池:等待所有结果,但立即处理单个结果
【发布时间】:2015-02-23 13:05:58
【问题描述】:

我有一个任务列表,我想分配给池中的工作人员。我想实现两件事:

  1. 当一个工人完成后,立即处理结果
  2. 有一个简单的方法来等待所有工作人员完成。

使用 fapply_async,我可以轻松实现第一个目标。每当工作人员完成时,就会调用回调。然而,为了实现第二个目标,我能想出的唯一解决方案基本上只是轮询 AsyncResults,直到它们都准备好()。

使用map_async,我可以轻松实现第二个目标。但是,当所有工作人员都完成时,回调只会被调用一次。我相信我理解这样做的原因(结果的顺序是相关的)。

我是否缺少一些可以同时实现目标 1 和 2 的解决方案?

这是我的测试代码:

#!/usr/bin/python3

import multiprocessing
import time
import random

def worker(src):
    time.sleep(0.2)
    # src is apply_async or map_async
    return (src, random.randint(1, 100))

def map_async_example():
    tasks = ['map_async'] * 20
    with multiprocessing.Pool(processes=4) as pool:
        r = pool.map_async(worker, tasks, callback=print)
        r.wait()

def fapply_async_example():
    tasks = [('fapply_async',)] * 20
    with multiprocessing.Pool(processes=4) as pool:
        ars = []
        for t in tasks:
            ar = pool.apply_async(worker, t, callback=print)
            ars.append(ar)
        # Wait for all AsyncResults to become ready()
        while len(ars) > 0:
            time.sleep(0.5)
            # Keep only the not-ready results
            ars = [ar for ar in ars if not ar.ready()]

def main():
    # One list of 20 results
    print('===============')
    print('Using map_async')
    print('===============')
    map_async_example()

    # 20 results
    print('==================')
    print('Using fapply_async')
    print('==================')
    fapply_async_example()

if __name__ == '__main__':
    main()

【问题讨论】:

    标签: python


    【解决方案1】:

    也许我遗漏了一些东西,但为什么不先处理您的问题,然后在最后处理join() 他们呢?

    【讨论】:

    • 是的,做到了。谢谢!我误解了之前对 close() 的必要调用的效果,所以我没有尝试。
    猜你喜欢
    • 2021-01-21
    • 1970-01-01
    • 2020-07-24
    • 1970-01-01
    • 1970-01-01
    • 2012-01-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多