【问题标题】:Write to BigQuery table is failing in Dataflow pipeline在 Dataflow 管道中写入 BigQuery 表失败
【发布时间】:2021-10-28 16:28:19
【问题描述】:

我正在开发一个 Dataflow 管道,它正在从谷歌云存储读取一个 protobuf 文件并对其进行解析并尝试写入 BigQuery 表。没有时它工作正常。行数约为 20k 但没有。行数约为 200k,然后失败。下面是示例代码:

Pipeline pipeline = Pipeline.create(options);

        PCollection<PBClass> dataCol = pipeline.apply(FileIO.match().filepattern(options.getInputFile()))
                .apply(FileIO.readMatches())
                .apply("Read GPB File", ParDo.of(new ParseGpbFn()));

dataCol.apply("Transform to Delta", ParDo.of(deltaSchema))
                .apply(Flatten.iterables())
                .apply(
                        BigQueryIO
                                //.write()
                                .writeTableRows()
                                .to(deltaSchema.tableSpec)
                                .withMethod(Method.STORAGE_WRITE_API)
                                .withSchema(schema)
                                //.withFormatFunction(irParDeltaSchema)
                                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                                .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
                                .withExtendedErrorInfo()
                )
        ;

尝试了以下方法的不同组合

withMethod
write
withFormatFunction

也不同。工人和不同的计算引擎类型。

每次卡在GroupByKey 阶段并给出以下错误:

Error message from worker: java.lang.RuntimeException: Failed to create job with prefix beam_bq_job_LOAD_testjobpackage_<...>, reached max retries: 3, last failed job: null.
    org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJob.runJob(BigQueryHelpers.java:199)
    org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJobManager.waitForDone(BigQueryHelpers.java:152)
    org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn.finishBundle(WriteTables.java:322)

工作步骤表视图: 和图表视图:

【问题讨论】:

    标签: google-bigquery apache-beam google-dataflow


    【解决方案1】:
    Error message from worker: java.lang.RuntimeException: Failed to create job with prefix beam_bq_job_LOAD_testjobpackage_<...>, reached max retries: 3, last failed job: null.
                org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJob.runJob(BigQueryHelpers.java:199)
                org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJobManager.waitForDone(BigQueryHelpers.java:152)
                org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn.finishBundle(WriteTables.java:322)
    

    您收到的错误代码 - 如上所述 - 是因为在您的代码中的某处,当您指定要加载的 GCS 文件时,它的格式不正确,URI 应该看起来像这样 gs:/ /bucket/path/to/file.

    【讨论】:

    • 从 gcs 读取不是问题,该作业能够从 GCS 读取文件,因为它能够写入表中没有较少编号的 1。行但无法写入其他没有的表。行数非常高
    • 你要写的行数是有限制的,你可以试着分部分写行吗?
    猜你喜欢
    • 2018-10-18
    • 1970-01-01
    • 1970-01-01
    • 2018-01-12
    • 1970-01-01
    • 2019-05-01
    • 2011-07-13
    • 2021-09-10
    • 1970-01-01
    相关资源
    最近更新 更多