【问题标题】:How to notify when DataFlow Job is completeDataFlow 作业完成时如何通知
【发布时间】:2018-12-07 16:19:30
【问题描述】:

我想知道数据流作业何时完成。

我尝试制作以下两个管道

1.

 | 'write to bigquery' >> beam.io.WriteToBigQuery(...)
 | WriteStringsToPubSub('projects/fakeprj/topics/a_topic')

2.

 | 'write to bigquery' >> beam.io.WriteToBigQuery(...)
 | 'DoPubSub' >> beam.ParDo(DoPubSub())   # do Publish using google.cloud.pubsub

但是上面两个代码都会产生以下错误:

AttributeError: 'PDone' 对象没有属性 'windowing'

WriteToBigquery 后如何处理?

注意: 我通过 REST 使用模板执行数据流。 所以,不能使用pipeline_result.wait_until_finish()

编辑。

完整的堆栈在这里。

File "<myPC_DIRPATH>/webapi-dataflow/pubsubtemplate.py", line 327, in <module>
   vital_data_export()
 File "<myPC_DIRPATH>/webapi-dataflow/pubsubtemplate.py", line 323, in vital_data_export
   result = p.run()
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 382, in run
   return self.runner.run_pipeline(self)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 285, in run_pipeline
   return_context=True)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 580, in to_runner_api
   root_transform_id = context.transforms.get_id(self._root_transform())
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
   self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 810, in to_runner_api
   for part in self.parts],
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
   self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 814, in to_runner_api
   for tag, out in self.named_outputs().items()},
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 814, in <dictcomp>
   for tag, out in self.named_outputs().items()},
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
   self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pvalue.py", line 144, in to_runner_api
   self.windowing))
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pvalue.py", line 128, in windowing
   self.producer.inputs)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\transforms\ptransform.py", line 443, in get_windowing
   return inputs[0].windowing
AttributeError: 'PDone' object has no attribute 'windowing'

【问题讨论】:

  • 你能显示完整的回溯吗?
  • 感谢您的评论。我添加了完整的堆栈。
  • 错误显示为“AttributeError: 'PDone' object has no attribute 'windowing'”。我们可以看到属性名称是“PDone”,它应该有一个名为“windowing”的属性,但缺少。您能否告诉我“PDone”是如何被引入到您的管道中的,以及为什么它没有预期的“windowing”属性?
  • 我没有处理“PDone”和“windowing”。WriteToBigQuery 将“PDone”发送到下一个管道。
  • 您提到您没有在代码中使用“PDone”。但是,此错误意味着您的代码在某个地方正在使用它,因此,您可能需要共享您的代码,以便我们获得完整的上下文。如果您确定它与您的代码无关,我建议您在此处找到的“公共问题跟踪器”上使用“创建新的 Cloud Dataflow 问题”打开一个错误:cloud.google.com/support/docs/issue-trackers

标签: google-app-engine google-cloud-dataflow


【解决方案1】:

在 java 中,这是我在数据流管道末尾向 PubSub 发布“完成”事件的操作,管道的输出正在写入 BigQuery。希望在 Python 中有等价物..

PCollection<TableRow> rows = data.apply("ConvertToTableRow", ParDo.of(new ConvertToRow()));
// Normally this would be the end of the pipeline..
WriteResult writeResult = rows.apply("WriteToBQ", BigQueryIO.writeTableRows().to(...);
// Transformations after this will be done AFTER all rows have been written to BQ
rows.apply(Wait.on(writeResult.getFailedInserts()))
    // Transforms each row inserted to an Integer of value 1
    .apply("OnePerInsertedRow", ParDo.of(new DoFn<TableRow, Integer>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(Integer.valueOf(1));
        }
    }))
    // https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java#L51
    // Combines a PCollection of Integers (all 1's) by summing them. 
    // Outputs a PCollection of one integer element with the sum
    .apply("SumInsertedCounts", Sum.integersGlobally())
    .apply("CountsMessage", ParDo.of(new DoFn<Integer, PubsubMessage>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String messagePayload = "pipeline_completed";
            Map<String, String> attributes = new HashMap<>();
            attributes.put("rows_written", c.element().toString());
            PubsubMessage message = new PubsubMessage(messagePayload.getBytes(), attributes);
            c.output(message);
        }
    }))
    .apply("PublishCompletionMessage", PubsubIO.writeMessages().to(/* output topic */));

【讨论】:

  • 这是一个非常好的提示/提示。我对其进行了测试并且效果很好。赞一个。
  • writeResult.getFailedInterstsreturns 没有做成 BQ 的 TableRows。此代码是否仅在写入 BigQuery 失败时才有效?
  • 上述代码能否成功用于大查询插入?意思是它是否可用于在成功的 bq 插入上获得确认
【解决方案2】:

你不能

很明显,PDone 是流水线的最后阶段,不需要为此应用等待完成。

PInput 和 PDone 是 Apache Beam 支持的类,它们分别表示源和接收器。如果您在 BigQuery 写入后尝试执行某些操作,除非您连续运行两个不同的数据流作业,否则这是不可能的。

如果您正在寻找串联运行它们,请查看 Apache Airflow。

【讨论】:

猜你喜欢
  • 2016-11-26
  • 1970-01-01
  • 2019-09-06
  • 2021-09-03
  • 1970-01-01
  • 2021-03-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多