【问题标题】:Is it possible to catch a missing dataset java.lang.RuntimeException in a Google Cloud Dataflow pipeline that writes from Pub/Sub to BigQuery?是否可以在从 Pub/Sub 写入 BigQuery 的 Google Cloud Dataflow 管道中捕获丢失的数据集 java.lang.RuntimeException?
【发布时间】:2020-03-03 22:49:22
【问题描述】:

我正在尝试处理我的 Dataflow 作业尝试动态写入 BigQuery 表目标的错误。

我想捕捉以下异常:

java.lang.RuntimeException:无法获取数据集的数据集 项目example_project中的example_dataset

为了创建数据集,然后重试写入 BigQuery。

是否可以以这种方式捕获异常,如果可以,您知道我需要在代码中的何处添加 try/catch 逻辑吗?

【问题讨论】:

    标签: java google-bigquery google-cloud-dataflow apache-beam google-cloud-pubsub


    【解决方案1】:

    您无法使用 try-catch 块处理这种情况,因为这是一个内部 BQ api 错误。相反,我建议您编写重试瞬态策略并设置错误类型。这样,您可以将 BigQuery 写入错误结果存储在 PCollection 中,然后根据需要转储该记录。请参考下面的 sn-p 来实现。

    WriteResult result = formattedData.get(successRows).setCoder(TableRowJsonCoder.of()).apply("BQ SteamingInserts",
                    BigQueryIO.writeTableRows().withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                            .to("audit.db_audit")
                            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()).withoutValidation()
                            .withExtendedErrorInfo());
    

    使用上面的代码sn-p,如果由于ddl ops而失败,数据将存储在WriteResult中。

    PCollection<String> failedInserts = result.getFailedInsertsWithErr().apply("BQErrorToTableRow",
                    ParDo.of(new BQErrorToString()));
    

    你可以使用上面的代码sn -p来获取失败的记录。让我知道这是否有帮助:)

    【讨论】:

    • 当模式不匹配时,我们使用这种方法来捕获失败的写入(我们使用 DynamicDestinations 动态创建表,并希望在这不能正常工作时捕获)。它运行良好,我们将getFailedInsertsWithErr() 的结果窗口化并将其写入 GCS 存储桶。
    【解决方案2】:

    不存在的 BigQuery 数据集和/或表将无限期重试,并可能导致管道卡住。 BigQueryIO 没有自动创建不存在的 BigQuery 数据集的可配置选项,它只有创建不存在的 BigQuery 表的选项,但指定的数据集资源必须存在或在调用 writing to table 代码之前创建。

    我在Beam documentation也发现了这样的结论

    要写入的数据集必须已经存在

    请参阅 official documentation 并查看 Cloud Dataflow 中如何处理 Java 异常,并参阅 examples

    Dataflow 服务在批处理模式下重试失败的任务最多 4 次,在流模式下重试失败的次数不受限制。在批处理模式下,您的作业将失败,而在流式传输中,它可能会无限期停止。

    希望对你有帮助。

    【讨论】:

      猜你喜欢
      • 2019-06-05
      • 2021-06-09
      • 1970-01-01
      • 2018-10-18
      • 1970-01-01
      • 2022-01-01
      • 2019-08-18
      • 2021-11-23
      • 2018-12-02
      相关资源
      最近更新 更多