【问题标题】:Achieving shell-like pipeline performance in Python在 Python 中实现类似 shell 的管道性能
【发布时间】:2019-03-23 15:16:23
【问题描述】:

[编辑:首先阅读接受的答案。下面的长期调查源于计时测量中的一个微妙错误。]

我经常需要处理超大 (100GB+) 文本/类似 CSV 的文件,其中包含实际上无法在未压缩的磁盘上存储的高度冗余数据。我非常依赖像 lz4 和 zstd 这样的外部压缩器,它们产生的标准输出流接近 1GB/s。

因此,我非常关心 Unix shell 管道的性能。但是大型的 shell 脚本很难维护,所以我倾向于用 Python 构建管道,将命令拼接在一起并小心使用shlex.quote()

这个过程繁琐且容易出错,所以我想要一种“Pythonic”方式来达到同样的目的,在 Python 中管理 stdin/stdout 文件描述符而不卸载到/bin/sh。但是,我从来没有找到一种方法可以在不大大牺牲性能的情况下做到这一点。

Python 3 的文档推荐 replacing shell pipelinessubprocess.Popen 上的 communicate() 方法。我已经修改了这个例子来创建下面的测试脚本,它将 3GB 的/dev/zero 输送到一个无用的grep 中,它什么也不输出:

#!/usr/bin/env python3
from shlex import quote
from subprocess import Popen, PIPE
from time import perf_counter

BYTE_COUNT = 3_000_000_000
UNQUOTED_HEAD_CMD = ["head", "-c", str(BYTE_COUNT), "/dev/zero"]
UNQUOTED_GREP_CMD = ["grep", "Arbitrary string which will not be found."]

QUOTED_SHELL_PIPELINE = " | ".join(
    " ".join(quote(s) for s in cmd)
    for cmd in [UNQUOTED_HEAD_CMD, UNQUOTED_GREP_CMD]
)

perf_counter()
proc = Popen(QUOTED_SHELL_PIPELINE, shell=True)
proc.wait()
print(f"Time to run using shell pipeline: {perf_counter()} seconds")

perf_counter()
p1 = Popen(UNQUOTED_HEAD_CMD, stdout=PIPE)
p2 = Popen(UNQUOTED_GREP_CMD, stdin=p1.stdout, stdout=PIPE)
p1.stdout.close()
p2.communicate()
print(f"Time to run using subprocess.PIPE: {perf_counter()} seconds")

输出:

Time to run using shell pipeline: 2.412427189 seconds
Time to run using subprocess.PIPE: 4.862174164 seconds

subprocess.PIPE 方法的速度是/bin/sh 的两倍多。如果我们将输入大小提高到 90GB (BYTE_COUNT = 90_000_000_000),我们确认这不是恒定时间开销:

Time to run using shell pipeline: 88.796322932 seconds
Time to run using subprocess.PIPE: 183.734968687 seconds

到目前为止,我的假设是 subprocess.PIPE 只是连接文件描述符的高级抽象,并且数据永远不会复制到 Python 进程本身。正如预期的那样,在运行上述测试时,head 使用 100% 的 CPU,但 subproc_test.py 使用接近零的 CPU 和 RAM。

鉴于此,为什么我的管道如此缓慢?这是 Python subprocess 的内在限制吗?如果是这样,/bin/sh 在引擎盖下有何不同之处使其速度提高了一倍?

更一般地说,有没有更好的方法可以在 Python 中构建大型、高性能的子流程管道?

