【发布时间】:2019-09-07 20:51:25
【问题描述】:
我正在尝试使用 Kafka 流创建一个简单的拓扑以大写 person 实体。
case class Person(id: Int, name: String, age: Int)
我的自定义序列化器和反序列化器是这样的:
class KafkaBytesSerializer[T] extends Serializer[T] {
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = 0
override def serialize(topic: String, data: T): Array[Byte] = {
val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(stream)
oos.writeObject(data)
oos.close()
stream.toByteArray
}
override def close(): Unit = 0
}
class KafkaBytesDeserializer[T] extends Deserializer[T]{
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = 0
override def deserialize(topic: String, data: Array[Byte]): T = {
val objIn = new ObjectInputStream(new ByteArrayInputStream(data))
val obj = objIn.readObject().asInstanceOf[T]
objIn.close
obj
}
override def close(): Unit = 0
}
流媒体应用的主要调用代码是这样的:
val personSerde: Serde[Person] =
Serdes.serdeFrom(new KafkaBytesSerializer[Person], new KafkaBytesDeserializer[Person])
val builder = new StreamsBuilder()
builder
.stream[String, Person](INPUT_TOPIC)(Consumed.`with`(Serdes.String(), personSerde))
.map[String, Person]((k,p) => (k, Person(p.id, p.name.toUpperCase(), p.age)))
.peek((k, p) => println("Key" + k + " Person: " + p))
.to(OUTPUT_TOPIC)(Produced.`with`(Serdes.String(), personSerde))
当我运行应用程序时,我得到了类转换异常:
[MainApp-consumer-group-b45b436d-1412-494b-9733-f75a61c9b9e3-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [MainApp-consumer-group-b45b436d-1412-494b-9733-f75a61c9b9e3-StreamThread-1] Encountered the following error during processing:
java.lang.ClassCastException: [B cannot be cast to models.Person
at org.apache.kafka.streams.scala.FunctionsCompatConversions$ValueMapperFromFunction$$anon$6.apply(FunctionsCompatConversions.scala:66)
at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:103)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:40)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
我怀疑反序列化级别出了点问题,但不知道为什么?
任何指针都会有所帮助。
【问题讨论】:
-
你确定这是完整的代码吗?看起来您在代码中调用了
KStream::mapValues? -
谢谢@wardziniak!修正错字并更新代码。
-
你确定这是完整的代码吗?看起来不错。您能否添加更多堆栈跟踪和导入?
-
@wardziniak 我在这里添加了提交github.com/surysharma/kafka-streams-tinker/commit/…希望这有帮助吗?
标签: scala apache-kafka apache-kafka-streams