【问题标题】:Kafka Json Value DeserializerKafka Json 值反序列化器
【发布时间】:2021-10-21 22:06:59
【问题描述】:

我正在使用具有以下属性的 kafka 消费者:

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.connect.json.JsonDeserializer

一个 KafkaProducer(value.serializer=org.apache.kafka.connect.json.JsonSerializer) 正在将 JSON 记录推送到一个主题中,这个消费者正在从中读取,功能方面它工作正常,但是当我的生产者推送非 JSON 消息(例如:空消息)。

在这种情况下,消费者正在下降,并且在清除该空消息之前它不会消费(我已将消费者组的偏移量重置为最新)。

有什么办法可以解决这个问题,也许使用一些属性或类似的东西

【问题讨论】:

    标签: serialization apache-kafka deserialization json-deserialization


    【解决方案1】:

    Consumer API 没有像 Kafka Streams 那样的反序列化异常处理属性

    您需要创建自己的反序列化器来包装 json 并处理任何错误

    你可以找到the SafeDeserializer class in azkarra-commons to be useful

    【讨论】:

      猜你喜欢
      • 2020-08-17
      • 2019-05-24
      • 2020-09-20
      • 1970-01-01
      • 2023-03-23
      • 2019-11-18
      • 2018-07-11
      • 1970-01-01
      • 2015-08-01
      相关资源
      最近更新 更多