【问题讨论】:

  • 这种数据真的应该在本地处理吗?听起来它需要某种集群技术
  • 在大多数情况下,将数据上传到另一台服务器的时间远远大于我要替换的 shell 脚本的运行时间。我知道的任何 DBMS/Hadoop 类/“大数据”工具都需要更长的时间来摄取/ETL 数据,更不用说处理我的脚本了。我正在考虑的任务在今天的一台笔记本电脑上都是完全可行的,并且可以用 Bash 编写。我只是更喜欢 Python 控制流,并且如果可能的话,我想避免脱壳。
  • 说得对,我很好奇整个项目是否应该被吊到云端,这样基础设施就已经存在了:)
  • 顺便说一句,这里使用shell=True 是......不幸的。如果您的substring_which_will_never_be_found 包含$(rm -rf ~),或者 - 更糟糕的是 - $(rm -rf ~)'$(rm -rf ~)',那么您将度过一个非常糟糕的一天。 (依赖shlex.split() 也不是好的形式——如果你有一个带空格的名字,你想把它保留为一个名字;手动填充一个数组或元组,你不'无需担心您的内容会被篡改)。
  • ...进入主题——subprocess.PIPE 连接文件描述符的高级抽象; ,数据不会复制到 Python 进程的命名空间中。为什么你在这里看到不同是一个很好的问题——我需要深入研究;如果它与文件描述符上的缓冲设置有关,也不会感到惊讶。

标签: python bash shell subprocess pipe


【解决方案1】:

另外,考虑到这一点,Popen

在 3.3.1 版中更改:bufsize 现在默认为 -1 以启用 默认情况下缓冲以匹配大多数代码所期望的行为。在 Python 3.2.4 和 3.3.1 之前的版本错误地默认为 0 这是无缓冲的,允许短读。这是无意的 并且与大多数代码预期的 Python 2 的行为不匹配。

【讨论】:

    【解决方案2】:

    你的时机不对。您的perf_counter() 呼叫不会启动和停止计时器;他们只是从某个任意起点返回几秒钟。该起点可能恰好是这里的第一个 perf_counter() 调用,但它可能是任何点,甚至是未来的一个。

    subprocess.PIPE 方法所用的实际时间是 4.862174164 - 2.412427189 = 2.449746975 秒,而不是 4.862174164 秒。此时间未显示来自subprocess.PIPE 的可衡量的性能损失。

    【讨论】:

    • 哇,我很笨。谢谢。
    【解决方案3】:

    在python3中有“python方式”和“我们没有提到的方式”。 (虽然滥用 RAM 让我很痛苦,但现在似乎有相当多的可用内存。)

    #!/usr/bin/env python3
    # how you are "meant" to do it
    import subprocess
    ps = subprocess.Popen(('ip', 'a'), stdout=subprocess.PIPE)
    pt = subprocess.Popen(('grep', '192'), stdin=ps.stdout, stdout=subprocess.PIPE)
    pu = subprocess.Popen(('awk', '{print $2}'), stdin=pt.stdout, stdout=subprocess.PIPE)
    pv = subprocess.Popen(('sed', 's;/.*;;'), stdin=pu.stdout, stdout=subprocess.PIPE)
    #ps.wait()
    #ps.stdout.close()
    output = pv.communicate()[0]
    print(output.decode('utf-8').rstrip())
    
    # OR (the 1 we don't mention)
    import os
    print(os.popen('ip a|grep 192|awk \'{print $2}\'|sed \'s;/.*;;\'').read().rstrip())
    
    # or (the 1 we don't mention, pretending to be PEM compliant)
    cmd="ip a|grep 192|awk '{print $2}'|sed 's;/.*;;'"
    print(os.popen(cmd).read().rstrip())
    

    【讨论】:

    • 似乎一些 pythonistas 喜欢咬那些可能有用的信息而不解释他们的反对意见。
    • 我认为downvote(不是我)可能是因为您的回答在多个方面都很糟糕。首先,您没有回答OP的问题;第二,你不解释你在做什么;第三,您正在使用已弃用的函数 (os.popen),在 Python 3 中,无论如何都是使用 subprocess.Popen 实现的,如 stackoverflow.com/a/41678241/7738328 所示;第四,带有未解释的断言,即它以某种方式滥用 RAM。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-04-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-03-21
    相关资源
    最近更新 更多