【发布时间】:2019-03-23 15:16:23
【问题描述】:
[编辑:首先阅读接受的答案。下面的长期调查源于计时测量中的一个微妙错误。]
我经常需要处理超大 (100GB+) 文本/类似 CSV 的文件,其中包含实际上无法在未压缩的磁盘上存储的高度冗余数据。我非常依赖像 lz4 和 zstd 这样的外部压缩器,它们产生的标准输出流接近 1GB/s。
因此,我非常关心 Unix shell 管道的性能。但是大型的 shell 脚本很难维护,所以我倾向于用 Python 构建管道,将命令拼接在一起并小心使用shlex.quote()。
这个过程繁琐且容易出错,所以我想要一种“Pythonic”方式来达到同样的目的,在 Python 中管理 stdin/stdout 文件描述符而不卸载到/bin/sh。但是,我从来没有找到一种方法可以在不大大牺牲性能的情况下做到这一点。
Python 3 的文档推荐 replacing shell pipelines 与 subprocess.Popen 上的 communicate() 方法。我已经修改了这个例子来创建下面的测试脚本,它将 3GB 的/dev/zero 输送到一个无用的grep 中,它什么也不输出:
#!/usr/bin/env python3
from shlex import quote
from subprocess import Popen, PIPE
from time import perf_counter
BYTE_COUNT = 3_000_000_000
UNQUOTED_HEAD_CMD = ["head", "-c", str(BYTE_COUNT), "/dev/zero"]
UNQUOTED_GREP_CMD = ["grep", "Arbitrary string which will not be found."]
QUOTED_SHELL_PIPELINE = " | ".join(
" ".join(quote(s) for s in cmd)
for cmd in [UNQUOTED_HEAD_CMD, UNQUOTED_GREP_CMD]
)
perf_counter()
proc = Popen(QUOTED_SHELL_PIPELINE, shell=True)
proc.wait()
print(f"Time to run using shell pipeline: {perf_counter()} seconds")
perf_counter()
p1 = Popen(UNQUOTED_HEAD_CMD, stdout=PIPE)
p2 = Popen(UNQUOTED_GREP_CMD, stdin=p1.stdout, stdout=PIPE)
p1.stdout.close()
p2.communicate()
print(f"Time to run using subprocess.PIPE: {perf_counter()} seconds")
输出:
Time to run using shell pipeline: 2.412427189 seconds
Time to run using subprocess.PIPE: 4.862174164 seconds
subprocess.PIPE 方法的速度是/bin/sh 的两倍多。如果我们将输入大小提高到 90GB (BYTE_COUNT = 90_000_000_000),我们确认这不是恒定时间开销:
Time to run using shell pipeline: 88.796322932 seconds
Time to run using subprocess.PIPE: 183.734968687 seconds
到目前为止,我的假设是 subprocess.PIPE 只是连接文件描述符的高级抽象,并且数据永远不会复制到 Python 进程本身。正如预期的那样,在运行上述测试时,head 使用 100% 的 CPU,但 subproc_test.py 使用接近零的 CPU 和 RAM。
鉴于此,为什么我的管道如此缓慢?这是 Python subprocess 的内在限制吗?如果是这样,/bin/sh 在引擎盖下有何不同之处使其速度提高了一倍?
更一般地说,有没有更好的方法可以在 Python 中构建大型、高性能的子流程管道?
【问题讨论】:
-
这种数据真的应该在本地处理吗?听起来它需要某种集群技术
-
在大多数情况下,将数据上传到另一台服务器的时间远远大于我要替换的 shell 脚本的运行时间。我知道的任何 DBMS/Hadoop 类/“大数据”工具都需要更长的时间来摄取/ETL 数据,更不用说处理我的脚本了。我正在考虑的任务在今天的一台笔记本电脑上都是完全可行的,并且可以用 Bash 编写。我只是更喜欢 Python 控制流,并且如果可能的话,我想避免脱壳。
-
说得对,我很好奇整个项目是否应该被吊到云端,这样基础设施就已经存在了:)
-
顺便说一句,这里使用
shell=True是......不幸的。如果您的substring_which_will_never_be_found包含$(rm -rf ~),或者 - 更糟糕的是 -$(rm -rf ~)'$(rm -rf ~)',那么您将度过一个非常糟糕的一天。 (依赖shlex.split()也不是好的形式——如果你有一个带空格的名字,你想把它保留为一个名字;手动填充一个数组或元组,你不'无需担心您的内容会被篡改)。 -
...进入主题——是,
subprocess.PIPE是连接文件描述符的高级抽象; 否,数据不会复制到 Python 进程的命名空间中。为什么你在这里看到不同是一个很好的问题——我需要深入研究;如果它与文件描述符上的缓冲设置有关,也不会感到惊讶。
标签: python bash shell subprocess pipe