【问题标题】:Cannot handle a Dataflow failed state after wait_to_finish()wait_to_finish() 后无法处理数据流失败状态
【发布时间】:2021-12-20 05:50:59
【问题描述】:

我有一个运行 Beam/Dataflow 作业的 Python 脚本

# add config
p = beam.Pipeline(options=pipeline_options)
# multiple dataflow processes . . .
# in some process I tryna raise the error to make dataflow job failed

result = p.run()
job_status = result.wait_until_finish()
if job_status == "FAILED":
   # run something else 

如上面的代码,我正在尝试处理 Dataflow 作业可能失败的情况,如果失败,将会有一个进程。但是在尝试了 Direct runner 和 Dataflow runner 之后。这项工作以我在函数中提出的异常结束。但如果工作成功,它可以处理job_status 函数,例如job_status == "DONE"

wait_until_finish() 将返回管道的最终状态。所以,我想我可以利用这个功能来处理失败的工作,但它不起作用。有什么想法吗?

Dataflow 控制台中显示的日志显示了我引发的异常及其结束,而在我的 IF 条件下没有运行任何东西

编辑: 我从Dataflow document 发现它wait_til_finish() 处理这样的不成功。不确定是否有其他方法可以做到这一点?

编辑: 我的工作实际上是从某个网站上抓取数据并将其推送到 GCS。失败的部分总是在抓取功能上,我以某种方式通过网站获得 403,我必须通过手动运行新的 Dataflow 作业来修复它。我只是想用它来处理自动开始新工作。

def scrape_data():
  # scrape data and return a json

def load_to_gcs():
  # dump json to gcs

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(sys.argv)

    # Parse pipeline paramerters
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    p = beam.Pipeline(options=pipeline_options)
    id_list = ['aa','bb','cc'] # input param for scrape_data()
    id = p | "init"  >> beam.Create(id_list)
    js = id | "scrape" >> beam.Map(scrape_data)
    js | "load to gcs" >> beam.Map(load_to_gcs)
    result = p.run()
    job_status = result.wait_until_finish()
    if job_status == "FAILED":
       # run something else >> call dataflow api to start new job

当它失败时(在抓取期间),数据流作业将自动重试 4 次,作业将失败。但是一旦失败,它就没有进入if条件。

【问题讨论】:

  • 能否分享您编写的代码,因为它有助于复制。
  • @SandeepMohanty 我添加了我正在处理的代码示例和用例

标签: python google-cloud-dataflow apache-beam


【解决方案1】:

此问题跟踪器中已提出此问题。我们目前无法提供预计到达时间,但您可以关注issue tracker 中的进度,您可以通过引用此Link 来“关注”问题以接收自动更新并给予关注。

【讨论】:

    【解决方案2】:

    如果您的管道失败,DataflowRunner 将引发错误(请参阅this code)。

    您可能希望以这种方式处理:

    try:
      result.wait_until_finish()
    except:
      # Ignore the exception that may be thrown.
      pass
    finally:
      if result.state == 'FAILED':
        # Whatever you need to do about this
        logging.error("Pipeline failed!")
    

    这是一个合理的选择吗?

    【讨论】:

      猜你喜欢
      • 2016-09-11
      • 1970-01-01
      • 1970-01-01
      • 2015-08-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多