【发布时间】:2018-11-28 18:26:26
【问题描述】:
由于 API 从 Kafka 0.11 更改为 Kafka 2.0,我们面临一个问题。在我们基于 0.11 的 Kafka 流应用程序中,我们在两个使用命名状态存储的 KTables[String,Something] 之间建立了连接:
val joinedTable = {
myClass1Table.leftJoin[MyClass1,MyClass2](myClass2Table,new
MyJoiner, new MySerde[MyClass1Class2],"my-join-store")
}
但是,当迁移到 2.0 时,明确提供状态存储的唯一方法如下:
val joinedTable = {
val materialized = Materialized.as[String,MyClass1,KeyValueStore[Bytes,Array[Byte]]]("join-store").withValueSerde(new Serde[MyClass1Class2])
myClass1Table.leftJoin[MyClass1,MyClass2](myClass2Table,new
MyJoiner,materialized)
}
使用此代码,替换生产中的应用实例失败,因为 Kafka 0.11 中的状态存储可能使用了 myTable1 和 myTable2 的密钥 serde。
org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key type (key type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:174)
除了执行 kafka-streams-application-reset 之外,还有其他更好的方法来处理这个问题吗?
【问题讨论】:
标签: apache-kafka apache-kafka-streams