【问题标题】:Handling rejects in Dataflow/Apache Beam through dependent pipelines通过依赖管道处理 Dataflow/Apache Beam 中的拒绝
【发布时间】:2021-01-05 21:25:30
【问题描述】:

我有一个从 BigQuery 获取数据并将其写入 GCS 的管道,但是,如果我发现任何拒绝,我想将它们正确地写入 Bigquery 表。我将拒绝收集到全局列表变量中,然后将列表加载到 BigQuery 表中。当我在本地运行它时,这个过程运行良好,因为管道以正确的顺序运行。当我使用dataflowrunner运行它时,它不能保证顺序(我希望pipeline1在pipeline2之前运行。有没有办法使用python在Dataflow中拥有依赖管道?或者也请建议是否可以用更好的方法解决这个问题。提前致谢。

with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
 
    data = (pipeline1
               | 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))
               | 'combine output to list' >> beam.combiners.ToList()
               | 'tranform' >> beam.Map(lambda x: somefunction)  # Collecting rejects in the except block of this method to a global list variable
               ....etc
               | 'to gcs' >> beam.io.WriteToText(output)
               )

# Loading the rejects gathered in the above pipeline to Biquery
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline2:
    rejects = (pipeline2
                    | 'create pipeline' >> beam.Create(reject_list)
                    | 'to json format' >> beam.Map(lambda data: {.....})
                    | 'to bq' >> beam.io.WriteToBigQuery(....)
                    )

【问题讨论】:

  • @R.Esteves 感谢您的回复。我确实尝试过使用它 - pipeline1.run().wait_until_finish()。它在使用 python 的数据流中不起作用
  • 您是否尝试使用您的第一个 pCollection 作为第二个管道的输入?
  • 你是在建议这样的事情吗?我收到 assert isinstance(pbegin, pvalue.PBegin) AssertionError with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1: data = (pipeline1 | 'get data' >> .... ) # Loading the rejects gathered in the above pipeline to Biquery with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline2: rejects = (data | 'create pipeline' >> beam.Create(reject_list) | ..... )
  • 尝试将两个 PCollection 放在同一个管道中,如下所示:with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1: data = (pipeline1 | 'get data' >> ....) # 将上述管道中收集的拒绝加载到 Biquery rejects = (data | 'create pipeline' >> beam.Create(reject_list) | .....)

标签: python google-cloud-platform apache-beam dataflow google-dataflow


【解决方案1】:

您可以执行类似的操作,但只需 1 个管道,以及转换中的一些额外代码。

beam.Map(lambda x: somefunction) 应该有两个输出:一个写入 GCS,一个被拒绝的元素最终将写入 BigQuery。

为此,您的转换函数必须返回 TaggedOutput

Beam 编程指南中有一个示例:https://beam.apache.org/documentation/programming-guide/#multiple-outputs-dofn

第二个PCollection,然后您可以写入 BigQuery。

您不需要在管道的第二个分支中使用 Create

管道将是这样的:

with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
 
    data = (pipeline1
               | 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))
               | 'combine output to list' >> beam.combiners.ToList()
               | 'tranform' >> beam.Map(transform)  # Tagged output produced here

    pcoll_to_gcs = data.gcs_output
    pcoll_to_bq  = data.rejected

    pcoll_to_gcs | "to gcs" >> beam.io.WriteToText(output)
    pcoll_to_bq  | "to bq" >> beam.io.WriteToBigQuery(....)

那么transform 函数会是这样的

def transform(element):
  if something_is_wrong_with_element:
    yield pvalue.TaggedOutput('rejected', element)

  transformed_element = ....

  yield pvalue.TaggedOutput('gcs_output', transformed_element)

【讨论】:

    猜你喜欢
    • 2019-01-16
    • 2018-01-25
    • 2019-06-22
    • 2018-10-17
    • 2019-07-14
    • 2019-02-02
    • 1970-01-01
    • 2021-11-01
    • 2018-11-05
    相关资源
    最近更新 更多