【Posted】: 2019-04-12 11:00:08
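【Question】:
I am building a consumer with the Kafka Consumer API. The message structure is complex. To deserialize it, I implemented the Deserializer interface and provided the required methods, using the Jackson API for the actual parsing. I get this error: "Exception raisedorg.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition staging.datafeeds.PartnerHotel-0 at offset 19205124"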
#POJO classes
public class Change {
    private Schema schema;
    private Payload payload;
    // Getters and constructor
}
public class Details {
    private List<String> effectedAttributes;
    private List<PartnerHotel> cluster;
    // Getters and constructor
}
public class Field {
    private String type;
    private Boolean optional;
    private String field;
    // Getters and constructor
}
public class Fields {
    private String type;
    private List<Field> fields;
    private Boolean optional;
    private String name;
    // Getters and constructor
}
public class Geom {
    private int srid;
    private String wkb;
    // Getters and constructor
}
public class PartnerHotel {
    private int id;
    private int shopId;
    private String partnerHotelId;
    private boolean isOnline;
    private boolean isRemovedByUser;
    private int mappingPriority;
    private int hotelId;
    private String statusHotelId;
    private String name;
    private String street;
    private String zipCode;
    private String city;
    private String sourceCityId;
    private String state;
    private String stateAlpha2;
    private String country;
    private String alpha2;
    private String alpha3;
    private double latitude;
    private double longitude;
    private Geom geomPoint;
    private int countryIdShop;
    private int selectedGeoname;
    private String propertyType;
    private List<String> tags;
    private int stars;
    private String url;
    private int nrRatings;
    private double recommendation;
    private long dateHotelId;
    private long timeStamp;
    private long lastImport;
    // Getters and constructor
}
public class Payload {
    private PartnerHotel before;
    private PartnerHotel after;
    private Source source;
    private String op;
    private String ts_ms;
    // Getters and constructor
}
public class Schema {
    private String type;
    private Boolean optional;
    private String name;
    private List<Fields> fields;
    // Getters and constructor
}
public class Source {
    private String version;
    private String name;
    private String ts_usec;
    private String txId;
    private String lxn;
    private Boolean snapshot;
    private Object lastSnapshotRecord;
    // Getters and constructor
}
#Deserializer
public class ChangeDeserializer implements Deserializer<Change> {
    public ChangeDeserializer() { }

    public void configure(Map<String, ?> map, boolean b) {}

    public Change deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            Change change = objectMapper.readValue(data, Change.class);
            return change;
        }
        catch (IOException exception) {
            throw new DeserializationException("Unable to deserialize Change", exception);
        }
    }

    public void close() {}
}
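A note on the deserializer above: a default ObjectMapper rejects JSON properties it cannot map, because DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES is enabled unless it is switched off. One check that needs neither a broker nor Jackson is to compare the POJO field names with the keys of the sample message posted in the question. The sketch below does this for the "source" object only. `FieldNameDiff` and its methods are hypothetical helpers, and the nested `Source` class is a trimmed copy of the one in the question. The comparison is approximate, since Jackson derives property names from accessors and annotations rather than from raw field names.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical diagnostic: compares a class's field names with a message's JSON keys. */
final class FieldNameDiff {
    private FieldNameDiff() {}

    /** Names of the non-static, non-synthetic fields declared directly on the class. */
    static Set<String> declaredFieldNames(Class<?> type) {
        Set<String> names = new TreeSet<>();
        for (Field field : type.getDeclaredFields()) {
            if (!Modifier.isStatic(field.getModifiers()) && !field.isSynthetic()) {
                names.add(field.getName());
            }
        }
        return names;
    }

    /** JSON keys that have no field with exactly the same (case-sensitive) name. */
    static Set<String> unmatchedKeys(Class<?> type, String... jsonKeys) {
        Set<String> unmatched = new TreeSet<>(Arrays.asList(jsonKeys));
        unmatched.removeAll(declaredFieldNames(type));
        return unmatched;
    }

    /** Trimmed copy of the Source class from the question, for illustration only. */
    static final class Source {
        String version;
        String name;
        String ts_usec;
        String txId;
        String lxn;
        Boolean snapshot;
        Object lastSnapshotRecord;
    }

    public static void main(String[] args) {
        // Keys copied from the "source" object of the sample payload.
        System.out.println(unmatchedKeys(Source.class,
                "version", "name", "db", "ts_usec", "txId", "lsn",
                "schema", "table", "snapshot", "last_snapshot_record"));
        // prints [db, last_snapshot_record, lsn, schema, table]
    }
}
```

Every key it reports is one that Jackson would likely treat as unknown. The same comparison on PartnerHotel would flag "zipcode" and "timestamp" (the class declares zipCode and timeStamp) as well as the array fields such as "chains" and "proposedHotels". Possible remedies include renaming the fields, annotating them with @JsonProperty, or marking the classes with @JsonIgnoreProperties(ignoreUnknown = true).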
#Consumer
public class KafkaAcnowledger {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "someUrl");
        props.put("group.id", "test131");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("max.poll.records", 1);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
        props.put("value.deserializer", "deserializer.ChangeDeserializer");
        // Diamond operator instead of the raw type, to avoid an unchecked warning
        KafkaConsumer<Long, Change> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("staging.datafeeds.PartnerHotel"));
        while (true) {
            try {
                ConsumerRecords<Long, Change> records = consumer.poll(100);
                for (ConsumerRecord<Long, Change> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            catch (Exception exception) {
                System.out.println("Exception raised" + exception);
            }
        }
    }
}
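The consumer looks fine to me, but when it calls poll() I get a serialization exception. I checked the consumer groups with kafka-consumer-groups.sh, and this consumer's group is in the list. Any pointers would be appreciated.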
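The catch block in the consumer prints only the outer SerializationException, whose message names the partition and offset but not the reason. Kafka typically attaches the underlying failure as the exception's cause, so walking the cause chain should show whether the key or the value deserializer failed and why. The helper below is a minimal sketch using only the JDK; `CauseChain` is a hypothetical name, not part of Kafka.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Kafka): flattens an exception's cause chain. */
final class CauseChain {
    // Upper bound on the chain length, as a guard against cyclic causes.
    private static final int MAX_DEPTH = 20;

    private CauseChain() {}

    /** Returns one "ClassName: message" entry per throwable, outermost first. */
    static List<String> describe(Throwable error) {
        List<String> lines = new ArrayList<>();
        Throwable current = error;
        while (current != null && lines.size() < MAX_DEPTH) {
            lines.add(current.getClass().getName() + ": " + current.getMessage());
            current = current.getCause();
        }
        return lines;
    }
}
```

In the catch block this could be called as `CauseChain.describe(exception).forEach(System.out::println);`. One thing worth looking for in the output: LongDeserializer only accepts keys that are exactly 8 bytes long, so if Debezium writes the key as JSON (an assumption, since the question shows only the value), the root cause would point at the key rather than at ChangeDeserializer.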
Message structure in the Kafka topic:
{"schema":{"type":"struct","fields":[
{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int16","optional":false,"field":"shopId"},{"type":"string","optional":false,"field":"partnerHotelId"},{"type":"boolean","optional":false,"field":"isOnline"},{"type":"boolean","optional":false,"field":"isRemovedByUser"},{"type":"int32","optional":false,"field":"mappingPriority"},{"type":"int32","optional":true,"field":"hotelId"},{"type":"string","optional":true,"field":"statusHotelId"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"dateHotelId"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"timestamp"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"lastImport"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"street"},{"type":"string","optional":true,"field":"zipcode"},{"type":"string","optional":true,"field":"city"},{"type":"string","optional":true,"field":"sourceCityId"},{"type":"string","optional":true,"field":"state"},{"type":"string","optional":true,"field":"stateAlpha2"},{"type":"string","optional":true,"field":"country"},{"type":"string","optional":true,"field":"alpha2"},{"type":"string","optional":true,"field":"alpha3"},{"type":"double","optional":true,"field":"latitude"},{"type":"double","optional":true,"field":"longitude"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"geomPoint"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedGeonames"},{"type":"int32","optional":true,"field":"countryIdShop"},{"type":"int32","optional":true,"field":"selectedGeoname"},{"type":"string","optional":true,"field":"propertyType"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"tags"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"chains"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"creditCards"},{"type":"int32","optional":true,"field":"stars"},{"type":"string","optional":true,"field":"url"},{"type":"int32","optional":true,"field":"nrRatings"},{"type":"double","optional":true,"field":"recommendation"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedPartnerHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"removedFromHotelIds"}],"optional":true,"name":"staging.datafeeds.PartnerHotel.Value","field":"before"},
{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int16","optional":false,"field":"shopId"},{"type":"string","optional":false,"field":"partnerHotelId"},{"type":"boolean","optional":false,"field":"isOnline"},{"type":"boolean","optional":false,"field":"isRemovedByUser"},{"type":"int32","optional":false,"field":"mappingPriority"},{"type":"int32","optional":true,"field":"hotelId"},{"type":"string","optional":true,"field":"statusHotelId"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"dateHotelId"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"timestamp"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"lastImport"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"street"},{"type":"string","optional":true,"field":"zipcode"},{"type":"string","optional":true,"field":"city"},{"type":"string","optional":true,"field":"sourceCityId"},{"type":"string","optional":true,"field":"state"},{"type":"string","optional":true,"field":"stateAlpha2"},{"type":"string","optional":true,"field":"country"},{"type":"string","optional":true,"field":"alpha2"},{"type":"string","optional":true,"field":"alpha3"},{"type":"double","optional":true,"field":"latitude"},{"type":"double","optional":true,"field":"longitude"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"geomPoint"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedGeonames"},{"type":"int32","optional":true,"field":"countryIdShop"},{"type":"int32","optional":true,"field":"selectedGeoname"},{"type":"string","optional":true,"field":"propertyType"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"tags"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"chains"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"creditCards"},{"type":"int32","optional":true,"field":"stars"},{"type":"string","optional":true,"field":"url"},{"type":"int32","optional":true,"field":"nrRatings"},{"type":"double","optional":true,"field":"recommendation"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedPartnerHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"removedFromHotelIds"}],"optional":true,"name":"staging.datafeeds.PartnerHotel.Value","field":"after"},
{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"db"},{"type":"int64","optional":true,"field":"ts_usec"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"string","optional":true,"field":"schema"},{"type":"string","optional":true,"field":"table"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"boolean","optional":true,"field":"last_snapshot_record"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},
{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"staging.datafeeds.PartnerHotel.Envelope"},
"payload":{"before":null,
"after":{"id":13893497,"shopId":135,"partnerHotelId":"6-42036","isOnline":false,"isRemovedByUser":false,"mappingPriority":0,"hotelId":null,"statusHotelId":"AUTO","dateHotelId":null,"timestamp":1529334013938327,"lastImport":1503491984188866,"name":"Ferienvermietung Wiedemann","street":"Chausseeberg 3","zipcode":"17429","city":"Mellenthin","sourceCityId":null,"state":null,"stateAlpha2":null,"country":"Deutschland","alpha2":"DE","alpha3":null,"latitude":53.920278,"longitude":14.013333,"geomPoint":{"wkb":"AQEAACDmEAAARuo9ldMGLEA5nWSry/VKQA==","srid":4326},"proposedGeonames":[2872064],"countryIdShop":83,"selectedGeoname":2872064,"propertyType":null,"tags":["77","36","33","34","38","43","41","123","26","29","1","7","6","70","9","1000","58","17","18","15","13","14","20","65","63","46","10","52"],"chains":[],"creditCards":[],"stars":null,"url":"http://www.buchen.travel/onepage-idealo-booking/index.php?room=6-42036","nrRatings":null,"recommendation":null,"proposedHotels":[],"proposedPartnerHotels":[],"removedFromHotelIds":[]},
"source":{"version":"0.8.3.Final","name":"staging","db":"geo","ts_usec":1554391067119000,"txId":4757138,"lsn":1139303143104,"schema":"datafeeds","table":"PartnerHotel","snapshot":true,"last_snapshot_record":false},"op":"r","ts_ms":1554391067119}}
【Discussion】:
-
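Please explain what you are trying to do. Does the same thing happen when you use a truly plain Java object (one whose fields are all primitive types)? Have you checked that the serialized object has the same structure?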
-
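As for the serialized objects: the data is written to the Kafka topic by Debezium (a Kafka Connect connector), which streams changes from the DB to Kafka. What I am trying to do is consume the messages from the topic Debezium writes to and then process them, so the first step is to get that data out with proper deserialization.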
-
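I suggest the first step is to narrow down the range of possible causes. Can you try sending TestClass { long value; String text;} to a simple test topic? If you can deserialize that, the problem is somewhere in your classes and deserializer. If you cannot, the problem is somewhere in the Debezium configuration.
-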
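I don't know Jackson, but does it support the type Object (Object lastSnapshotRecord;)? If your class versions differ, it may have to fall back to default Java serialization, which could cause you problems... For now I would try without that field.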
-
Note: Spring Kafka and Confluent both already provide JSON deserializers. You don't need to write your own.
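To illustrate the note above, here is a rough sketch of consumer properties that hand JSON parsing to Spring Kafka's JsonDeserializer instead of a hand-written class. Several things here are assumptions: that spring-kafka is on the classpath, that the key can be read as text, and that the target-type property is named `spring.json.value.default.type` (the name used in recent Spring Kafka releases; older versions used a different key, so check the documentation for your version). `JsonConsumerProps` and its parameters are hypothetical.

```java
import java.util.Properties;

/** Hypothetical sketch: consumer properties that delegate JSON parsing to Spring Kafka. */
final class JsonConsumerProps {
    private JsonConsumerProps() {}

    /**
     * @param bootstrapServers broker list, e.g. "host:9092"
     * @param groupId          consumer group id
     * @param valueTypeName    fully qualified name of the value class (assumption: your Change class)
     */
    static Properties build(String bootstrapServers, String groupId, String valueTypeName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Assumption: the key is read as plain text rather than as an 8-byte long.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Deserializer shipped with Spring for Apache Kafka.
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        // Tells the JSON deserializer which class to bind to when no type headers are present.
        props.put("spring.json.value.default.type", valueTypeName);
        return props;
    }
}
```

A library deserializer still uses Jackson underneath, so the POJOs would need to match the message's field names (or tolerate unknown ones) for this to work.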
Tags: java apache-kafka kafka-consumer-api