【问题标题】:Peek into stream of Popen pipeline in Python在 Python 中窥视 Popen 管道流
【发布时间】:2012-10-07 18:25:22
【问题描述】:

背景:
Linux 上的 Python 2.6.6。 DNA 序列分析流程的第一部分。
我想从已安装的远程存储 (LAN) 中读取可能压缩过的文件,如果它是压缩过的; gunzip 将其压缩到一个流中(即使用gunzip FILENAME -c),如果流(文件)的第一个字符是“@”,则将整个流路由到一个过滤程序中,该程序接受标准输入的输入,否则只需将其直接通过管道传输到本地磁盘上的文件。我想尽量减少从远程存储读取/查找文件的次数(只通过文件一次应该不是不可能的吗?)。

示例输入文件的内容,前四行对应一条 FASTQ 格式的记录:

@I328_1_FC30MD2AAXX:8:1:1719:1113/1                                        
GTTATTATTATAATTTTTTACCGCATTTATCATTTCTTCTTTATTTTCATATTGATAATAAATATATGCAATTCG
+I328_1_FC30MD2AAXX:8:1:1719:1113/1                                        
hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhahhhhhhfShhhYhhQhh]hhhhffhU\UhYWc

不应通过管道传输到过滤程序的文件包含如下所示的记录(前两行对应于 FASTA 格式的一条记录):

>I328_1_FC30MD2AAXX:8:1:1719:1113/1
GTTATTATTATAATTTTTTACCGCATTTATCATTTCTTCTTTATTTTCATATTGATAATAAATATATGCAATTCG

有些人编造了半伪代码来可视化我想要做什么(我知道这不可能像我写的那样)。我希望它有一些意义:

if gzipped:
    gunzip = Popen(["gunzip", "-c", "remotestorage/file.gz"], stdout=PIPE)
    if gunzip.stdout.peek(1) == "@": # This isn't possible
        fastq = True
    else:
        fastq = False
if fastq:
    filter = Popen(["filter", "localstorage/outputfile.fastq"], stdin=gunzip.stdout).communicate()
else:
    # Send the gunzipped stream to another file

忽略这样一个事实,即代码不会像我在这里写的那样运行,而且我没有错误处理等,所有这些都已经在我的其他代码中了。我只是想要帮助窥视溪流或找到解决方法。如果你能gunzip.stdout.peek(1) 我会很棒,但我意识到这是不可能的。

到目前为止我所做的尝试:
我认为 subprocess.Popen 可能会帮助我实现这一点,并且我尝试了很多不同的想法,其中包括尝试使用某种 io.BufferedRandom() 对象来写入流,但我不知道是怎么做到的会工作。我知道流是不可搜索的,但也许一种解决方法可能是读取 gunzip-stream 的第一个字符,然后创建一个新流,您首先根据文件内容输入“@”或“>”,然后填充其余部分的 gunzip.stdout-stream 到新的流。然后将这个新流输入过滤器的 Popen 标准输入。

请注意,文件大小可能比可用内存大几倍。我不想从远程存储中对源文件执行一次以上的读取,也不想进行不必要的文件访问。

欢迎任何想法!如果我说得不够清楚,请向我提问。

【问题讨论】:

  • 使用gzip 模块而不是外部gzip 应该会给您更多的灵活性。
  • @PedroRomano 是的,它可能会。我担心我进行的文件访问次数。这将是在超级计算机系统上实现的管道的数据采集步骤的一部分,并且可能会同时在多个节点上运行,过多的文件系统调用可能会使远程文件服务器陷入困境。

标签: python popen fastq


【解决方案1】:

这是您的实现,首先根据文件内容输入“@”或“>”,然后将 gunzip.stdout-stream 的其余部分填充到新的流中提案。我只测试了测试的本地文件分支,但应该足以演示这个概念。

if gzipped:
    source = Popen(["gunzip", "-c", "remotestorage/file.gz"], stdout=PIPE)
else:
    source = Popen(["cat", "remotestorage/file"], stdout=PIPE)
firstchar = source.stdout.read(1)
# "unread" the char we've just read
source = Popen([r"(printf '\x%02x' && cat)" % ord(firstchar)],
               shell=True, stdin=source.stdout, stdout=PIPE)

# Now feed the output to a filter or to a local file.
flocal = None
try:
    if firstchar == "@":
        filter = Popen(["filter", "localstorage/outputfile.fastq"],
                       stdin=source.stdout)
    else:
        flocal = open('localstorage/outputfile.stream', 'w')
        filter = Popen(["cat"], stdin=source.stdout, stdout=flocal)
    filter.communicate()
finally:
    if flocal is not None:
        flocal.close()

