【问题标题】:How to remove the state of a GlobalKTable store?如何删除 GlobalKTable 存储的状态?
【发布时间】:2018-03-09 12:28:44
【问题描述】:

我想删除我的GlobalKTable<Integer, Long> 商店的状态。

我尝试通过以下方式删除状态:

  • 调用kafka-topics.sh --zookeeper localhost:8080 --delete --topic my-topic删除整个Kafka主题
  • 在应用程序启动时运行 KafkaStreams.cleanUp()
  • 向我的流生成一条“Test:null”消息,因为 null 值应被视为存储中的 DELETE 语句,如 here 所述。但是,我的流应用程序失败,因为 null 值无法反序列化为 LONG。

查看以下异常:

org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:74)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:91)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:546)
        at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:920)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:821)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8

如何删除我的 GlobalKTable 的状态?

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    我找到了解决这个问题的方法。要删除 GlobalKTable 的整个状态,还需要清除 RocksDB 缓存文件。

    RocksDB 文件存储在位置StreamsConfig.STATE_DIR_CONFIG。我已经删除了文件,现在我的状态已经完全清除了。

    但是,可能有更好的解决方案来做到这一点?

    【讨论】:

    • @MatthiasJ.Sax 这是有道理的。期待发布。对异常部分有何评论?当 Kafka Streams 将抛出空值异常时,我应该如何从我的状态中删除条目?
    • 不确定异常。我加倍它与GlobalKTable 相关,因为它们是通过GlobalStreamThread 维护的,但异常来自StreamThreadLongDeserializer 也可以处理 null 消息。你用什么Serializer 来写Test:null 消息?也许它以不兼容的格式对其进行编码?为此,您需要更改 SerializerSerdes
    【解决方案2】:

    KafkaStreams.cleanUp() 实际上应该删除那些 RocksDB 文件。这是一个在即将发布的 1.1 版本中修复的错误:issues.apache.org/jira/browse/KAFKA-6259。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-04-08
      • 1970-01-01
      • 2020-08-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多