【问题标题】:Apache Beam and avro : Create a dataflow pipeline without schemaApache Beam 和 avro:创建没有架构的数据流管道
【发布时间】:2018-03-02 14:20:04
【问题描述】:

我正在使用 Apache Beam 构建数据流管道。下面是伪代码:

PCollection<GenericRecord> rows = pipeline.apply("Read Json from PubSub", <some reader>)
    .apply("Convert Json to pojo", ParDo.of(new JsonToPojo()))
    .apply("Convert pojo to GenericRecord", ParDo.of(new PojoToGenericRecord()))
    .setCoder(AvroCoder.of(GenericRecord.class, schema));

我试图摆脱在管道中设置编码器,因为在管道创建时将不知道架构(它将出现在消息中)。

我注释掉了设置编码器的行并得到一个Exception 说没有配置默认编码器。我使用了of 方法的一个参数版本,得到了以下Exception

Not a Specific class: interface org.apache.avro.generic.GenericRecord
    at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
    at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
    at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
    at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
    at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
    at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
    ... 9 more

我们有什么方法可以在运行时提供编码器,而无需事先知道架构?

【问题讨论】:

  • 管道对 GenericRecord 的下游有什么作用?是否可以不使用 GenericRecord?
  • @jkff 它将GenericRecord 转换为TableRow 并写入BigQuery。它还使用AvroIO 写入文件系统(这是我面临的另一个问题,因为AvroIO 也需要架构)。
  • 您希望 Bigquery 表和生成的 Avro 文件具有什么架构?听起来您正在编写具有许多不同模式的记录,所以我想它们必须进入不同的表和不同的文件?
  • @jkff 我们正在使用架构演变,因此两个架构都符合同一个 BigQuery 表。只是一个模式将有额外的字段,而另一个则不会。我们将为这些字段插入null 值。

标签: google-cloud-dataflow avro apache-beam


【解决方案1】:

这是可能的。我推荐以下方法:

  • 不要使用 GenericRecord 类型的中间集合。将其作为 POJO 的集合。
  • 编写一些转换来提取数据架构并使其以PCollectionView&lt;however you want to represent the schema&gt; 的形式提供。
  • 写入 BigQuery 时,通过 write().to(DynamicDestinations) 写入您的 PCollection&lt;YourPojo&gt;,写入 Avro 时,将 FileIO.write()writeDynamic()AvroIO.sinkViaGenericRecords() 结合使用。这两者都可以从侧面输入(您在上面计算)获取动态计算的架构。

【讨论】:

  • 嘿@jkff,感谢您的回复。几个问题: 1. 我们已经使用PCollection 的 POJO 实现了对 BigQuery 的写入,而且它似乎工作正常。 2. 关于写入文件,能否请您提供如何用AvroIOPCollection&lt;POJO&gt; 写入GCS 的伪代码?我们尝试探索FileIOTextIO,但我们无法弄清楚write()writeDynamic() 方法将如何提供帮助。 AvroIO 需要 PCollection&lt;GenericRecord&gt; 才能编写,除非我们对需要编码器的 PCollection&lt;Pojo&gt; 应用转换,否则我们无法拥有它。
  • 特别是解释如何使用PCollectionPCollectionViewAvroIO 将文档写入动态目标的伪代码会很棒。
  • 您需要 Beam 2.3.0 或更高版本,然后您可以使用 FileIO.write/writeDynamic() 和 AvroIO.sink() - 请参阅 AvroIO github.com/apache/beam/blob/… 的单元测试中的示例跨度>
猜你喜欢
  • 2022-08-16
  • 2021-06-27
  • 2019-09-16
  • 2019-04-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-08-11
相关资源
最近更新 更多