这个想法是从源命令的输出中读取单个字符,然后使用(printf '\xhh' && cat) 重新创建原始输出,从而有效地实现 peek。替换流将shell=True 指定为Popen,将其留给shell 和cat 来完成繁重的工作。数据始终保留在管道中,永远不会完全读入内存。请注意,shell 的服务仅在对Popen 的单个调用中请求,该调用实现了不读取窥视字节,而不是对涉及用户提供的文件名的调用。即使在那个时候,字节也会被转义为十六进制,以确保 shell 在调用 printf 时不会破坏它。

可以进一步清理代码以实现名为 peek 的实际函数,该函数返回偷看的内容和替换 new_source

【讨论】:

  • 我想这是一种很老套的做法,但它应该可以正常工作。如果结果是最好的,我会尝试一些事情,然后再回到你的帖子。我通常不喜欢在命令行中使用用户输入运行真正的 shell(例如,用户可以设置输出文件)。他们总是设法把事情搞砸,除非你可能会让这个评论线程有点偏离主题并且有一个很好的方法来清理这些输入?
  • 感谢您的夸奖。 :) 说真的,虽然代码可能看起来像 hack,但这个解决方案在内存消耗方面效果很好,并且需要对现有代码进行最少的更改,这些代码已经调用了用于解压缩和过滤的外部命令。 “正确”的解决方案需要对代码进行认真的重新排列以使用 zlib 并手动将数据提供给过滤器,这可能会减慢处理速度。这表明提出此类解决方案的人没有提供工作代码。
  • 我现在更新了代码以避免shell=True 使用用户提供的输入。
  • 我选择了你的答案作为我的最爱,即使我可能不会按原样使用它。它肯定回答了我想要解决的确切问题,我对此表示赞赏。我目前正在研究一种解决方案,该解决方案涉及实现一个自定义 peek 方法(如您的),该方法启动一个线程,将标准输出馈送到下一个 Popen 进程的标准输入。我们会看看结果如何,如果没有,我肯定会回到你的!我喜欢它!
  • 这是一个有趣的方法。 printf ... && cat 命令几乎已经这样做了,但是在一个新的子进程中。请注意,cat 在将数据传输到管道时可能比用 Python 编写的任何循环都要快得多,所以我希望您的解决方案在大输入上速度较慢,但​​在小输入上速度更快(因为它不需要执行shcat)。如果你在 Unix 上工作,我的建议是简单地os.fork() 而不是开始一个新线程。管理起来更轻松,无需担心共享数据结构、死锁和 GIL。
【解决方案2】:

在 Python 中封装 shell 命令是没有意义的。你可以在 Python 中实现你所需要的一切,但无需付出任何代价:

  1. 打开输入文件并读取前 3 个字节。如果它们等于1F 8B 08,那么它应该是 gzip 文件。
  2. 重置文件标记
  3. 如果是 gzip 文件或读取文件,则将文件内容传递给 zlib.decompress()
  4. 如果需要,传递给过滤函数
  5. 将结果写入文件

编辑

这不起作用,因为在传递给 zlib 之前需要剥离 gzip 标头。但是,可以检查前 3 个字节,执行 fh.seek(0) 并将文件传递给 gzip.open() 如果您想确定文件是 gzip(使用 DEFLATE 压缩)。

将文件传递给 gzip 并捕获文件未压缩时抛出的异常可能更容易:

import gzip

try:
    in_file = gzip.open("infile")
    f_contents = in_file.read()
except IOError, e:
    # Re-raise exception if exception message is not "Not a gzipped file"
    # Perhaps it would be safer to check the header!
    if e.__str__() != "Not a gzipped file":
        raise
    in_file = open("infile")
    f_contents = in_file.read()

if f_contents[0] == "@":
    result = filter_function(f_contents)
else:
    result = f_contents

new_file = open("new_file", "w")
new_file.write(result)  

【讨论】:

  • 阅读魔法是检测文件类型的不好方法(特别是因为我怀疑这是正确的魔法)。我认为您应该改为使用 gzip 打开文件,如果失败则回退到常规读取。
  • @nneonneo 您的观点是,可以将文件传递给 gzip open 方法并捕获异常(或者只是默默地做正确的事情),但您对其他一切都是错误的。在对我的流程产生疑问之前,您为什么不做一些研究并找出 gzip 标头格式是什么?
  • gzip 魔法只有1F 8B;下一个值是一种可能随时间变化的压缩方法。 :)
  • 我想当你怀疑 1F 8B 08 是否正确时,你实际上只有 1/3 的怀疑是什么意思? ietf.org/rfc/rfc1952.txt 指出第 3 个字节是压缩方法,其中 08 表示“放气” - 即 zlib。我怀疑那里有很多库会支持带有非放气内容的 gzip :)
  • @nneonneo,在您以“gzip magic”开头的评论中,您在编辑之前声明下一个值是“版本号”,因此我指出了字节 3 的真正含义。
猜你喜欢
  • 1970-01-01
  • 2023-03-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-12-29
相关资源
最近更新 更多