[Posted]: 2020-11-19 00:33:37
[Question]:
We use protobuf together with GCP's Pub/Sub and Dataflow. A single proto file defines both the data sent to Pub/Sub and the BigQuery schema:
publisher -(send proto)-> pubsub -> dataflow -(write)-> bigquery
Occasionally Dataflow makes some cosmetic changes, but mostly it just copies fields from the protobuf message into BigQuery.
My question: is there a way to automatically convert the protobuf model into a BigQuery TableRow?
A simplified version of our current Dataflow code is below. I would like to eliminate most of the code in the ProtoToTableRow class:
public class MyPipeline {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<Core.MyProtoObject> events = pipeline.apply("ReadEvents",
        PubsubIO.readProtos(Core.MyProtoObject.class).fromSubscription(subscription));

    events.apply("ConvertToTableRows", ParDo.of(new ProtoToTableRow()))
        .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
            .withExtendedErrorInfo()
            .to(table));

    pipeline.run();
  }
}
// I want this class to be super thin!
class ProtoToTableRow extends DoFn<Core.MyProtoObject, TableRow> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    Core.Foo foo = c.element().getFoo();
    TableRow fooRow = new TableRow()
        .set("id", foo.getId())
        .set("bar", foo.getBar())
        .set("baz", foo.getBaz());
    // similar code repeated for 100s of lines
    TableRow row = new TableRow()
        .set("foo", fooRow);
    c.output(row);
  }
}
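The repetitive field-by-field copying can in principle be replaced by a single generic converter, since a TableRow is itself a `Map<String, Object>`. With generated proto classes the natural route is protobuf's own descriptor API (`message.getAllFields()`); as a self-contained sketch of the idea, the snippet below instead uses plain Java reflection over bean-style getters, with a `LinkedHashMap` standing in for `TableRow` and a hypothetical `Foo` class standing in for the generated `Core.Foo` (both are illustrative assumptions, not part of the original pipeline):

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

public class GenericRowConverter {

    // Hypothetical stand-in for a generated proto message; in the real
    // pipeline this would be Core.Foo with its generated getters.
    public static class Foo {
        private final long id;
        private final String bar;
        private final boolean baz;

        public Foo(long id, String bar, boolean baz) {
            this.id = id;
            this.bar = bar;
            this.baz = baz;
        }

        public long getId() { return id; }
        public String getBar() { return bar; }
        public boolean getBaz() { return baz; }
    }

    // Walk every zero-arg getXxx() method and copy its value into the map,
    // lower-casing the first letter of the property name ("getId" -> "id").
    // Since TableRow implements Map<String, Object>, the same loop works
    // with `new TableRow()` in place of the LinkedHashMap.
    public static Map<String, Object> toRow(Object message) throws Exception {
        Map<String, Object> row = new LinkedHashMap<>();
        for (Method m : message.getClass().getDeclaredMethods()) {
            if (m.getName().startsWith("get") && m.getParameterCount() == 0) {
                String name = m.getName().substring(3);
                String key = Character.toLowerCase(name.charAt(0)) + name.substring(1);
                row.put(key, m.invoke(message));
            }
        }
        return row;
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> row = toRow(new Foo(42L, "hello", true));
        System.out.println(row);
    }
}
```

For real proto messages the loop body would iterate `message.getAllFields()` instead, using each `FieldDescriptor`'s name as the column key; nested messages (like `Foo` above) would recurse into a nested row.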
[Discussion]:
Tags: java google-bigquery protocol-buffers google-cloud-dataflow