【问题标题】:How do I set the value de/serializer for a Kafka KeyValueStore?如何为 Kafka KeyValueStore 设置值反序列化器?
【发布时间】:2019-07-03 08:18:09
【问题描述】:

我将来自 Kafka 主题的消息存储在 KeyValueStore 中,以便稍后查询它们。我创建一个 KTable 如下:

@StreamListener public void process(@Input("input") KTable<String,MyMessage> myMessages) {

我在 application.yml 中配置了消费者,如下所示:

更新的反序列化程序包

spring.cloud.stream.kafka.streams.bindings.input: consumer: materializedAs: all-messages key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: com.me.MyMessageDeserializer key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: com.me.MyMessageSerializer

但是,当我从 KeyValueStore 读取时,键作为字符串正确返回,但返回的值是字节数组而不是 MyMessage。出于某种原因,我的自定义反序列化器没有被使用。我试图自己反序列化消息,但我的反序列化器因异常而崩溃。我在我的序列化程序上放了一个断点,它永远不会被调用。我很清楚,我的序列化器或反序列化器都没有被使用。

我缺少什么配置才能使用我的自定义值反序列化器?是否需要在特定的包中找到反序列化器?

【问题讨论】:

  • 你有没有尝试在你的班级前面写完整的包名?还是在根目录下?
  • 尝试将完整的包名放在 MyMessageDeserializer 之前。
  • 我有完整的包名还是不行

标签: java spring apache-kafka-streams spring-cloud-stream


【解决方案1】:

在 application.yml 中使用了错误的配置键。而不是 key-deserializer: 它应该是 keySerde: 而不是 value-deserializer: 它应该是 valueSerde。下面是正确的配置:

spring.cloud.stream.kafka.streams.bindings.input: consumer: materializedAs: all-messages keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde valueSerde: com.me.MyMessageSerde producer: keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde valueSerde: com.me.MyMessageSerde

【讨论】:

    猜你喜欢
    • 2021-10-21
    • 2020-05-31
    • 1970-01-01
    • 1970-01-01
    • 2019-11-18
    • 2018-07-11
    • 1970-01-01
    • 2015-08-01
    • 1970-01-01
    相关资源
    最近更新 更多