【问题标题】:Group elements in Apache Beam pipelineApache Beam 管道中的组元素
【发布时间】:2018-01-05 15:20:07
【问题描述】:

我有一个管道可以解析来自 AVRO 文件的记录。

我需要将传入的记录拆分为 500 个项目的块,以便调用同时接受多个输入的 API。

有没有办法用 Python SDK 做到这一点?

【问题讨论】:

    标签: google-cloud-dataflow apache-beam apache-beam-io


    【解决方案1】:

    我假设您的意思是批处理用例。你有几个选择:

    如果您的 PCollection 足够大,并且您对捆绑包的大小有一定的灵活性,则可以在以随机/循环顺序为您的元素分配键后使用 GroupByKey 转换。例如:

    my_collection = p | ReadRecordsFromAvro()
    
    element_bundles = (my_collection 
                         # Choose a number of keys that works for you (I chose 50 here)
                       | 'AddKeys' >> beam.Map(lambda x: (randint(0, 50), x))
                       | 'MakeBundles' >> beam.GroupByKey()
                       | 'DropKeys' >> beam.Map(lambda (k, bundle): bundle)
                       | beam.ParDo(ProcessBundlesDoFn()))
    

    ProcessBundlesDoFn 是这样的:

    class ProcessBundlesDoFn(beam.DoFn):
      def process(self, bundle):
        while bundle.has_next():
          # Fetch in batches of 500 until you're done
          result = fetch_n_elements(bundle, 500)
          yield result
    

    如果您需要拥有正好 500 个元素的所有捆绑包,那么您可能需要:

    1. 计算 PCollection 中元素的数量
    2. 将该计数作为单例侧输入传递给您的 'AddKeys' ParDo,以确定您需要的确切密钥数量。

    希望对您有所帮助。

    【讨论】:

    • 感谢 Pablo,这就是我最终所做的。不幸的是,我想尽可能地并行化,并且我可以分配负载的“桶”或随机键的数量很难预先计算。将侧面输入作为单例传递的文档非常稀少。谢谢!
    • 另外,请注意,您选择的密钥数量将决定管道中的并行度,因为每个密钥都是按顺序处理的。例如,如果有 50 个键,您的作业将无法在超过 50 台机器上运行(实际上,最终工作人员的数量远小于键的数量)。
    • 另外,如果您需要,我会详细说明侧边输入法。
    • 谢谢巴勃罗!我已经设法在 DAG 构建时预先计算项目数。
    猜你喜欢
    • 2018-11-05
    • 2021-11-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-06-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多