【问题标题】:Deserialize Kafka AVRO messages using Apache Beam使用 Apache Beam 反序列化 Kafka AVRO 消息
【发布时间】:2019-07-12 07:58:35
【问题描述】:

主要目标是聚合两个 Kafka 主题,一个是压缩的慢速移动数据,另一个是每秒接收的快速移动数据。

我已经能够在简单的场景中使用消息,例如 KV (Long,String),使用类似的东西:

PCollection<KV<Long,String>> input = p.apply(KafkaIO.<Long, 
String>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
  
PCollection<String> output = input.apply(Values.<String>create());

但是当您需要从 AVRO 反序列化时,这似乎不是方法。我有一个需要消耗的 KV(STRING, AVRO)。

我尝试从 AVRO 模式生成 Java 类,然后将它们包含在“应用”中,例如:

PCollection<MyClass> output = input.apply(Values.<MyClass>create());

但这似乎不是正确的方法。

是否有任何人可以指出我的文档/示例,以便我了解您将如何使用 Kafka AVRO 和 Beam?

我已经更新了我的代码:

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;

public class Main {

public static void main(String[] args) {

    PipelineOptions options = PipelineOptionsFactory.create();

    Pipeline p = Pipeline.create(options);

    PCollection<KV<Long, Myclass>> input = p.apply(KafkaIO.<Long, String>read()
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Myclass.class))
    );

    p.run();

}
}

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

@DefaultCoder(AvroCoder.class)
public class Myclass{
String name;
String age;

Myclass(){}
Myclass(String n, String a) {
    this.name= n;
    this.age= a;
}
}

但我现在收到以下错误

incompatible types: java.lang.Class < io.confluent.kafka.serializers.KafkaAvroDeserializer > cannot be converted to java.lang.Class < ? extends org.apache.kafka.common.serialization.Deserializer < java.lang.String > >

我一定是导入了错误的序列化程序?

【问题讨论】:

    标签: java apache-kafka avro apache-beam dataflow


    【解决方案1】:

    我也遇到过同样的问题。在此邮件存档中找到了解决方案。 http://mail-archives.apache.org/mod_mbox/beam-user/201710.mbox/%3CCAMsy_NiVrT_9_xfxOtK1inHxb=x_yAdBcBN+4aquu_hn0GJ0nA@mail.gmail.com%3E

    在您的情况下,您需要定义自己的Deserializer&lt;MyClass&gt;,它可以从AbstractKafkaAvroDeserializer 扩展,如下所示。

    public class MyClassKafkaAvroDeserializer extends
      AbstractKafkaAvroDeserializer implements Deserializer<MyClass> {
      
      @Override
      public void configure(Map<String, ?> configs, boolean isKey) {
          configure(new KafkaAvroDeserializerConfig(configs));
      }
    
      @Override
      public MyClass deserialize(String s, byte[] bytes) {
          return (MyClass) this.deserialize(bytes);
      }
    
      @Override
      public void close() {} }
    

    然后将您的 KafkaAvroDeserializer 指定为 ValueDeserializer。

    p.apply(KafkaIO.<Long, MyClass>read()
     .withKeyDeserializer(LongDeserializer.class)
     .withValueDeserializer(MyClassKafkaAvroDeserializer.class) );
    

    【讨论】:

    • 我发现从AbstractKafkaAvroDeserializer 扩展在这里并不重要,如果你只是实现接口,为KafkaAvroDeserializer 的具体实例创建一个内部字段并委托给它。在任何情况下,如果您从 KafkaAvroDeserialzer 扩展,您应该能够删除配置和关闭方法覆盖
    【解决方案2】:

    您可以按如下方式使用 KafkaAvroDeserializer:

    PCollection<KV<Long,MyClass>> input = p.apply(KafkaIO.<Long, String>read()
    .withKeyDeserializer(LongDeserializer.class)
      .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))
    

    其中 MyClass 是 POJO 类生成的 Avro Schema。

    确保您的 POJO 类具有注释 AvroCoder,如下例所示:

    @DefaultCoder(AvroCoder.class)
       public class MyClass{
          String name;
          String age;
    
          MyClass(){}
          MyClass(String n, String a) {
             this.name= n;
             this.age= a;
          }
      }
    

    【讨论】:

    • 如果来自 io.confluent.kafka.serializers.KafkaAvroDeserializer 的 KafkaAvroDeserializer.class?这就是我目前正在使用的,但它给了我一个错误,因为它期望来自 org.apache.kafka.common.serialization.Deserializer 的反序列化器
    • 是的,它来自 Confluent 包。你得到什么错误?你能粘贴错误堆栈跟踪吗?
    • 对不起,我应该更清楚,我得到一个编译错误,即:Error:(47, 69) java: incompatible types: java.lang.Class 无法转换为 java.lang.Class>。 AvroCode 来自 org.apache.beam.sdk.coders.AvroCoder
    • 您必须使用 AvroCoder 注释您的 POJO 类(如答案中给出的)。你做到了吗?
    • 感谢您回复我,我已根据您建议的更改更新了我的帖子。但是我仍然遇到同样的错误,我已经包含了我的导入,因为我假设它与它们有关。
    【解决方案3】:

    Yohei 的回答很好,但我也发现这很管用

    import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
    
    ...
    
    public static class CustomKafkaAvroDeserializer extends SpecificAvroDeserializer<MyCustomClass> {}
    
    ...
    .withValueDeserializerAndCoder(CustomKafkaAvroDeserializer.class, AvroCoder.of(MyCustomClass.class))
    ...
    

    MyCustomClass 是使用 Avro 工具生成的代码。

    【讨论】:

      【解决方案4】:

      我今天遇到了类似的问题,并遇到了以下示例,它为我解决了这个问题。

      https://github.com/andrewrjones/debezium-kafka-beam-example/blob/master/src/main/java/com/andrewjones/KafkaAvroConsumerExample.java

      对我来说缺少的部分是(类)KafkaAvroDeserializer

      KafkaIO.<String, MyClass>read()
              .withBootstrapServers("kafka:9092")
              .withTopic("dbserver1.inventory.customers")
              .withKeyDeserializer(StringDeserializer.class)
              .withValueDeserializerAndCoder((Class)KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))
      

      【讨论】:

      • .class 应该已经返回了一个 Class 对象,所以似乎不需要强制转换...
      【解决方案5】:

      KafkaIO.&lt;Long, String&gt;read() 更改为KafkaIO.&lt;Long, Object&gt;read()

      如果你查看KafkaAvroDeserializer 的实现,它实现了Deserializer&lt;Object&gt;

      public class KafkaAvroDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer&lt;Object&gt;

      【讨论】:

      • 我面临同样的错误,这也不能解决问题。我收到此错误:编译失败 [ERROR] /Users/01087872/Documents/fr-det-avro-sample/src/main/java/examples/MyClassConsumer.java:[19,17] 不兼容的类型:推理变量 T 不兼容等式约束 java.lang.Object,examples.MyClass
      猜你喜欢
      • 1970-01-01
      • 2017-06-29
      • 1970-01-01
      • 1970-01-01
      • 2020-12-13
      • 2019-11-07
      • 2019-11-25
      • 2019-11-17
      • 1970-01-01
      相关资源
      最近更新 更多