【问题标题】:Terminate executor using ThreadPoolExecutor from concurrent.futures module使用 concurrent.futures 模块中的 ThreadPoolExecutor 终止执行程序
【发布时间】:2021-10-14 02:06:48
【问题描述】:

我正在尝试根据从长时间运行的请求返回的值终止 ThreadPool。一旦请求返回值的总和达到MIN_REQUIRED_VALUE,我希望终止线程池

我确信问题在于我正在创建一个完整的期货列表,而这些期货总是必须得到解决。我不确定如何在不使用 ThreadPoolExecutor 创建列表的情况下执行请求

我知道有几个与终止线程池相关的问题。我发现了类似的问题,但答案似乎无法处理返回值。

类似问题:

如果有更好的方法可以用另一个模块做到这一点,那很好。

任何帮助将不胜感激。

from time import sleep
from concurrent.futures import ThreadPoolExecutor, as_completed

NUM_REQUESTS = 50
MIN_REQUIRED_VALUE = 30


def long_request(id):
    sleep(3)
    return {"data": {"value": 10}}


def check_results(results):
    total = 0
    for result in results:
        total += result["data"]["value"]

    return total


def main():
    futures = []
    responses = []

    with ThreadPoolExecutor(max_workers=10) as executor:
        for request_index in range(NUM_REQUESTS):
            future = executor.submit(long_request, request_index)

            # Create Futures List
            futures.append(future)

        for future in as_completed(futures):
            responses.append(future.result())

            # Check minimum value reached
            total = check_results(responses)
            if total > MIN_REQUIRED_VALUE:
                executor.shutdown(wait=False)


if __name__ == "__main__":
    main()

【问题讨论】:

    标签: python multithreading concurrency threadpool python-multithreading


    【解决方案1】:

    我更改了代码以仅在未达到 MIN_REQUIRED_VALUE 时附加带有结果的期货,并遍历所有未决期货并在达到 MIN_REQUIRED_VALUE 时取消它们。

    您可以注意到我添加了 num_requests 来检查提交的请求数,在这种情况下结果正好是 6,这是预期的。

    如果有人有更好的方法来做到这一点会很高兴。

    from concurrent.futures import ThreadPoolExecutor, as_completed
    from time import sleep
    
    NUM_REQUESTS = 1000
    MIN_REQUIRED_VALUE = 50
    
    
    def long_request(id):
        sleep(1)
        return {"data": {"value": 10}}
    
    
    def check_results(results):
        total = 0
        for result in results:
            total += result["data"]["value"]
    
        return total
    
    
    def main():
        futures = []
        responses = []
        num_requests = 0
    
        with ThreadPoolExecutor(max_workers=10) as executor:
            for request_index in range(NUM_REQUESTS):
                future = executor.submit(long_request, request_index)
    
                # Future list
                futures.append(future)
    
            for future in as_completed(futures):
    
                # --- Changed Logic Below ---
                total = check_results(responses)
    
                if total > MIN_REQUIRED_VALUE:
                    for pending_future in futures:
                        pending_future.cancel()
                else:
                    num_requests += 1
                    responses.append(future.result())
    
        return num_requests
    
    
    if __name__ == "__main__":
        requests = main()
        print("Num Requests: ", requests)
    
    

    【讨论】:

    • @gold_cy 你是对的, executor.shutdown(wait=False) 现在工作正常。我一开始就尝试过并放弃了它,因为它不符合我的第一个逻辑。我用第二个逻辑尝试了它,它有效。它必须做类似于循环不完整的期货并调用取消的事情
    猜你喜欢
    • 2022-01-10
    • 2018-05-09
    • 1970-01-01
    • 1970-01-01
    • 2020-10-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多