【问题标题】:BigQuery writeTableRows Always writing to bufferBigQuery writeTableRows 始终写入缓冲区
【发布时间】:2018-04-19 07:34:34
【问题描述】:

我们正在尝试使用 Apache Beam 和 avro 写入 Big Query。

以下似乎工作正常:-

p.apply("Input", AvroIO.read(DataStructure.class).from("AvroSampleFile.avro"))
            .apply("Transform", ParDo.of(new CustomTransformFunction()))
            .apply("Load", BigQueryIO.writeTableRows().to(table).withSchema(schema));

然后我们尝试通过以下方式使用它从 Google Pub/Sub 获取数据

p.begin()
            .apply("Input", PubsubIO.readAvros(DataStructure.class).fromTopic("topicName"))
            .apply("Transform", ParDo.of(new CustomTransformFunction()))
            .apply("Write", BigQueryIO.writeTableRows()
                    .to(table)
                    .withSchema(schema)
                    .withTimePartitioning(timePartitioning)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
        p.run().waitUntilFinish();

当我们这样做时,它总是将它推送到缓冲区,Big Query 似乎需要很长时间才能从缓冲区中读取。谁能告诉我为什么上面不会将记录直接写入 Big Query 表?

更新:- 看起来我需要添加以下设置,但这会引发 java.lang.IllegalArgumentException。

.withMethod(Method.FILE_LOADS)
.withTriggeringFrequency(org.joda.time.Duration.standardMinutes(2))

【问题讨论】:

    标签: google-bigquery apache-beam


    【解决方案1】:

    答案是您需要像这样包含“withNumFileShards”(可以是 1 到 1000)。

            p.begin()
                .apply("Input", PubsubIO.readAvros(DataStructure.class).fromTopic("topicName"))
                .apply("Transform", ParDo.of(new CustomTransformFunction()))
                .apply("Write", BigQueryIO.writeTableRows()
                        .to(table)
                        .withSchema(schema)
                        .withTimePartitioning(timePartitioning)
                .withMethod(Method.FILE_LOADS)
                .withTriggeringFrequency(org.joda.time.Duration.standardMinutes(2))
                .withNumFileShards(1000)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
            p.run().waitUntilFinish();
    

    我在任何地方都找不到这说明 withNumFileShards 是强制性的,但是我在修复后找到了一个 Jira 票证。

    https://issues.apache.org/jira/browse/BEAM-3198

    【讨论】:

      猜你喜欢
      • 2020-11-27
      • 1970-01-01
      • 2015-02-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多