【Question title】: Kafka Streams JSON-POJO deserialization exception
【Posted】: 2023-03-23 02:50:02
【Question description】:
    ERROR org.apache.kafka.streams.errors.LogAndFailExceptionHandler - Exception caught during Deserialization, taskId: 0_1, topic: SourceTopic, partition: 1, offset: 0
org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "before" (class StreamsApplication$Person), not marked as ignorable (0 known properties: ])
 at [Source: [B@19ac162c; line: 1, column: 12] (through reference chain: Person["before"])
Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "before" (class StreamsApplication$Person), not marked as ignorable (0 known properties: ])
 at [Source: [B@19ac162c; line: 1, column: 12] (through reference chain: Person["before"])
    at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:62)
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:833)
    at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1096)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1467)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownVanilla(BeanDeserializerBase.java:1445)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:282)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:140)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3789)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2920)
    at io.confluent.kafka.serializers.KafkaJsonDeserializer.deserialize(KafkaJsonDeserializer.java:75)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:55)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:865)
    at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:938)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:640)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
2756 [MyID-8bbe36da-883c-48eb-8b86-cad4a3acb5e2-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [MyID-8bbe36da-883c-48eb-8b86-cad4a3acb5e2-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down: 
org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.

This is the exception I get in Kafka Streams when deserializing JSON records from a Kafka topic. The JSON record is nested, as shown:

{
  "before": {
    "id": 1,
    "name": "abc"
  },
  "after": {
    "id": 1,
    "name": "xyz"
  }
}

The POJO classes are defined as:

    public static class Person{
        before before;
        after after;
    }
    public static class before{
        public int id;
        public String name;
    }
    public static class after{
        public int id;
        public String name;
    }

Please tell me where I went wrong and how to fix it. Thanks.
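The last line of the log above mentions `default.deserialization.exception.handler`. As a minimal sketch, assuming you would rather log and skip unreadable records than stop the stream thread, the setting could be supplied as below. This does not fix the POJO mapping itself. The class name `SkipBadRecordsConfigSketch`, the `application.id` value (guessed from the thread name in the log), and the broker address are assumptions for illustration; `LogAndContinueExceptionHandler` is the handler that ships with Kafka Streams.

```java
import java.util.Properties;

public class SkipBadRecordsConfigSketch {

    // Builds the Properties that would be passed to the KafkaStreams constructor.
    public static Properties streamsProps() {
        Properties props = new Properties();
        // Assumption: application.id inferred from the "MyID-...-StreamThread-1" thread name.
        props.put("application.id", "MyID");
        // Assumption: placeholder broker address.
        props.put("bootstrap.servers", "localhost:9092");
        // Log the bad record and continue instead of failing the thread
        // (the handler in the log above is LogAndFailExceptionHandler).
        props.put("default.deserialization.exception.handler",
                  "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        return props;
    }

    public static void main(String[] args) {
        streamsProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With this handler every record that fails to deserialize is dropped, so with the POJO as written all records would be skipped; it is mainly useful for tolerating occasional malformed input once the mapping is correct.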

【Comments】:

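  • The name "before" in the JSON does not match the Person field name "beforeClass"
  • I still get the same error after the change; please check the updated question and advise
  • Class names in Java should be camel case and start with an uppercase letter: `public static class Before { public int id; public String name; }`
  • Solved. It just needed getters and setters, and the sub-classes had to be nested inside Person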

Tags: json apache-kafka deserialization apache-kafka-streams json-deserialization


【Solution 1】:
    public static class Person{
        private Before before;
        private After after;

        public Before getBefore(){
            return before;
        }
        public After getAfter(){
            return after;
        }
        public static class Before{
            public int id;
            public String name;
        }
        public static class After{
            public int id;
            public String name;
        }
    }

The changes I made to the Person POJO class, shown above, resolved my problem.
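A likely reason the change works: with default settings, Jackson only treats a field as a property if it is public or has a visible accessor, so the package-private `before` and `after` fields in the question gave it "0 known properties". The sketch below reproduces both cases with a plain `ObjectMapper`, outside Kafka. The names `PersonMappingSketch` and `BrokenPerson` are made up for illustration, and it assumes `jackson-databind` is on the classpath.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;

public class PersonMappingSketch {

    // Mirrors the question: package-private fields and no accessors,
    // so Jackson discovers no properties on this class.
    public static class BrokenPerson {
        Before before;
        After after;
    }

    // Mirrors the answer: private fields exposed through public getters.
    public static class Person {
        private Before before;
        private After after;

        public Before getBefore() { return before; }
        public After getAfter() { return after; }
    }

    public static class Before { public int id; public String name; }
    public static class After { public int id; public String name; }

    // The nested record from the question, as a single-line string.
    static final String JSON =
        "{\"before\":{\"id\":1,\"name\":\"abc\"},\"after\":{\"id\":1,\"name\":\"xyz\"}}";

    // Returns true if the question's mapping is rejected with the same exception type.
    public static boolean brokenMappingFails() throws Exception {
        try {
            new ObjectMapper().readValue(JSON, BrokenPerson.class);
            return false;
        } catch (UnrecognizedPropertyException e) {
            return true;
        }
    }

    // Deserializes the record into the corrected POJO.
    public static Person readFixed() throws Exception {
        return new ObjectMapper().readValue(JSON, Person.class);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("broken mapping fails: " + brokenMappingFails());
        Person p = readFixed();
        System.out.println(p.getBefore().name + " -> " + p.getAfter().name);
    }
}
```

Making the two fields `public`, or adding setters, should work as well; the getter-only version relies on Jackson inferring the private field as the mutator once a matching getter is visible.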
