【发布时间】:2019-11-18 12:30:46
【问题描述】:
我有一个设备需要多处理来处理 CPU 绑定的传入数据的反序列化和解码;但是应用程序的其余部分是较慢的 IO 限制代码,这对于 asyncio 非常有用。但是,似乎没有将多处理和异步结合在一起的好方法。
我尝试过https://github.com/dano/aioprocessing,它使用线程执行器进行多处理操作。但是,这个库本身并不支持常见的 asyncio 操作;例如,取消使用此库在 queue.get 上等待的协程将导致死锁。
我也尝试过使用ProcessPoolExecutor,但将多处理对象传递给此执行程序不起作用,因为在创建进程时未传递队列对象。
import multiprocessing
import asyncio
import atexit
from concurrent.futures import ProcessPoolExecutor
@atexit.register
def kill_children():
[p.kill() for p in multiprocessing.active_children()]
async def queue_get(queue: multiprocessing.Queue):
executor = ProcessPoolExecutor(max_workers=1)
loop = asyncio.get_running_loop()
return await loop.run_in_executor(executor, queue.get)
async def main():
queue = multiprocessing.Queue()
get_task = asyncio.create_task(queue_get(queue))
queue.put(None)
print(await get_task)
if __name__ == "__main__":
asyncio.run(main())
运行此代码会导致此异常:
RuntimeError: Queue objects should only be shared between processes through inheritance
有什么方法可以清晰地弥合多处理和异步之间的差距吗?
【问题讨论】:
-
answers here 有帮助吗?可能是重复的
-
是的,确实如此,谢谢@new-dev-123。我错过了
multiprocessing.Manager()以使代码正常工作。
标签: python python-multiprocessing python-asyncio