【发布时间】:2018-03-09 12:28:44
【问题描述】:
我想删除我的GlobalKTable<Integer, Long> 商店的状态。
我尝试通过以下方式删除状态:
- 调用
kafka-topics.sh --zookeeper localhost:8080 --delete --topic my-topic删除整个Kafka主题 - 在应用程序启动时运行
KafkaStreams.cleanUp() - 向我的流生成一条“Test:null”消息,因为 null 值应被视为存储中的 DELETE 语句,如 here 所述。但是,我的流应用程序失败,因为 null 值无法反序列化为 LONG。
查看以下异常:
org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:74)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:91)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:546)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:920)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:821)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
如何删除我的 GlobalKTable 的状态?
【问题讨论】:
标签: apache-kafka apache-kafka-streams