【Posted】: 2019-05-03 10:15:30
【Question】:
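I want some steps to run only after the first few steps have finished. In my case, I want to run the first 3 steps and then the last 2 steps.
The last 2 steps should start only once the first 3 steps have finished executing.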
import apache_beam as beam
from apache_beam.io import ReadFromText

with beam.Pipeline(options=pipeline_options) as p1:
    # Steps 1-3: read the CSV, convert each line to a dict, load the staging table
    data_csv = p1 | 'Read CSV file' >> ReadFromText(known_args.input_csv_file)
    dict1 = data_csv | 'Format to json' >> beam.ParDo(Split())
    (dict1 | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
        known_args.output_stage_bq,
        schema=product_revenue_schema))

    # Steps 4-5: read table_spec from BigQuery and write it to another dataset
    fullTable = p1 | 'ReadFromBQ' >> beam.io.Read(beam.io.BigQuerySource(table_spec))
    (fullTable | 'writeToBQ another dataset' >> beam.io.WriteToBigQuery(
        known_args.output_target_bq,
        schema=product_revenue_schema))
Expected: 1: Step1 -> Step2 -> Step3 -> Step4 -> Step5 (a single chain)
Actual: 1: Step1 -> Step2 -> Step3, 2: Step4 -> Step5 (two independent branches)
【Discussion】:
Tags: google-cloud-platform google-cloud-dataflow dataflow