【问题标题】:How to combine python asyncio and multiprocessing?如何结合 python asyncio 和多​​处理?
【发布时间】:2019-11-18 12:30:46
【问题描述】:

我有一个设备需要多处理来处理 CPU 绑定的传入数据的反序列化和解码;但是应用程序的其余部分是较慢的 IO 限制代码,这对于 asyncio 非常有用。但是,似乎没有将多处理和异步结合在一起的好方法。

我尝试过https://github.com/dano/aioprocessing,它使用线程执行器进行多处理操作。但是,这个库本身并不支持常见的 asyncio 操作;例如,取消使用此库在 queue.get 上等待的协程将导致死锁。

我也尝试过使用ProcessPoolExecutor,但将多处理对象传递给此执行程序不起作用,因为在创建进程时未传递队列对象。

import multiprocessing
import asyncio
import atexit
from concurrent.futures import ProcessPoolExecutor


@atexit.register
def kill_children():
    [p.kill() for p in multiprocessing.active_children()]


async def queue_get(queue: multiprocessing.Queue):
    executor = ProcessPoolExecutor(max_workers=1)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, queue.get)


async def main():
    queue = multiprocessing.Queue()
    get_task = asyncio.create_task(queue_get(queue))

    queue.put(None)

    print(await get_task)


if __name__ == "__main__":
    asyncio.run(main())

运行此代码会导致此异常:

RuntimeError: Queue objects should only be shared between processes through inheritance

有什么方法可以清晰地弥合多处理和异步之间的差距吗?

【问题讨论】:

  • answers here 有帮助吗?可能是重复的
  • 是的,确实如此,谢谢@new-dev-123。我错过了 multiprocessing.Manager() 以使代码正常工作。

标签: python python-multiprocessing python-asyncio


【解决方案1】:

Can I somehow share an asynchronous queue with a subprocess?

上面的代码可以修改为通过multiprocessing.Manager()创建队列来处理多处理队列

import multiprocessing
import asyncio
import atexit
from concurrent.futures import ProcessPoolExecutor


@atexit.register
def kill_children():
    [p.kill() for p in multiprocessing.active_children()]


async def queue_get(queue: multiprocessing.Queue):
    executor = ProcessPoolExecutor(max_workers=1)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, queue.get)


async def main():
    manager = multiprocessing.Manager()
    queue = manager.Queue()
    get_task = asyncio.create_task(queue_get(queue))

    queue.put(None)
    print(await get_task)


if __name__ == "__main__":
    asyncio.run(main())

【讨论】:

    猜你喜欢
    • 2021-02-12
    • 1970-01-01
    • 2018-12-11
    • 1970-01-01
    • 2020-06-24
    • 1970-01-01
    • 1970-01-01
    • 2015-02-11
    • 2019-11-18
    相关资源
    最近更新 更多