【问题标题】:Apach Beam / Dataflow transform adviceApache Beam / Dataflow 转换建议
【发布时间】:2021-02-01 19:41:12
【问题描述】:

我有一个批处理数据解析作业,其中输入是一个 zip 文件列表,每个 zip 文件都有许多要解析的小文本文件。在 50 个 zip 文件中压缩 100Gb 的顺序,每个 zip 有 100 万个文本文件。

我在 Python 中使用 Apache Beam 的包并通过 Dataflow 运行作业。

我写成

  1. 从 zip 文件路径列表中创建集合
  2. FlatMap 具有为 zip 中的每个文本文件生成的函数(一个输出是从文本文件中读取的所有字节的字节字符串)
  3. ParDo 使用一种方法,该方法为文本文件中数据中的每一行/读取的字节数生成
  4. ...做其他事情,例如在某个数据库的相关表中插入每一行

我注意到这太慢了 - CPU 资源只使用了百分之几。我怀疑每个节点都在获取一个 zip 文件,但工作并未在本地 CPU 之间分配 - 所以每个节点只有一个 CPU 在工作。考虑到我使用的是 FlatMap,我不明白为什么会这样。

【问题讨论】:

    标签: google-cloud-dataflow apache-beam dataflow


    【解决方案1】:

    Dataflow 运行器使用Fusion optimisation:

    '...此类优化可以包括将管道执行图中的多个步骤或转换融合为单个步骤。'

    如果您有一个在其 DoFn 中具有较大扇出的变换,我怀疑您的描述中的 Create 变换确实如此,那么您可能希望通过在管道中引入一个 shuffle 阶段来手动中断融合,如链接文档。

    【讨论】:

    • 感谢您,学习 Beam!您如何看待使用第三方包来流式传输 zip 文件?这里的核心问题是 Beam 不支持分布式读取存储在云文件系统上的存档中的文件。但是,像 Dask 这样的工具可以从 GCS 流式传输压缩数据。但我不确定这有多大用处 - 我假设 Dataflow 不会为 Dask 提供工作人员资源......
    • 如何在每个节点本地解压一个zip文件,然后分发许多解压文件?这可能允许我们想要的宽图。
    • 如果我理解正确的话,这是一个两阶段的 DAG。第一阶段是解压缩 50 个 2 Gig 文件。每个输出 1M 个元素。如果我记得,您应该能够在第一次转换中使用带有 GSCutil 的 SeekableByteChannel 来提取元素。因此,创建一个 KV 并进行重新洗牌,这将分散解压缩工作。来自解压缩工作输出 KV 其中 bytes 是您的文本文件。使用 somehash 以免为每个文件创建唯一密钥,即 50m 个密钥。
    猜你喜欢
    • 2018-12-14
    • 1970-01-01
    • 1970-01-01
    • 2021-04-29
    • 1970-01-01
    • 2021-05-10
    • 1970-01-01
    • 2020-03-31
    • 2018-05-11
    相关资源
    最近更新 更多