【问题标题】:Not able to deserialize Avro specific record as a generic record in Spring Cloud Stream无法将 Avro 特定记录反序列化为 Spring Cloud Stream 中的通用记录
【发布时间】:2019-12-26 11:40:08
【问题描述】:

我希望有一个使用 Spring Cloud Stream 的通用使用者,我不需要在编译时专门指定 Avro 消息的架构。由于消息中包含模式 id,因此应该可以使用模式 id 将其反序列化为对象或通用记录(这可以在 Spring Cloud Stream 框架之外轻松完成)。为此,我尝试使用与我们使用具有特定记录的 Spring Cloud Stream 应用程序相同的方法。显然,这种方法不起作用,因为我需要在消费时指定记录类型。它抛出以下异常:

Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class com.example.avro.model.InputModel specified in writer's schema whilst finding reader's schema for a SpecificRecord.

我想知道是否有一种方法可以在编译时不知道架构(和消息类型)的情况下使用来自主题的消息。

消费者的sn-p:

@StreamListener(Processor.INPUT)
public void handleMessage(Object message) {
...
}

P.S:我使用io.confluent.kafka.serializers.KafkaAvroDeserializer 作为值反序列化器。

【问题讨论】:

    标签: apache-kafka avro spring-kafka spring-cloud-stream confluent-schema-registry


    【解决方案1】:

    我见过类似的情况,在我的 yml 中,指定了 KafkaAvroDeserializer

    consumer-properties:
            key.deserializer:org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    

    然后在应用中,

    @ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
        public void hand(Object payload) throws IOException {
            GenericRecord record = (GenericRecord) payload;
    
    }
    
    

    【讨论】:

      【解决方案2】:

      我可以通过设置spring.kafka.properties.specific.avro.reader= false 以及处理程序的以下方法签名来解决此问题:

      @StreamListener(Processor.INPUT)
      public void handleMessage(GenericRecord record) {
      ...
      }
      

      【讨论】:

        猜你喜欢
        • 2020-12-13
        • 2021-05-09
        • 1970-01-01
        • 2018-01-31
        • 2020-07-07
        • 1970-01-01
        • 1970-01-01
        • 2020-07-03
        • 1970-01-01
        相关资源
        最近更新 更多