multiprocessing.Pipe uses the higher-level multiprocessing.Connection API, which pickles and unpickles Python objects and transmits extra bookkeeping bytes under the hood. If you wanted to read from one of these pipes using loop.connect_read_pipe(), you would have to reimplement all of that yourself.
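To illustrate the pickling behavior mentioned above, here is a minimal sketch: any picklable Python object passed to send() arrives intact at the other end via recv(), because the Connection object serializes it for you (the helper name child is made up for this example):

```python
import multiprocessing

def child(conn):
    # send() pickles the object before writing it to the pipe;
    # any picklable Python value works, not just bytes.
    conn.send({'answer': 42, 'items': [1, 2, 3]})
    conn.close()

if __name__ == '__main__':
    read, write = multiprocessing.Pipe(duplex=False)
    proc = multiprocessing.Process(target=child, args=(write,))
    proc.start()
    # recv() unpickles the bytes back into the original object.
    print(read.recv())
    proc.join()
```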
The simplest way to read from a multiprocessing.Pipe without blocking the event loop is to use loop.add_reader(). Consider the following example:
import asyncio
import multiprocessing

def main():
    read, write = multiprocessing.Pipe(duplex=False)
    writer_process = multiprocessing.Process(target=writer, args=(write,))
    writer_process.start()
    asyncio.get_event_loop().run_until_complete(reader(read))

async def reader(read):
    data_available = asyncio.Event()
    asyncio.get_event_loop().add_reader(read.fileno(), data_available.set)
    if not read.poll():
        await data_available.wait()
    print(read.recv())
    data_available.clear()

def writer(write):
    write.send('Hello World')

if __name__ == '__main__':
    main()
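The example above reads a single message. A sketch of the same add_reader() technique extended to drain every message until the writer closes its end might look like this (read_all is a hypothetical helper name; it uses conn.poll() as the source of truth and treats EOFError as end-of-stream):

```python
import asyncio
import multiprocessing

async def read_all(conn):
    # Collect every message from the connection without blocking
    # the event loop; recv() raises EOFError once the write end closes.
    loop = asyncio.get_running_loop()
    data_available = asyncio.Event()
    loop.add_reader(conn.fileno(), data_available.set)
    results = []
    try:
        while True:
            while not conn.poll():
                await data_available.wait()
                data_available.clear()
            try:
                results.append(conn.recv())
            except EOFError:
                break
    finally:
        loop.remove_reader(conn.fileno())
    return results

def writer(conn):
    for word in ('spam', 'eggs'):
        conn.send(word)
    conn.close()

def main():
    read, write = multiprocessing.Pipe(duplex=False)
    proc = multiprocessing.Process(target=writer, args=(write,))
    proc.start()
    write.close()  # close the parent's copy so EOF can be delivered
    print(asyncio.run(read_all(read)))
    proc.join()

if __name__ == '__main__':
    main()
```

Note that the parent must close its own copy of the write end; otherwise recv() never raises EOFError even after the child exits.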
A pipe created with the lower-level os.pipe doesn't add any extras on top of the raw file descriptors the way multiprocessing.Pipe does. Because of that, we can use os.pipe with loop.connect_read_pipe() without reimplementing any of its internal machinery. Here is an example:
import asyncio
import multiprocessing
import os

def main():
    read, write = os.pipe()
    writer_process = multiprocessing.Process(target=writer, args=(write,))
    writer_process.start()
    asyncio.get_event_loop().run_until_complete(reader(read))

async def reader(read):
    pipe = os.fdopen(read, mode='r')
    loop = asyncio.get_event_loop()
    stream_reader = asyncio.StreamReader()

    def protocol_factory():
        return asyncio.StreamReaderProtocol(stream_reader)

    transport, _ = await loop.connect_read_pipe(protocol_factory, pipe)
    print(await stream_reader.readline())
    transport.close()

def writer(write):
    os.write(write, b'Hello World\n')

if __name__ == '__main__':
    main()
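The readline() call above works because the writer sends a trailing newline. If you instead want to read until EOF, both the child and the parent have to close their copies of the write descriptor, or the reader will wait forever. A sketch under that assumption (read_to_eof is a hypothetical helper; passing a raw fd to the child assumes the default fork start method on Unix, just like the example above):

```python
import asyncio
import multiprocessing
import os

def writer(fd):
    os.write(fd, b'Hello World\n')
    os.close(fd)  # close the child's copy so the reader can see EOF

async def read_to_eof(fd):
    loop = asyncio.get_running_loop()
    stream_reader = asyncio.StreamReader()
    transport, _ = await loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(stream_reader),
        os.fdopen(fd, mode='rb'))
    data = await stream_reader.read()  # returns once EOF is reached
    transport.close()
    return data

def main():
    read, write = os.pipe()
    proc = multiprocessing.Process(target=writer, args=(write,))
    proc.start()
    os.close(write)  # close the parent's copy too, or EOF never arrives
    print(asyncio.run(read_to_eof(read)))
    proc.join()

if __name__ == '__main__':
    main()
```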
This code helped me figure out how to use loop.connect_read_pipe().