【发布时间】:2021-12-20 05:50:59
【问题描述】:
我有一个运行 Beam/Dataflow 作业的 Python 脚本
# add config
p = beam.Pipeline(options=pipeline_options)
# multiple dataflow processes . . .
# in some process I tryna raise the error to make dataflow job failed
result = p.run()
job_status = result.wait_until_finish()
if job_status == "FAILED":
# run something else
如上面的代码,我正在尝试处理 Dataflow 作业可能失败的情况,如果失败,将会有一个进程。但是在尝试了 Direct runner 和 Dataflow runner 之后。这项工作以我在函数中提出的异常结束。但如果工作成功,它可以处理job_status 函数,例如job_status == "DONE"
wait_until_finish() 将返回管道的最终状态。所以,我想我可以利用这个功能来处理失败的工作,但它不起作用。有什么想法吗?
Dataflow 控制台中显示的日志显示了我引发的异常及其结束,而在我的 IF 条件下没有运行任何东西
编辑:
我从Dataflow document 发现它wait_til_finish() 处理这样的不成功。不确定是否有其他方法可以做到这一点?
编辑: 我的工作实际上是从某个网站上抓取数据并将其推送到 GCS。失败的部分总是在抓取功能上,我以某种方式通过网站获得 403,我必须通过手动运行新的 Dataflow 作业来修复它。我只是想用它来处理自动开始新工作。
def scrape_data():
# scrape data and return a json
def load_to_gcs():
# dump json to gcs
if __name__ == "__main__":
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(sys.argv)
# Parse pipeline paramerters
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)
id_list = ['aa','bb','cc'] # input param for scrape_data()
id = p | "init" >> beam.Create(id_list)
js = id | "scrape" >> beam.Map(scrape_data)
js | "load to gcs" >> beam.Map(load_to_gcs)
result = p.run()
job_status = result.wait_until_finish()
if job_status == "FAILED":
# run something else >> call dataflow api to start new job
当它失败时(在抓取期间),数据流作业将自动重试 4 次,作业将失败。但是一旦失败,它就没有进入if条件。
【问题讨论】:
-
能否分享您编写的代码,因为它有助于复制。
-
@SandeepMohanty 我添加了我正在处理的代码示例和用例
标签: python google-cloud-dataflow apache-beam