【问题标题】:Converting protobuf to bigquery in Java在 Java 中将 protobuf 转换为 bigquery
【发布时间】:2020-11-19 00:33:37
【问题描述】:

我们将 protobuf 与 GCP 的 pubsub 和数据流一起使用。我们使用单个 proto 文件定义发送到 pubsub 的数据和 bigquery 模式。

publisher -(send proto)-> pubsub -> dataflow -(write)-> bigquery

有时数据流会进行一些外观更改,但主要是将字段从 protobuf 复制到 bigquery。

我的问题是,有没有办法自动将 protobuf 模型转换为 bigquery 的 TableRow?

我们现在拥有的简化数据流代码如下。我想消除ProtoToTableRow 类中的大部分代码:

public class MyPipeline {
    public static void main(String[] args) {
        events = pipeline.apply("ReadEvents",
                PubsubIO.readProtos(Core.MyProtoObject.class).fromSubscription(subscription));
        events.apply("ConvertToTableRows", ParDo.of(new ProtoToTableRow()))
                .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                        .withExtendedErrorInfo()
                        .to(table));
    }
}

// I want this class to be super thin!
class ProtoToTableRow extends DoFn<Core.MyProtoObject, TableRow> {

    @ProcessElement
    public void processElement(ProcessContext c) {
        Core.Foo foo = c.element().getFoo();
        TableRow fooRow = new TableRow()
                .set("id", foo.getId())
                .set("bar", foo.getBar())
                .set("baz", foo.getBaz());

        // similar code repeated for 100s of lines

        TableRow row = new TableRow()
                .set("foo", foo)

        c.output(row);
    }
}

【问题讨论】:

    标签: java google-bigquery protocol-buffers google-cloud-dataflow


    【解决方案1】:

    您可以通过一种非常酷的方式完成此操作。 Beam 为各种类(包括 Java Bean、AutoValue 类以及 Protocol Buffers)提供了模式推断方法。

    对于您的管道,您不需要转换为 TableRow,您可以执行以下操作:

    pipeline.getSchemaRegistry().registerSchemaProvider(
        Core.MyProtoObject.class, new ProtoMessageSchema());
    
    events = pipeline.apply("ReadEvents",
                    PubsubIO.readProtos(Core.MyProtoObject.class).fromSubscription(subscription));
    
    events.apply("WriteToBigQuery", BigQueryIO.<Core.MyProtoObject>write()
                            . useBeamSchema()
                            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                            .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                            .withExtendedErrorInfo()
                            .to(table));
    

    注意BigQueryIO.write 中的useBeamSchema 参数 - 这将使用自动转换。

    【讨论】:

    • 请注意,protobuf 模式提供程序不是核心 beam 的一部分,它是 beam-sdks-java-extensions-protobuf 的一部分:mvnrepository.com/artifact/org.apache.beam/…
    • 似乎 useBeamRows 不存在,所以我需要将其更改为 useBeamSchema。之后我收到类型错误“原因:不存在类型变量的实例,因此 PCollection 符合 PCollection”(事务是我的原型的名称)。有什么想法吗?
    • 回答我自己的问题 - 我需要像 BigQuery.write 一样向 BigQuery.write 提供类型信息。我会编辑答案
    • 嗯实际上它仍然无法正常工作。现在我从这一行得到运行时错误 checkArgument(input.hasSchema());即使我认为我通过 pipeline.getSchemaRegistry().registerSchemaProvider(Core.MyProtoObject.class, new ProtoMessageSchema());
    【解决方案2】:
    猜你喜欢
    • 1970-01-01
    • 2015-11-18
    • 2013-07-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-06-05
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多