【发布时间】:2018-03-02 14:20:04
【问题描述】:
我正在使用 Apache Beam 构建数据流管道。下面是伪代码:
PCollection<GenericRecord> rows = pipeline.apply("Read Json from PubSub", <some reader>)
.apply("Convert Json to pojo", ParDo.of(new JsonToPojo()))
.apply("Convert pojo to GenericRecord", ParDo.of(new PojoToGenericRecord()))
.setCoder(AvroCoder.of(GenericRecord.class, schema));
我试图摆脱在管道中设置编码器,因为在管道创建时将不知道架构(它将出现在消息中)。
我注释掉了设置编码器的行并得到一个Exception 说没有配置默认编码器。我使用了of 方法的一个参数版本,得到了以下Exception:
Not a Specific class: interface org.apache.avro.generic.GenericRecord
at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
... 9 more
我们有什么方法可以在运行时提供编码器,而无需事先知道架构?
【问题讨论】:
-
管道对 GenericRecord 的下游有什么作用?是否可以不使用 GenericRecord?
-
@jkff 它将
GenericRecord转换为TableRow并写入BigQuery。它还使用AvroIO写入文件系统(这是我面临的另一个问题,因为AvroIO也需要架构)。 -
您希望 Bigquery 表和生成的 Avro 文件具有什么架构?听起来您正在编写具有许多不同模式的记录,所以我想它们必须进入不同的表和不同的文件?
-
@jkff 我们正在使用架构演变,因此两个架构都符合同一个 BigQuery 表。只是一个模式将有额外的字段,而另一个则不会。我们将为这些字段插入
null值。
标签: google-cloud-dataflow avro apache-beam