【问题标题】:streaming data into command with subprocess.Popen使用 subprocess.Popen 将数据流式传输到命令中
【发布时间】:2015-09-18 23:14:32
【问题描述】:

我经常需要对包含标题的文件集合进行排序。因为排序取决于标题的内容,所以这个用例比类似的问题更复杂(例如,Is there a way to ignore header lines in a UNIX sort?)。

我希望使用 Python 来读取文件,输出第一个文件的标题,然后将尾部通过管道传输到排序中。我已经尝试将此作为概念证明:

#!/usr/bin/env python

import io
import subprocess
import sys

header_printed = False

sorter = subprocess.Popen(['sort'], stdin=subprocess.PIPE)

for f in sys.argv[1:]:
    fd = io.open(f,'r')
    line = fd.readline()
    if not header_printed:
        print(line)
        header_printed = True
    sorter.communicate(line)

当以header-sort fileA fileB 调用时,fileA 和 fileB 包含类似的行

c   float   int
Y   0.557946     413
F   0.501935     852
F   0.768102     709

我明白了:

# sort file 1
Traceback (most recent call last):
  File "./archive/bin/pipetest", line 17, in <module>
    sorter.communicate(line)
  File "/usr/lib/python2.7/subprocess.py", line 785, in communicate
    self.stdin.write(input)
ValueError: I/O operation on closed file

问题是通信需要一个字符串,并且管道在写入后关闭。这意味着必须将内容完全读入内存。通信不需要生成器(我试过)。

一个更简单的演示是:

>>> import subprocess
>>> p = subprocess.Popen(['tr', 'a-z', 'A-Z'], stdin=subprocess.PIPE)
>>> p.communicate('hello')
HELLO(None, None)
>>> p.communicate('world')
Traceback (most recent call last):
  File "<ipython-input-14-d6873fd0f66a>", line 1, in <module>
    p.communicate('world')
  File "/usr/lib/python2.7/subprocess.py", line 785, in communicate
    self.stdin.write(input)
ValueError: I/O operation on closed file

那么,问题是,在 Python 中将数据流式传输到管道中的正确方法是什么(使用 Popen 或其他方式)?

【问题讨论】:

标签: python pipe subprocess


【解决方案1】:

对于您的特定情况,如果您只为单个标准句柄传递了subprocess.PIPE(在您的情况下为stdin),那么在您的示例中,您可以安全地一遍又一遍地调用sorter.stdin.write(line)。当你写完输出后,调用sorter.stdin.close(),这样sort就知道输入已经完成,它可以执行实际的排序和输出工作(sorter.communicate()不带参数可能也可以工作;否则,在关闭@987654332之后@ 你可能想打电话给sorter.wait() 让它完成)。

如果您需要处理多个管道标准手柄,正确的方法是 threading 为每个管道使用专用线程,必须处理第一个(概念上相对简单,但重量级)并引入了所有令人头疼的线程),或使用select 模块(或在Python 3.4+ 中,selectors 模块),这很难做到正确,但可以(在某些情况下)更有效。最后是creating temporary files for output,所以你可以在进程写入文件时直接写入进程的stdin(因此不会阻塞);然后,您可以在闲暇时读取文件(请注意,子进程在退出之前不一定会刷新它自己的输出缓冲区,因此在进一步的输入和输出填充并刷新之前,输出可能不会及时到达以响应您的输入缓冲区)。

subprocess.Popen.communicate() 方法使用线程或select 模块原语本身(取决于操作系统支持;实现在various _communicate methods here 下),只要您将subprocess.PIPE 传递给多个标准把手;这就是你必须这样做的方式。

【讨论】:

  • 只有一个管道(子进程的标准输入)。为什么这里需要多个线程?
  • 是的,我的错。我认为这是“我正在尝试在不使用 communicate 的情况下做 communicate 所做的事情”案例之一,并且回答过度。我已经编辑解释了它是如何仅使用一个 PIPE-ed 标准句柄来处理特定情况的。
【解决方案2】:

直接写入管道即可:

#!/usr/bin/env python2
import fileinput
import subprocess

process = subprocess.Popen(['sort'], stdin=subprocess.PIPE)
with process.stdin as pipe, fileinput.FileInput() as file:
    for line in file:
        if file.isfirstline(): # print header
            print line,
        else: # pipe tails
            pipe.write(line)
process.wait()

【讨论】:

  • 我开始了类似的路径(你的路径更优雅),但后来我在文档中看到了这个警告:“警告使用通信()而不是.stdin.write,.stdout.read或 .stderr.read 以避免由于任何其他操作系统管道缓冲区填满并阻塞子进程而导致的死锁。”当脚本写入子进程的标准输入并从其标准输出读取时,我了解死锁的可能性,但是当脚本和子进程都流式传输到标准输出时(如您的回答),我不了解死锁的可能性。评论?
  • @Reece:这里不适用。在一般情况下避免死锁的规则很简单:除非您使用相应的管道,否则不要使用 PIPE。
  • 这也是我的解释。感谢您的跟进。
  • 我将此答案用作为带有标题的文件提供多文件排序的工具的基础。标头由行数、前缀或正则表达式定义。输出标头已删除重复数据。自定义排序选项是允许的。也接受来自标准输入的数据。在这里:bitbucket.org/reece/reece-base/src/…
  • 结束聊天的结果:我们同意,如果可能,通常最好将管道外部化,但上面的管道确实会将标头排序到输出中。 -30-
【解决方案3】:

您可以使用来自stdinstdout 的写入/读取,但是根据您的子进程,您需要一个“刷新机制”让子进程处理您的输入。下面的代码适用于第一部分,但由于它关闭了stdin,它也终止了子进程。如果您使用flush() 更改它,或者如果您可以添加一些尾随字符来推送您的子进程,那么您可以使用它。否则,我建议您看看Multithreading in Python,尤其是pipes

p=subprocess.Popen(['tr','a-z','A-Z'],stdin=subprocess.PIPE,stdout=subprocess.PIPE)
p.stdin.write("hello\n")
p.stdin.close()
p.stdout.readline()
'HELLO\n'

【讨论】:

  • 使用flushclose 真的不安全;如果您将足够的数据发送到其自己的输出管道填充的子进程,它将阻塞。如果你填满它的输入管道,你就会阻塞。因为它在等待你阅读,而你也在等待它阅读,所以你陷入僵局,永远无法到达readline。此外,如果您使用flush 而不是close,则子进程可能会阻塞自己的输出,因此readline 可能会永远阻塞(并且您永远不会从readline 返回以发送比可能导致的更多数据它刷新它的缓冲区)。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2013-11-02
  • 1970-01-01
  • 2018-10-19
  • 1970-01-01
  • 2020-11-25
  • 1970-01-01
  • 2011-08-20
相关资源
最近更新 更多