【Title】: How to write to BigQuery with BigQuery IO in Apache Beam?
【Posted】: 2020-06-23 02:46:56
【Question】:

I'm trying to set up an Apache Beam pipeline that reads from Kafka and writes to BigQuery. I'm using the logic from here to filter out some coordinates: https://www.talend.com/blog/2018/08/07/developing-data-processing-job-using-apache-beam-streaming-pipeline/ TL;DR: messages in the topic have the format id,x,y, and all messages with x>100 or y>100 are filtered out.

I read the data, do a couple of transforms, define my table schema, and then try to write to BigQuery. I'm not sure how to call the write method; this is probably a gap in my knowledge of Java generics. I believe the input should be a PCollection, but I can't quite figure it out.
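The filtering rule described above (drop any message with x>100 or y>100) can be sketched as a plain predicate. In the pipeline, `FilterObjectsByCoordinates` comes from the linked Talend post; the standalone helper below (hypothetical names, assuming the `id,x,y` message format) only illustrates the same logic outside of Beam:

```java
// Standalone sketch of the coordinate filter, assuming messages look like "id,x,y".
// In the real pipeline this logic lives inside FilterObjectsByCoordinates,
// used with Filter.by(...); the class and method names here are hypothetical.
public class FilterSketch {

    // Keep only messages whose x and y are within the configured bounds.
    static boolean keep(String message, int maxX, int maxY) {
        String[] parts = message.split(",");
        if (parts.length != 3) {
            return false; // drop malformed records
        }
        int x = Integer.parseInt(parts[1].trim());
        int y = Integer.parseInt(parts[2].trim());
        return x <= maxX && y <= maxY;
    }

    public static void main(String[] args) {
        System.out.println(keep("id1,50,60", 100, 100));  // within bounds
        System.out.println(keep("id2,150,60", 100, 100)); // x too large
        System.out.println(keep("id3,50,160", 100, 100)); // y too large
    }
}
```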

Here is the pipeline code - I just want to give the whole context, so apologies if it looks like a code dump:

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(
                KafkaIO.<Long, String>read()
                        .withBootstrapServers(options.getBootstrap())
                        .withTopic(options.getInputTopic())
                        .withKeyDeserializer(LongDeserializer.class)
                        .withValueDeserializer(StringDeserializer.class))
        .apply(
                ParDo.of(
                        new DoFn<KafkaRecord<Long, String>, String>() {
                            @ProcessElement
                            public void processElement(ProcessContext processContext) {
                                KafkaRecord<Long, String> record = processContext.element();
                                processContext.output(record.getKV().getValue());
                            }
                        }))
        .apply(
                "FilterValidCoords",
                Filter.by(new FilterObjectsByCoordinates(options.getCoordX(), options.getCoordY())))
        .apply(
                "ExtractPayload",
                ParDo.of(
                        new DoFn<String, KV<String, String>>() {
                            @ProcessElement
                            public void processElement(ProcessContext c) throws Exception {
                                c.output(KV.of("filtered", c.element()));
                            }
                        }));

    TableSchema tableSchema =
            new TableSchema()
                    .setFields(
                            ImmutableList.of(
                                    new TableFieldSchema()
                                            .setName("x_cord")
                                            .setType("STRING")
                                            .setMode("NULLABLE"),
                                    new TableFieldSchema()
                                            .setName("y_cord")
                                            .setType("STRING")
                                            .setMode("NULLABLE")));
    pipeline
            .apply(
                    "Write data to BQ",
                    BigQueryIO
                            .<String, KV<String, String>>write() // I'm not sure how to call this method
                            .optimizedWrites()
                            .withSchema(tableSchema)
                            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                            .withSchemaUpdateOptions(ImmutableSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION))
                            .withMethod(FILE_LOADS)
                            .to(new TableReference()
                                    .setProjectId("prod-analytics-264419")
                                    .setDatasetId("publsher")
                                    .setTableId("beam_load_test")));

【Comments】:

  • Try doing PCollection object_name = p.apply( .... and then use this object_name to do object_name.apply( "Write data to BQ", BigQueryIO [...]

Tags: java apache-kafka google-bigquery apache-beam apache-beam-io


【Answer 1】:

You want something like this:

[..] 
pipeline.apply(BigQueryIO.writeTableRows()
        .to(String.format("%s.dataset.table", options.getProject()))
        .withCreateDisposition(CREATE_IF_NEEDED)
        .withWriteDisposition(WRITE_APPEND)
        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
        .withSchema(getTableSchema()));
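`writeTableRows()` in this answer expects a `PCollection<TableRow>`, so the filtered `id,x,y` strings from the question's pipeline need to be mapped to rows first (e.g. inside a `DoFn<String, TableRow>` applied just before the write). `TableRow` behaves like a `Map<String, Object>`; the hypothetical helper below shows that mapping as a plain map, with field names matching the schema in the question (`x_cord` and `y_cord`, both STRING):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the String -> row conversion that would feed writeTableRows().
// TableRow is map-like, so a plain Map stands in for it here; the helper
// name and the assumption that messages look like "id,x,y" are mine.
public class RowSketch {

    static Map<String, Object> toRow(String message) {
        String[] parts = message.split(",");
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("x_cord", parts[1].trim()); // schema declares STRING, so keep the raw text
        row.put("y_cord", parts[2].trim());
        return row;
    }

    public static void main(String[] args) {
        System.out.println(toRow("id1,50,60")); // {x_cord=50, y_cord=60}
    }
}
```

In the Beam pipeline the same logic would emit `new TableRow().set("x_cord", ...).set("y_cord", ...)` from a `DoFn`, and the resulting `PCollection<TableRow>` is what the write transform is applied to.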

【Comments】:

  • Thanks for the input. I tried that and got Required type: PTransform<? super PBegin,OutputT> Provided: Write<TableRow,, which is similar to the error I got with the method described above
  • You need to apply the write to a PCollection<E>
  • My understanding is that each apply returns a PCollection, so I thought I was doing that. I also tried saving a new variable after each apply and still ran into the same problem