【问题标题】:How can I feed a subprocess's standard input from a Python iterator?如何从 Python 迭代器提供子进程的标准输入?
【发布时间】:2011-10-17 02:00:36
【问题描述】:

我正在尝试使用 Python 中的 subprocess 模块与以流方式读取标准输入并写入标准输出的进程进行通信。我想让子进程从产生输入的迭代器中读取行,然后从子进程中读取输出行。输入和输出线之间可能没有一一对应的关系。如何从返回字符串的任意迭代器中提供子进程?

这里有一些示例代码给出了一个简单的测试用例,以及我尝试过的一些方法由于某种原因不起作用:

#!/usr/bin/python
from subprocess import *
# A really big iterator
input_iterator = ("hello %s\n" % x for x in xrange(100000000))

# I thought that stdin could be any iterable, but it actually wants a
# filehandle, so this fails with an error.
subproc = Popen("cat", stdin=input_iterator, stdout=PIPE)

# This works, but it first sends *all* the input at once, then returns
# *all* the output as a string, rather than giving me an iterator over
# the output. This uses up all my memory, because the input is several
# hundred million lines.
subproc = Popen("cat", stdin=PIPE, stdout=PIPE)
output, error = subproc.communicate("".join(input_iterator))
output_lines = output.split("\n")

那么当我从迭代器的标准输出中逐行读取时,如何让我的子进程逐行读取呢?

【问题讨论】:

  • 如何让脚本在后台运行?或者你只是不这样做? (我以为您来自 Q 的措辞“我正在尝试使用 Python 中的 subprocess 模块与读取标准输入并以流方式写入标准输出的进程进行通信。”

标签: python io subprocess


【解决方案1】:

简单的方法似乎是从子进程中分叉并提供输入句柄。任何人都可以详细说明这样做的任何可能的缺点吗?或者有没有 python 模块让它更容易和更安全?

#!/usr/bin/python
from subprocess import *
import os

def fork_and_input(input, handle):
    """Send input to handle in a child process."""
    # Make sure input is iterable before forking
    input = iter(input)
    if os.fork():
        # Parent
        handle.close()
    else:
        # Child
        try:
            handle.writelines(input)
            handle.close()
        # An IOError here means some *other* part of the program
        # crashed, so don't complain here.
        except IOError:
            pass
        os._exit()

# A really big iterator
input_iterator = ("hello %s\n" % x for x in xrange(100000000))

subproc = Popen("cat", stdin=PIPE, stdout=PIPE)
fork_and_input(input_iterator, subproc.stdin)

for line in subproc.stdout:
    print line,

【讨论】:

  • 如果您在子进程中使用exit(),则会引发SystemExit。应该改用os._exit(0)
  • use Thread() instead of os.fork() 用于可移植性并避免各种难以调试的问题。以下是os.fork() 可能出现问题的示例:Locks in the standard library should be sanitized on fork
  • 使用我正在运行的命令打开Popen 阻止我的整个脚本:(
  • @CharlieParker 我不知道如何帮助你。简单地运行Popen 不会阻塞任何东西,因为您还没有对子进程进行任何 I/O。
  • @RyanThompson 实际上,我发现在我的命令输出上调用 repr 是问题所在。我不再那样做了。谢谢!
【解决方案2】:

从 Python 迭代器提供子进程的标准输入:

#!/usr/bin/env python3 
from subprocess import Popen, PIPE

with Popen("sink", stdin=PIPE, bufsize=-1) as process:
    for chunk in input_iterator:
        process.stdin.write(chunk)

如果您想同时读取输出,则需要threads 或 async.io:

#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def writelines(writer, lines):
    # NOTE: can't use writer.writelines(lines) here because it tries to write
    # all at once
    with closing(writer):
        for line in lines:
            writer.write(line)
            await writer.drain()

async def main():
    input_iterator = (b"hello %d\n" % x for x in range(100000000))
    process = await asyncio.create_subprocess_exec("cat", stdin=PIPE, stdout=PIPE)
    asyncio.ensure_future(writelines(process.stdin, input_iterator))
    async for line in process.stdout:
        sys.stdout.buffer.write(line)
    return await process.wait()

if sys.platform == 'win32':
    loop = asyncio.ProactorEventLoop()  # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
with closing(loop):
    sys.exit(loop.run_until_complete(main()))

【讨论】:

    【解决方案3】:

    关注this recipe 这是一个支持异步 I/O 的子进程的附加组件。不过,这仍然需要您的子流程使用其输出的一部分来响应每个输入行或行组。

    【讨论】:

    • 我不能保证程序会为每一行输入产生输出。事实上,它可能不会。
    • 对不起,我说的不准确:我的意思是你的主进程应该能够为你的子进程提供足够的输入以生成一些输出,阅读这个输出,再给子进程提供一些输入,以此类推。如果是这种情况,我的链接指向的食谱可能会对您有所帮助。要点是您的子进程应该能够在看到所有输入之前开始生成输出。
    • 嗯。我的管道中可能有一个排序步骤(取决于选项),因此在收到所有输入之前它可能不会生成大部分输出。
    猜你喜欢
    • 2019-12-05
    • 1970-01-01
    • 1970-01-01
    • 2012-01-18
    • 2014-11-10
    相关资源
    最近更新 更多