【发布时间】:2019-08-02 11:40:32
【问题描述】:
我需要在 KStream 中使用 transform() 操作,但在未设置必要的 serdes 时会出现常见的 ClassNotFoundException:
Caused by: java.lang.ClassCastException: xxx.SomeKey cannot be cast to [B
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:161)
val someKeySerde = SpecificAvroSerde<SomeKeySerde>()
someKeySerde.configure(serdeConfig, false)
val someValueSerde = SpecificAvroSerde<SomeValueSerde>()
someValueSerde.configure(serdeConfig, false)
val someExtendedValueSerde = SpecificAvroSerde<SomeExtendedValueSerde>()
someExtendedValueSerde.configure(serdeConfig, false)
myKStream
.transform(TransformerSupplier {
object : Transformer<SomeKey, SomeValue, KeyValue<SomeKey, SomeValue>> {
private lateinit var context: ProcessorContext
override fun close() {
}
override fun transform(key: SomeKey, value: SomeValue): KeyValue<SomeKey, SomeValue> {
println("@@@@@@@@@@@@ timestamp ${context.timestamp()}")
...
return KeyValue(key, enrichedValue)
}
override fun init(context: ProcessorContext) {
this.context = context
}
}
}).groupByKey()
.aggregate(getSomeValueAggregationInitializer(),
getAggregator("absolute"),
materializedAbsoluteSomeValueFrequency)
其他 KStream 操作允许我们指定要使用的 serdes,但不能在转换的情况下。我该如何设置它们? (如您在上面看到的,SpecificAvroSerde)?
更新: 正如 Matthias 所指出的,问题在于转换后的 groupByKey 操作中缺少 Serdes。我已经用新问题更新了问题标题。
1) 为什么没有Grouped.with(clientProjectIdSerde, deploymentFinishedSerde) 时没有transform() 调用但我需要在transform() 之后添加它?
如果我更新 groupById 以包含 GroupedWith -> .groupByKey(Grouped.with(clientProjectIdSerde, deploymentFinishedSerde)) 现在我可以看到通话记录 ("@@@@@@@@@@@@ ... 但出现了一个新问题:
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: java.io.EOFException
at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
at org.apache.avro.io.BinaryDecoder.readLong(BinaryDecoder.java:160)
at org.apache.avro.io.ResolvingDecoder.readLong(ResolvingDecoder.java:162)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:184)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
如果我在没有.transform() 的情况下留下完全相同的代码,它可以工作:
myKStream
.groupByKey(Grouped.with(clientProjectIdSerde, deploymentFinishedSerde))
.aggregate(getSomeValueAggregationInitializer(),
getAggregator("absolute"),
materializedAbsoluteSomeValueFrequency)
2) 为什么我在使用 transform() 时会收到此 Avro 异常,但如果没有它,我该如何解决?
从 SomeValue Avro 对象中读取 dateTime long 字段时发生错误:
{
"namespace" : "xxx",
"type" : "record",
"name" : "SomeValue",
"fields" : [
{"name": "name", "type": "string"},
{"name":"dateTime", "type": "long", "logicalType": "timestamp-millis"}
]
}
【问题讨论】:
-
您确定 ClassCastException 是由您复制/粘贴的转换器抛出的,而不是在转换之后(或之前)的代码中吗?我问这个,因为例外是关于 SomeKey 的问题,但是在您的代码中您没有触及消息的关键部分:您接受 SomeKey 并发送 SomeKey ....
-
如果我删除了 transform() 调用,其余的过程工作正常,所以它似乎来自它。
-
从代码中看不清楚,但是只有在某些主题中写入了一些数据才会出现异常。在
transform()之后是否有连续的to()操作,或者您是否在transform()中使用了可能写入更改日志主题的状态存储? -
@MatthiasJ.Sax 我已经用新信息更新了这个问题,感谢您查看它
-
如果您在上游设置 Serdes,Kafka Stream 会尝试在下游使用它们,键和值类型不会改变。但是,如果您使用
transform(),键和值类型都可能会更改,因此使用上游 Serdes 是不安全的。因此,如果您不再设置 Serdes,Kafka Streams 会从配置中回退到默认的 Serdes。
标签: apache-kafka avro apache-kafka-streams