【问题标题】:Concurrent futures wait for subset of tasks并发期货等待任务子集
【发布时间】:2018-11-12 03:28:20
【问题描述】:

我正在使用 Python 的 concurrent.futures 框架。我已经使用map() 函数来启动并发任务:

def func(i):
    return i*i

list = [1,2,3,4,5]
async_executor = concurrent.futures.ThreadPoolExecutor(5)
results = async_executor.map(func,list)

我只对第一个n 结果感兴趣,并希望在第一个n 线程完成后停止执行程序,其中n 是一个小于输入列表大小的数字。有没有办法在 Python 中做到这一点?我应该研究另一个框架吗?

【问题讨论】:

  • 解决方案是否必须能够终止那些在前 N 完成时已经启动的功能?还是说“在第一个 N 完成后不会开始新的工作,但是即使我们忽略它们的结果,已经开始的工作也将被允许完成”就足够了吗?如果您需要前者,请参阅:stackoverflow.com/questions/42782953/… - 无法直接使用您正在使用的库。
  • 您的第二个陈述似乎适合我的应用程序。我想我可以使用计数信号量,然后在信号量达到值“n”时取消执行程序。我的问题是,如果我这样做,“地图”的结果究竟是什么?它是否只包含前“n”个任务的结果?它会阻塞直到计算出已经开始但尚未完成的任务的结果?

标签: python python-3.x asynchronous threadpool concurrent.futures


【解决方案1】:

您不能为此使用map(),因为它无法停止等待结果,也无法获取提交的期货并取消它们。但是,您可以使用submit()

import concurrent.futures
import time

def func(i):
    time.sleep(i)
    return i*i


list = [1,2,3,6,6,6,90,100]
async_executor = concurrent.futures.ThreadPoolExecutor(2)
futures = {async_executor.submit(func, i): i for i in list}
for ii, future in enumerate(concurrent.futures.as_completed(futures)):
    print(ii, "result is", future.result())
    if ii == 2:
        async_executor.shutdown(wait=False)
        for victim in futures:
            victim.cancel()
        break

上面的代码运行大约需要 11 秒——它执行作业 [1,2,3,6,7] 而不是其余的。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-10-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-06-18
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多