【问题标题】:Can I use asyncio to read from and write to a multiprocessing.Pipe?我可以使用 asyncio 读取和写入 multiprocessing.Pipe 吗?
【发布时间】:2019-11-05 22:58:56
【问题描述】:

我需要在 Python 中的进程之间进行通信,并且在每个进程中使用 asyncio 进行并发网络 IO。

目前我在进程之间使用multiprocessing.Pipesendrecv 大量数据,但是我在asyncio 之外这样做,我相信我在cpu 上花费了大量时间IO_WAIT 因为它。

似乎asyncio 可以并且应该用于处理进程之间的管道 IO,但是除了管道 STDIN/STDOUT 之外我找不到任何示例。

从我读到的内容看来,我应该使用loop.connect_read_pipe(PROTOCOL_FACTORY, PIPE) 注册管道,同样也可以用于写入。但是我不明白protocol_factory 的用途,因为它与multiprocessing.Pipe 有关。甚至不清楚我是否应该创建 multiprocessing.Pipe 或者我是否可以在 asyncio 中创建管道。

【问题讨论】:

标签: python python-3.x multiprocessing pipe python-asyncio


【解决方案1】:

multiprocessing.Pipe 使用高级multiprocessing.Connection 模块来腌制和取消腌制Python 对象并在后台传输额外的字节。如果您想使用loop.connect_read_pipe() 从这些管道之一读取数据,则必须自己重新实现所有这些。

multiprocessing.Pipe 读取而不阻塞事件循环的最简单方法是使用loop.add_reader()。考虑以下示例:

import asyncio
import multiprocessing


def main():
    read, write = multiprocessing.Pipe(duplex=False)
    writer_process = multiprocessing.Process(target=writer, args=(write,))
    writer_process.start()
    asyncio.get_event_loop().run_until_complete(reader(read))


async def reader(read):
    data_available = asyncio.Event()
    asyncio.get_event_loop().add_reader(read.fileno(), data_available.set)

    if not read.poll():
        await data_available.wait()

    print(read.recv())
    data_available.clear()


def writer(write):
    write.send('Hello World')


if __name__ == '__main__':
    main()

使用较低级别的 os.pipe 创建的管道不会像来自 multiprocessing.Pipe 的管道那样添加任何额外内容。因此,我们可以将os.pipeloop.connect_read_pipe() 一起使用,而无需重新实现任何类型的内部工作。这是一个例子:

import asyncio
import multiprocessing
import os


def main():
    read, write = os.pipe()
    writer_process = multiprocessing.Process(target=writer, args=(write,))
    writer_process.start()
    asyncio.get_event_loop().run_until_complete(reader(read))


async def reader(read):
    pipe = os.fdopen(read, mode='r')

    loop = asyncio.get_event_loop()
    stream_reader = asyncio.StreamReader()
    def protocol_factory():
        return asyncio.StreamReaderProtocol(stream_reader)

    transport, _ = await loop.connect_read_pipe(protocol_factory, pipe)
    print(await stream_reader.readline())
    transport.close()


def writer(write):
    os.write(write, b'Hello World\n')


if __name__ == '__main__':
    main()

This code 帮助我弄清楚如何使用loop.connect_read_pipe

【讨论】:

    【解决方案2】:

    aiopipe 似乎在做你想做的事!它可以与内置的multiprocessing 模块一起使用,并提供与常规阻塞管道类似的API。

    【讨论】:

      猜你喜欢
      • 2016-04-23
      • 2014-06-17
      • 2016-10-19
      • 2020-04-18
      • 1970-01-01
      • 2012-05-22
      • 2019-01-04
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多