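【Posted】: 2022-06-19 14:48:29
【Problem description】:
I am using the Confluent Schema Registry and Avro. Data is ingested into Kafka with a JDBC connector that uses SMTs to create the appropriate Avro schema. The problem occurs during deserialization with SpecificAvroSerde. I have many similar cases and they all work fine, so in general the approach of ingesting data as Avro, generating the Avro schema, and consuming it in a stream processor works. What differs in this case is that the record contains an array (a kind of master/detail record). Below is a simplified version of the schema: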
{
  "namespace": "io.confluent.base.model",
  "type": "record",
  "name": "Test1",
  "fields": [
    { "name": "opt_identifier", "type": [ "null", "string" ], "default": null },
    { "name": "opt_amount", "type": [ "null", { "type": "bytes", "logicalType": "decimal", "precision": 31, "scale": 8 } ], "default": null },
    { "name": "arr_field", "type": [ "null", { "type": "array",
        "items": {
          "name": "TestTest1",
          "type": "record",
          "fields": [
            { "name": "opt_identifier_", "type": [ "null", "string" ], "default": null },
            { "name": "opt_amount_", "type": [ "null", { "type": "bytes", "logicalType": "decimal", "precision": 31, "scale": 8 } ], "default": null }
          ]
        },
        "default": [] } ],
      "default": null }
  ]
}
The schema is compiled with the Avro Maven plugin. The connector and the consumer both use the same Avro jar version. The exception I get is:
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 79
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.read(AbstractKafkaAvroDeserializer.java:409) ~[kafka-avro-serializer-7.0.1.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:114) ~[kafka-avro-serializer-7.0.1.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88) ~[kafka-avro-serializer-7.0.1.jar:na]
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) ~[kafka-avro-serializer-7.0.1.jar:na]
at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:66) ~[kafka-streams-avro-serde-7.0.1.jar:na]
at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:38) ~[kafka-streams-avro-serde-7.0.1.jar:na]
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-3.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58) ~[kafka-streams-3.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66) ~[kafka-streams-3.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176) ~[kafka-streams-3.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112) ~[kafka-streams-3.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:304) ~[kafka-streams-3.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:960) ~[kafka-streams-3.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1000) ~[kafka-streams-3.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:914) ~[kafka-streams-3.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720) ~[kafka-streams-3.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) ~[kafka-streams-3.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555) ~[kafka-streams-3.0.0.jar:na]
Caused by: java.lang.ClassCastException: class java.nio.HeapByteBuffer cannot be cast to class java.math.BigDecimal (java.nio.HeapByteBuffer and java.math.BigDecimal are in module java.base of loader 'bootstrap')
at io.confluent.base.model.TestTest1.put(TestTest1.java:416) ~[classes/:na]
at org.apache.avro.generic.GenericData.setField(GenericData.java:818) ~[avro-1.10.1.jar:1.10.1]
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139) ~[avro-1.10.1.jar:1.10.1]
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247) ~[avro-1.10.1.jar:1.10.1]
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.10.1.jar:1.10.1]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179) ~[avro-1.10.1.jar:1.10.1]
at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298) ~[avro-1.10.1.jar:1.10.1]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183) ~[avro-1.10.1.jar:1.10.1]
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136) ~[avro-1.10.1.jar:1.10.1]
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247) ~[avro-1.10.1.jar:1.10.1]
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.10.1.jar:1.10.1]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179) ~[avro-1.10.1.jar:1.10.1]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) ~[avro-1.10.1.jar:1.10.1]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187) ~[avro-1.10.1.jar:1.10.1]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) ~[avro-1.10.1.jar:1.10.1]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) ~[avro-1.10.1.jar:1.10.1]
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.read(AbstractKafkaAvroDeserializer.java:400) ~[kafka-avro-serializer-7.0.1.jar:na]
... 17 common frames omitted
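I can read the same message using GenericRecord, and all fields are there, so the Avro record is serialized correctly.
My current understanding:
- The problem is related to logical types.
- The same logical type deserializes without problems at the master level (e.g. opt_amount).
- The field opt_amount_, however, throws the exception, so I suspect that the nested detail record TestTest1 is not used in the same way as the master record Test1.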
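To make the symptom concrete, here is a minimal sketch using only the JDK (no Avro dependency). It assumes what the Avro specification states for the decimal logical type: the payload is the two's-complement, big-endian unscaled value, and the scale comes from the schema. The class name AvroDecimalBytes and its helper methods are hypothetical and exist only for illustration; in a real Avro setup this conversion is normally done by a registered decimal conversion such as org.apache.avro.Conversions.DecimalConversion. The sketch reproduces the failing cast from the stack trace and shows the conversion that was apparently skipped for the nested record.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

// Hypothetical helper, for illustration only (not part of Avro or Confluent).
final class AvroDecimalBytes {

    // Decodes the raw bytes of an Avro "decimal" value.
    // The scale is not stored in the bytes; it must come from the schema (8 here).
    static BigDecimal toBigDecimal(ByteBuffer buffer, int scale) {
        ByteBuffer copy = buffer.duplicate();      // leave the caller's position untouched
        byte[] unscaled = new byte[copy.remaining()];
        copy.get(unscaled);
        return new BigDecimal(new BigInteger(unscaled), scale);
    }

    // Encodes a BigDecimal the same way, so the sketch has test input.
    // setScale without a rounding mode throws if the value does not fit the scale.
    static ByteBuffer toBytes(BigDecimal value, int scale) {
        return ByteBuffer.wrap(value.setScale(scale).unscaledValue().toByteArray());
    }

    public static void main(String[] args) {
        ByteBuffer raw = toBytes(new BigDecimal("12.5"), 8);

        // What the generated put(...) effectively does when no conversion ran:
        Object value = raw;
        try {
            BigDecimal broken = (BigDecimal) value;
            System.out.println(broken);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as in the stack trace");
        }

        // What a decimal conversion would have produced before put(...) was called:
        System.out.println(toBigDecimal(raw, 8));
    }
}
```

This is not a fix for the deserializer itself. It only shows that the bytes are intact and that the missing step is the logical-type conversion for the record inside the array.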
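【Discussion】:
- If you want the byte buffer converted to BigDecimal, shouldn't that be using Specific? Right now it is using Generic and trying to cast.
- Sure, the Avro objects are read from the stream via final SpecificAvroSerde<Test1> testSpecificAvroSerde1 = new SpecificAvroSerde<>(); final Map<String, Object> props = this.kafkaProperties.buildStreamsProperties(); testSpecificAvroSerde1.configure(props, false); I only mentioned Generic to say that I had tried it and could read the Avro message.
- I was talking about the stack trace, e.g. GenericDatumReader.readWithoutConversion.
- Got it - yes, fully agree. Hence my remark that the "nested detailed record (TestTest1) is not used in the same way as the master record (Test1)", but I don't know how to enforce that. The logical type is not taken into account properly, which leads to the cast exception in the specific record's public void put(int field$, java.lang.Object value$). Both classes are there and appear to be used, but for the detail part (i.e. inside the array) the logical type handling does not work.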
Tags: java apache-kafka-streams avro