【问题标题】:Watch stdout and stderr of a subprocess simultaneously同时观察子进程的 stdout 和 stderr
【发布时间】:2018-06-17 23:13:24
【问题描述】:

如何同时查看长时间运行的子进程的标准输出和标准错误,并在子进程生成后立即处理每一行?

我不介意使用 Python3.6 的异步工具在两个流上创建我期望的非阻塞异步循环,但这似乎并不能解决问题。以下代码:

import asyncio
from asyncio.subprocess import PIPE
from datetime import datetime


async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for f in p.stdout:
        print(datetime.now(), f.decode().strip())
    async for f in p.stderr:
        print(datetime.now(), "E:", f.decode().strip())

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run('''
         echo "Out 1";
         sleep 1;
         echo "Err 1" >&2;
         sleep 1;
         echo "Out 2"
    '''))
    loop.close()

输出:

2018-06-18 00:06:35.766948 Out 1
2018-06-18 00:06:37.770187 Out 2
2018-06-18 00:06:37.770882 E: Err 1

虽然我希望它输出如下内容:

2018-06-18 00:06:35.766948 Out 1
2018-06-18 00:06:36.770882 E: Err 1
2018-06-18 00:06:37.770187 Out 2

【问题讨论】:

    标签: python subprocess python-asyncio


    【解决方案1】:

    要实现这一点,您需要一个函数,该函数将采用两个异步序列并合并它们,在它们可用时从其中一个或另一个生成结果。有了这样的功能,run 可能看起来像这样:

    async def run(cmd):
        p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
        async for f in merge(p.stdout, p.stderr):
            print(datetime.now(), f.decode().strip())
    

    merge 这样的函数(还)不存在于标准库中,但aiostream 外部库provides one 中。您也可以使用异步生成器和asyncio.wait() 编写自己的:

    async def merge(*iterables):
        iter_next = {it.__aiter__(): None for it in iterables}
        while iter_next:
            for it, it_next in iter_next.items():
                if it_next is None:
                    fut = asyncio.ensure_future(it.__anext__())
                    fut._orig_iter = it
                    iter_next[it] = fut
            done, _ = await asyncio.wait(iter_next.values(),
                                         return_when=asyncio.FIRST_COMPLETED)
            for fut in done:
                iter_next[fut._orig_iter] = None
                try:
                    ret = fut.result()
                except StopAsyncIteration:
                    del iter_next[fut._orig_iter]
                    continue
                yield ret
    

    上面的run 仍然会在一个细节上与您想要的输出有所不同:它不会区分输出行和错误行。但这可以通过用指示器装饰线条来轻松实现:

    async def decorate_with(it, prefix):
        async for item in it:
            yield prefix, item
    
    async def run(cmd):
        p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
        async for is_out, line in merge(decorate_with(p.stdout, True),
                                        decorate_with(p.stderr, False)):
            if is_out:
                print(datetime.now(), line.decode().strip())
            else:
                print(datetime.now(), "E:", line.decode().strip())
    

    【讨论】:

    • 这样就解决了,谢谢!我没想到他们需要第三方库,我确信标准库中有工具可以做到这一点。另请注意,我必须将循环包装在 async with 语句中,否则我会收到警告:"AsyncIteratorContext is iterated outside of its context"
    • @AlephAleph 我面临着类似的问题,但我不确定我应该如何包装合并。请问能详细点吗?谢谢!
    【解决方案2】:

    我想到实际上有一个更简单的解决方案,至少如果观察代码不需要在单个协程调用中。

    你可以做的是生成两个独立的协程,一个用于 stdout,一个用于 stderr。并行运行它们将为您提供所需的语义,您可以使用 gather 等待它们完成:

    def watch(stream, prefix=''):
        async for line in stream:
            print(datetime.now(), prefix, line.decode().strip())
    
    async def run(cmd):
        p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
        await asyncio.gather(watch(p.stdout), watch(p.stderr, 'E:'))
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2023-04-06
      • 1970-01-01
      • 1970-01-01
      • 2011-07-07
      • 2020-06-23
      • 1970-01-01
      • 2018-07-31
      • 2015-08-18
      相关资源
      最近更新 更多