当我来到这个问题时,我期望答案是真正使用 asyncio 进行进程间通信。
我发现以下资源很有用:
https://github.com/python/asyncio/blob/master/examples/child_process.py
下面是我的简化示例(使用 3.5+ async/await 语法),它读取行并将它们排序输出:
import asyncio
from subprocess import Popen, PIPE
async def connect_write_pipe(file):
    """Register a writable pipe with the event loop and return its transport.

    Args:
        file: A file-like object wrapping the write end of a pipe.

    Returns:
        The write transport created by ``loop.connect_write_pipe``.
    """
    event_loop = asyncio.get_event_loop()
    # The bare ``asyncio.Protocol`` class serves as the protocol factory;
    # the protocol instance itself is discarded, only the transport is kept.
    pair = await event_loop.connect_write_pipe(asyncio.Protocol, file)
    return pair[0]
async def connect_read_pipe(file):
    """Register a readable pipe with the event loop as a stream.

    Args:
        file: A file-like object wrapping the read end of a pipe.

    Returns:
        A ``(stream_reader, transport)`` tuple: the ``asyncio.StreamReader``
        that receives the pipe's data and the read transport created by
        ``loop.connect_read_pipe``.
    """
    event_loop = asyncio.get_event_loop()
    reader = asyncio.StreamReader(loop=event_loop)
    # The protocol factory binds the reader so incoming data is fed into it.
    pipe_transport, _protocol = await event_loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(reader), file)
    return reader, pipe_transport
async def main(loop):
    """Feed three lines to ``/usr/bin/sort`` and print what it writes back.

    The child's stdin, stdout and stderr pipes are attached to the event
    loop. Every non-empty chunk read from stdout or stderr is printed,
    prefixed with ``OUT`` or ``ERR``. Reading continues until both streams
    report EOF, so output is not dropped when one stream finishes before
    the other.

    Args:
        loop: Accepted for compatibility with existing callers; not used.
            The event loop is looked up via ``asyncio.get_event_loop()``.
    """
    # start subprocess and wrap stdin, stdout, stderr
    p = Popen(['/usr/bin/sort'], stdin=PIPE, stdout=PIPE, stderr=PIPE)
    stdin = await connect_write_pipe(p.stdin)
    stdout, stdout_transport = await connect_read_pipe(p.stdout)
    stderr, stderr_transport = await connect_read_pipe(p.stderr)

    # interact with subprocess
    name = {stdout: 'OUT', stderr: 'ERR'}
    # Maps each in-flight read task to the stream it is reading from.
    registered = {
        asyncio.ensure_future(stderr.read()): stderr,
        asyncio.ensure_future(stdout.read()): stdout,
    }
    try:
        to_sort = b"one\ntwo\nthree\n"
        stdin.write(to_sort)
        stdin.close()  # this way we tell we do not have anything else

        # get and print lines from stdout, stderr
        while registered:
            # No timeout: a zero timeout after the first result would end
            # the loop before the slower stream had delivered its data.
            done, _pending = await asyncio.wait(
                registered, return_when=asyncio.FIRST_COMPLETED)
            for f in done:
                stream = registered.pop(f)
                res = f.result()
                if res != b'':
                    print(name[stream], res.decode('ascii').rstrip())
                    # Non-empty result: keep reading until EOF (b'').
                    registered[asyncio.ensure_future(stream.read())] = stream
    finally:
        # Only non-empty on an error path; do not leave reads pending.
        for task in registered:
            task.cancel()
        # Closing stdin again is harmless and guarantees the child sees EOF
        # even when an exception occurred before the close above.
        stdin.close()
        stdout_transport.close()
        stderr_transport.close()
        # Reap the child so it does not linger as a zombie; run the blocking
        # wait in the default executor to keep the event loop responsive.
        await asyncio.get_event_loop().run_in_executor(None, p.wait)
if __name__ == '__main__':
    # Create the loop explicitly: calling asyncio.get_event_loop() with no
    # running loop is deprecated and raises RuntimeError on recent Pythons.
    loop = asyncio.new_event_loop()
    # Install it as the current loop so asyncio.get_event_loop() inside the
    # coroutines resolves to this loop on every supported Python version.
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()
NB:不采取特殊措施,写入管道的数据量是有限的。在我的系统中,可以在用完管道缓冲区之前写入超过 700000 个字节。
还有其他例子,使用create_subprocess_shell。
我还没有在实际项目中使用过 asyncio,所以欢迎在评论中提出改进建议。