【问题标题】:Apache Beam/Google Dataflow PubSub to BigQuery Pipeline: Handling Insert Errors and Unexpected Retry BehaviorApache Beam/Google Dataflow PubSub 到 BigQuery 管道:处理插入错误和意外重试行为
【发布时间】:2019-02-02 06:56:28
【问题描述】:

我已从Google's github repository 中提取了Pub/Sub to BigQuery Dataflow template 的副本。我使用direct-runner在我的本地机器上运行它。

在测试中,我确认只有在 UDF 处理或从 JSON 到 TableRow 的转换过程中发生错误时,模板才会将失败写入“deadletter”表。

我还希望更优雅地处理插入 BigQuery 时发生的故障,方法是将它们发送到单独的 TupleTag 中,以便它们也可以发送到死信表或其他输出以供审查和处理。目前,当使用 dataflow-runner 执行时,这些错误只会写入 Stackdriver 日志,并且会无限期地继续重试,直到问题得到解决。

问题一:在本地测试并发布格式与目标表架构不匹配的消息时,插入被重试 5 次,然后管道崩溃并出现 RuntimeException 以及从对 Google API 的 HTTP 响应。我相信这种行为是在BigQueryServices.Impl 中设置的:

private static final FluentBackoff INSERT_BACKOFF_FACTORY =
        FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5);

但是,基于Google's documentation

"在流模式下运行时,包含失败项目的包 将无限期重试,这可能会导致您的管道 永久停止。”

作为 Beam 的Pub/Sub.IO

创建和使用无限的 PCollections

我的印象是,从 Pub/Sub 读取时,应该默认启用流模式。我什至在我对 writeTableRows() 的调用中添加了 Streaming_Inserts 方法,它并没有影响这种行为。

.apply(
            "WriteSuccessfulRecords",      
            BigQueryIO.writeTableRows()
                .withMethod(Method.STREAMING_INSERTS)
  1. 这种行为是否受到我所在跑步者的影响 使用?如果不是,我的理解缺陷在哪里?

问题二

  1. 使用BigQueryIO.writeBigQueryIO.writeTableRows 时性能是否存在差异?

我问是因为我看不到如何在不创建自己的覆盖扩展方法并使用 ParDo 和 DoFn 的静态类的情况下捕获与插入相关的错误,我可以在其中添加自己的自定义逻辑来为成功记录创建单独的 TupleTag和失败记录,类似于在 JavascriptTextTransformer 中为 FailsafeJavascriptUdf 完成的操作。

更新

public static PipelineResult run(DirectOptions options) {

options.setRunner(DirectRunner.class);

    Pipeline pipeline = Pipeline.create(options);

    // Register the coder for pipeline
    FailsafeElementCoder<PubsubMessage, String> coder =
        FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());

    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);

     PCollectionTuple transformOut =
        pipeline
             //Step #1: Read messages in from Pub/Sub
            .apply(
                "ReadPubsubMessages",
  PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))

             //Step #2: Transform the PubsubMessages into TableRows
            .apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));

    WriteResult writeResult = null;

    try {
      writeResult = 
            transformOut
        .get(TRANSFORM_OUT)
        .apply(
            "WriteSuccessfulRecords",      
            BigQueryIO.writeTableRows()
                .withMethod(Method.STREAMING_INSERTS)
                .withoutValidation()
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                .to("myproject:MyDataSet.MyTable"));
    } catch (Exception e) {
        System.out.print("Cause of the Standard Insert Failure is: ");
        System.out.print(e.getCause());
    }

    try {
        writeResult
            .getFailedInserts()
            .apply(
                    "WriteFailedInsertsToDeadLetter",
                    BigQueryIO.writeTableRows()
                        .to(options.getOutputDeadletterTable())
                        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(WriteDisposition.WRITE_APPEND));
    } catch (Exception e) {
        System.out.print("Cause of the Error Insert Failure is: ");
        System.out.print(e.getCause());
    }

     PCollectionList.of(transformOut.get(UDF_DEADLETTER_OUT))
        .and(transformOut.get(TRANSFORM_DEADLETTER_OUT))
        .apply("Flatten", Flatten.pCollections())
        .apply(
            "WriteFailedRecords",
            WritePubsubMessageErrors.newBuilder()
                .setErrorRecordsTable(
                    maybeUseDefaultDeadletterTable(
                        options.getOutputDeadletterTable(),
                        options.getOutputTableSpec(),
                        DEFAULT_DEADLETTER_TABLE_SUFFIX))
                .setErrorRecordsTableSchema(getDeadletterTableSchemaJson())
                .build());

    return pipeline.run();
  }

