【问题标题】:How to run a blocking task asynchronously with ProcessPoolExecutor and asyncio?如何使用 ProcessPoolExecutor 和 asyncio 异步运行阻塞任务?
【发布时间】:2022-12-21 00:37:47
【问题描述】:

我正在尝试使用 ProcessPoolExecutor 异步运行阻塞任务(它与 ThreadPoolExecutor 一起使用,但我需要 ProcessPoolExecutor 来执行 CPU 绑定任务)。这是我的代码:


import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
 
 
async def run_in_thread(task, *args):
    with ProcessPoolExecutor() as process_pool:
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(process_pool, task, *args)
        return result
        
async def main_task():
    while True:
        await asyncio.sleep(1)
        print("ticker")

async def main():
    asyncio.create_task(main_task())

    global blocking_task
    def blocking_task():
        time.sleep(5)
        print("blocking task done!")
    await run_in_thread(blocking_task)
 
 
if __name__ == "__main__":
    asyncio.run(main())

我得到这个错误:

result = await loop.run_in_executor(process_pool, task, *args)
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.        

我不明白问题出在哪里,有人可以帮助我吗? 我还想了解为什么它适用于 ThreadPoolExecutor 而不是 ProcessPoolExecutor

我期待代码打印:

ticker
ticker
ticker
ticker
ticker
blocking task done!

【问题讨论】:

    标签: python multithreading asynchronous multiprocessing python-asyncio


    【解决方案1】:

    所以看起来全局变量没有在进程之间共享,如果你看到它说的整个回溯

    AttributeError: Can't get attribute 'blocking_task'

    当您使用池生成新进程时,您使用 global blocking_task 设置(在主进程中)的全局变量不会反映在您生成的任何其他进程中。

    简单的解决方法是将函数 blocking_task 移到 main 之外,您的代码将起作用,无论何时产生新进程,它都会运行文件中的代码,这意味着它运行函数定义并将其添加到全局变量中子进程。

    在您当前的代码中,新进程不会运行调用 main 因为您的条件 __name__ == "__main__" 不再适用于子进程并且您手动添加到全局范围没有机会运行并且代码错误。

    线程不是这种情况,只有一个进程,线程可以共享全局变量,这就是它起作用的原因。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-12-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-07-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多