【发布时间】:2018-12-07 16:19:30
【问题描述】:
我想知道数据流作业何时完成。
我尝试制作以下两个管道
1.
| 'write to bigquery' >> beam.io.WriteToBigQuery(...)
| WriteStringsToPubSub('projects/fakeprj/topics/a_topic')
2.
| 'write to bigquery' >> beam.io.WriteToBigQuery(...)
| 'DoPubSub' >> beam.ParDo(DoPubSub()) # do Publish using google.cloud.pubsub
但是上面两个代码都会产生以下错误:
AttributeError: 'PDone' 对象没有属性 'windowing'
WriteToBigquery 后如何处理?
注意:
我通过 REST 使用模板执行数据流。
所以,不能使用pipeline_result.wait_until_finish()。
编辑。
完整的堆栈在这里。
File "<myPC_DIRPATH>/webapi-dataflow/pubsubtemplate.py", line 327, in <module>
vital_data_export()
File "<myPC_DIRPATH>/webapi-dataflow/pubsubtemplate.py", line 323, in vital_data_export
result = p.run()
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 382, in run
return self.runner.run_pipeline(self)
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 285, in run_pipeline
return_context=True)
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 580, in to_runner_api
root_transform_id = context.transforms.get_id(self._root_transform())
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 810, in to_runner_api
for part in self.parts],
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 814, in to_runner_api
for tag, out in self.named_outputs().items()},
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 814, in <dictcomp>
for tag, out in self.named_outputs().items()},
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pvalue.py", line 144, in to_runner_api
self.windowing))
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pvalue.py", line 128, in windowing
self.producer.inputs)
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\transforms\ptransform.py", line 443, in get_windowing
return inputs[0].windowing
AttributeError: 'PDone' object has no attribute 'windowing'
【问题讨论】:
-
你能显示完整的回溯吗?
-
感谢您的评论。我添加了完整的堆栈。
-
错误显示为“AttributeError: 'PDone' object has no attribute 'windowing'”。我们可以看到属性名称是“PDone”,它应该有一个名为“windowing”的属性,但缺少。您能否告诉我“PDone”是如何被引入到您的管道中的,以及为什么它没有预期的“windowing”属性?
-
我没有处理“PDone”和“windowing”。WriteToBigQuery 将“PDone”发送到下一个管道。
-
您提到您没有在代码中使用“PDone”。但是,此错误意味着您的代码在某个地方正在使用它,因此,您可能需要共享您的代码,以便我们获得完整的上下文。如果您确定它与您的代码无关,我建议您在此处找到的“公共问题跟踪器”上使用“创建新的 Cloud Dataflow 问题”打开一个错误:cloud.google.com/support/docs/issue-trackers
标签: google-app-engine google-cloud-dataflow