【问题标题】:Can I process pcollections in apache beam in chunks? Can I make batches of pcollection and process each batch separately?我可以分块处理 apache Beam 中的 pcollections 吗?我可以分批制作 pcollection 并分别处理每批吗?
【发布时间】:2021-04-07 03:57:27
【问题描述】:

我在 GCS 上有大约 2000 个文件,我想处理所有文件并将它们上传到 BigQuery。当我使用管道进行处理时,这些管道无法自行完成。它在处理 70% 的文件后失败或停止工作。 我决定每批制作 100 个文件。我使用BatchElements() 这在列表中创建了一个列表,即二维列表。 这些批次是否会单独执行。意味着第一批的 pcollections 将在第二批开始工作后自行执行(从 GCS 打开文件并将其上传到 BigQuery)。 这是以串行方式还是并行方式工作? 如果任何批次失败,我的工作会失败吗?

xml = (
        p1
        | "Get xml name" >> beam.Create(gz_names)
        | "Batch elements" >> BatchElements(100)
        | "Open file" >> beam.ParDo(ReadGCS())
        | "Create XML" >> beam.ParDo(CreateXML())
        | "Parse Json" >> beam.ParDo(JsonParser())
        | "Remove elements" >> beam.ParDo(Filtering())
        | "Build Json" >> beam.ParDo(JsonBuilder())
        | "Write elements" >> beam.io.WriteToText(file_path_prefix="output12", file_name_suffix=".json")
)

p1.run()

【问题讨论】:

    标签: python google-cloud-dataflow batch-processing apache-beam


    【解决方案1】:

    Beam 和其他大数据系统会尝试执行优化以改进管道的执行。最简单的优化叫做'fusion',也就是

    在这种情况下,问题在于您的 CreateBatchElementsReadGCS 转换都在同一个工作人员中执行,并且没有机会分配给多个工作人员。

    我的建议是让您尝试重新洗牌您的数据。这将使它能够被重新分配。像这样:

    xml = (
            p1
            | "Get xml name" >> beam.Create(gz_names)
            | beam.Reshuffle()  # This will redistribute your data to other workers
            | "Open file" >> beam.ParDo(ReadGCS())
            | "Create XML" >> beam.ParDo(CreateXML())
            | "Parse Json" >> beam.ParDo(JsonParser())
            | "Remove elements" >> beam.ParDo(Filtering())
            | "Build Json" >> beam.ParDo(JsonBuilder())
            | "Write elements" >> beam.io.WriteToText(file_path_prefix="output12", file_name_suffix=".json")
    )
    
    p1.run()
    

    一旦你这样做了,你的管道应该能够取得更好的进展。

    另一个可以提高管道性能的提示:考虑使用transforms under apache_beam.io.fileio。它们包括匹配文件名和读取文件的转换,它们在 Beam 中是标准的。

    如果您想深入了解一些跑步者优化,请查看FlumeJava paper : )

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-07-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-11-01
      • 2011-06-26
      • 2023-03-25
      • 1970-01-01
      相关资源
      最近更新 更多