错误:

Cause of the Error Insert Failure is: null[WARNING] 
java.lang.NullPointerException: Outputs for non-root node WriteFailedInsertsToDeadLetter are null
    at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:864)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:672)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
    at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:575)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at com.google.cloud.teleport.templates.PubSubToBigQuery.run(PubSubToBigQuery.java:312)
    at com.google.cloud.teleport.templates.PubSubToBigQuery.main(PubSubToBigQuery.java:186)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
    at java.lang.Thread.run(Thread.java:748)

【问题讨论】:

    标签: google-bigquery google-cloud-dataflow apache-beam google-cloud-pubsub


    【解决方案1】:

    在最新版本的 Beam 中,BigQueryIO.Write 转换返回一个 WriteResult 对象,使您能够检索未能输出到 BigQuery 的 TableRows 的 PCollection。使用它,您可以轻松检索失败,将它们格式化为死信输出的结构,然后将记录重新提交到 BigQuery。这样就无需使用单独的类来管理成功和失败的记录。

    以下是您的管道的示例。

    // Attempt to write the table rows to the output table.
    WriteResult writeResult =
        pipeline.apply(
            "WriteRecordsToBigQuery",
            BigQueryIO.writeTableRows()
                .to(options.getOutputTable())
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
    
    /*
     * 1) Get the failed inserts
     * 2) Transform to the deadletter table format.
     * 3) Output to the deadletter table.
    */
    writeResult
      .getFailedInserts()
        .apply("FormatFailedInserts", ParDo.of(new FailedInsertFormatter()))
        .apply(
            "WriteFailedInsertsToDeadletter",
            BigQueryIO.writeTableRows()
                .to(options.getDeadletterTable())
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND));
    

    另外,回答您的问题:

    1. 根据梁docs,必须设置streaming DirectRunner 的 true 选项。
    2. 应该没有 性能差异。无论哪种情况,您都需要将 输入记录到TableRow 对象。应该没什么区别 如果您事先在 ParDo 中或在可序列化中执行此操作 函数使用BigQueryIO.Write.withFormatFunction

    【讨论】:

    • 谢谢,我没有注意到直接运行器文档中提到流式传输选项。我只是添加了那个 arg 但它没有改变任何东西。重试 5 次后,该异常仍然使管道崩溃。现在就捕获故障提出其他建议。
    • 我已更新我的问题以显示应用于我的管道的更改,以尝试根据您的建议捕获 WriteResults。我现在收到以下错误:“java.lang.NullPointerException:非根节点 WriteFailedInsertsToDeadLetter 的输出为空。”
    • 将 .withJsonSchema 添加到 writeTableRows() 调用解决了错误消息,我现在看到在第 5 次失败后尝试将失败的记录插入到 DeadletterTable 中。在这一点上,我想我只需要实现 FailedInsertFormatter 将数据按摩成 DeadletterTable 可以接受的东西。最后,似乎 .getFailedInserts() 只返回 TableRows。如何从初始插入尝试中获取 http 响应中返回的相关错误?
    猜你喜欢
    • 2022-11-26
    • 2020-03-10
    • 1970-01-01
    • 2018-01-25
    • 2015-12-14
    • 2021-01-05
    • 2018-12-30
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多