【问题标题】:Kafka: deleting messages from topics with retention "compact"Kafka:从保留“紧凑”的主题中删除消息
【发布时间】:2016-12-18 01:30:03
【问题描述】:

我正在尝试使用 Java 在 Kafka 中实现一个关于压缩主题的最小工作示例。我的压缩效果很好,但是当我使用 kafka 文档中描述的键和空值编写消息时,看不到删除发生。

使用的库版本:kafka-clients-0.10.0.0.jar

以下是重现该行为的 Java 类的要点: https://gist.github.com/anonymous/f78184eaeec3ee82b15182aec24a432a

此外,查阅文档后,我在主题级别使用以下配置以尽快启动压缩:

min.cleanable.dirty.ratio=0.01
cleanup.policy=compact
segment.ms=100
delete.retention.ms=100

在 server.properties 方面,只是为了确定:

log.retention.check.interval.ms=100
log.cleaner.delete.retention.ms=100
log.cleaner.enable=true
log.cleaner.min.cleanable.ratio=0.01

当运行时,这个类显示压缩是有效的——只有一条消息在主题上具有相同的键。但是,我仍然看到带有“null”值的消息,在我看来应该删除。

我可以看到更干净的线程正在运行,产生如下输出: [2016-08-11 12:30:21,032] INFO Cleaner 0:将 log compaction-test-0 中的第 15 段(最后一次修改为 2016 年 8 月 11 日星期四 12:29:52 CEST)中的第 15 段清理为 0,保留删除。 (kafka.log.LogCleaner)

有谁知道为什么它是“保留删除”?我是否缺少任何相关的配置选项?我是否以正确的方式写“null”?

非常感谢任何想法。提前致谢!

更新:在调查了有用的 cmets 后,我升级到了 0.10.0.1,并在清理日志中发现了以下输出:

[2016-08-15 12:44:57,412] INFO Cleaner 0: Cleaning log compaction-test-0 (discarding tombstones prior to Mon Aug 15 12:44:40 CEST 2016)... (kafka.log.LogCleaner)
[2016-08-15 12:44:57,412] INFO Cleaner 0: Cleaning segment 0 in log compaction-test-0 (last modified Mon Aug 15 12:44:41 CEST 2016) into 0, retaining deletes. (kafka.log.LogCleaner)
[2016-08-15 12:44:57,412] INFO Cleaner 0: Cleaning segment 15 in log compaction-test-0 (last modified Mon Aug 15 12:44:41 CEST 2016) into 0, retaining deletes. (kafka.log.LogCleaner)
[2016-08-15 12:44:57,413] INFO Cleaner 0: Cleaning segment 16 in log compaction-test-0 (last modified Mon Aug 15 12:44:56 CEST 2016) into 0, retaining deletes. (kafka.log.LogCleaner)

由于“保留删除”设置为 val retainDeletes = old.lastModified > deleteHorizo​​nMs 并且相关段的最后修改日期似乎总是比删除范围稍晚,在我的最小示例中不会发生删除。

现在只是想知道如何调整设置或测试来处理这个问题......

【问题讨论】:

  • 因为它可能是一个错误,如果您分享您看到此行为的特定版本,将会有所帮助。
  • 您会看到“保留删除”,因为从技术上讲,根据最后一个干净段的最后修改时间和 delete.retention.ms 的值,现在不是丢弃段的时间
  • @GwenShapira 当然,好点,编辑问题。
  • @LucianoAfranllie:我认为通过将 delete.retention.ms 设置为 100,清理应该像压缩一样立即启动。我想知道我是否忘记了一个设置。
  • 是的,我刚刚看到了代码。 val deleteHorizo​​nMs = log.logSegments(0, cleanable.firstDirtyOffset).lastOption match { case None => 0L case Some(seg) => seg.lastModified - log.config.deleteRetentionMs } ... val retainDeletes = old.lastModified > deleteHorizo​​nMs这可能与以下事实有关:如果没有脏偏移量,则 deleteHorizo​​nMs 设置为 0,在这种情况下,retainDeletes 将为真,因为 old.lastModified 是以毫秒为单位的 unix 时间戳。

标签: apache-kafka kafka-consumer-api kafka-producer-api


【解决方案1】:

此问题已在 0.10.1 中修复。看到这个 JIRA:https://issues.apache.org/jira/browse/KAFKA-4015

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-05-19
    • 2020-02-16
    • 2020-07-07
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多