【问题标题】:How can I parallelize a pipeline of generators/iterators in Python?如何在 Python 中并行化生成器/迭代器的管道?
【发布时间】:2011-04-16 06:49:26
【问题描述】:

假设我有一些 Python 代码,如下所示:

input = open("input.txt")
x = (process_line(line) for line in input)
y = (process_item(item) for item in x)
z = (generate_output_line(item) + "\n" for item in y)
output = open("output.txt", "w")
output.writelines(z)

此代码从输入文件中读取每一行,通过几个函数运行它,然后将输出写入输出文件。现在知道函数process_lineprocess_itemgenerate_output_line永远不会相互干扰,假设输入和输出文件在不同的磁盘上,这样读取和书写不会互相干扰。

但 Python 可能对此一无所知。我的理解是Python会读取一行,依次应用每个函数,并将结果写入输出,然后它只会在将第一行发送到输出之后读取第二行,所以在第一条线退出之前,第二条线不会进入管道。我是否正确理解该程序将如何进行?如果这是它的工作原理,是否有任何简单的方法可以使多行可以同时在管道中,以便程序并行读取、写入和处理每个步骤?

【问题讨论】:

  • 我认为你需要为此添加多线程,所以我会说不 - 你在 CPython 和其他实现中没有多线程,它可能不值得拥有一百行而不是五。

标签: python iterator parallel-processing pipeline


【解决方案1】:

你不能真正并行读取或写入文件;最终,这些将成为您的瓶颈。您确定您的瓶颈是 CPU,而不是 I/O?

由于您的处理不包含依赖项(根据您的说法),因此使用 Python's multiprocessing.Pool class 非常简单。

有几种方法可以写这个,但更容易 w.r.t.调试是找到独立的关键路径(代码中最慢的部分),我们将使其并行运行。我们假设它是 process_item。

……实际上就是这样。代码:

import multiprocessing.Pool

p = multiprocessing.Pool() # use all available CPUs

input = open("input.txt")
x = (process_line(line) for line in input)
y = p.imap(process_item, x)
z = (generate_output_line(item) + "\n" for item in y)
output = open("output.txt", "w")
output.writelines(z)

我还没有测试过,但这是基本的想法。 Pool 的 imap 方法确保以正确的顺序返回结果。

【讨论】:

  • 我在具有 真正 快速 I/O 的服务器上运行它。 I/O 不是瓶颈也让我感到惊讶。此外,我可以同时读写,而且速度几乎没有减慢。有什么理由我不能或不应该在所有三个处理步骤中使用p.imap?我有 48 个内核可供使用,所以如果我能很好地并行化,我也许可以让 I/O 再次变得缓慢。
  • 48 核是……很多。如此之多以至于 Python 的多处理模块的锁定/信号量开销变得很大。也许您应该改用 Hadoop?
  • 要回答您的问题,这取决于您的代码。根据您给出的猜测,我可能会将所有 3 个函数合并为一个(取行并返回输出行)并将其传递给 imap。
  • 如果我不关心保留行的顺序,开销会减少吗?
  • 如果你不关心 order 和 process_item,p.imap_unordered 可能会稍微快一些(它会按完成的顺序产生结果,这样你就可以开始将这些行写入 output.txt早一点)
【解决方案2】:

有什么简单的方法可以让多行同时在管道中

我为此编写了一个库:https://github.com/michalc/threaded-buffered-pipeline,它在单独的线程中迭代每个可迭代对象。

那是什么

input = open("input.txt")

x = (process_line(line) for line in input)
y = (process_item(item) for item in x)
z = (generate_output_line(item) + "\n" for item in y)

output = open("output.txt", "w")
output.writelines(z)

变成

from threaded_buffered_pipeline import buffered_pipeline

input = open("input.txt")

buffer_iterable = buffered_pipeline()
x = buffer_iterable((process_line(line) for line in input))
y = buffer_iterable((process_item(item) for item in x))
z = buffer_iterable((generate_output_line(item) + "\n" for item in y))

output = open("output.txt", "w")
output.writelines(z)

这增加了多少实际并行度取决于每个迭代中实际发生的情况,以及您拥有多少 CPU 内核/它们有多忙。

典型的例子是 Python GIL:如果每个步骤都占用大量 CPU,并且只使用 Python,则不会添加太多并行性,这可能不会比串行版本快。另一方面,如果每个网络 IO 都很重,那么我认为它可能会更快。

【讨论】:

    猜你喜欢
    • 2011-09-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-11-01
    • 1970-01-01
    • 2018-05-16
    • 1970-01-01
    • 2015-11-07
    相关资源
    最近更新 更多