【问题标题】:Kafka topic purge programmatically以编程方式清除 Kafka 主题
【发布时间】:2021-12-10 17:35:15
【问题描述】:

我尝试通过将保留时间设置为 1 秒然后返回原始值来从 Java 中清除 Kafka 主题,如下所示。但是消息没有从主题中删除。怎么了?

    Map<ConfigResource,Collection<AlterConfigOp>> altConf = new HashMap<ConfigResource,Collection<AlterConfigOp>>();
    Collection<AlterConfigOp> altConfOp = new ArrayList<AlterConfigOp>();
    AlterConfigOp aco1 = new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1000"), AlterConfigOp.OpType.SET);

    altConfOp.add(aco1);
    altConf.put(new ConfigResource(ConfigResource.Type.TOPIC, topic), altConfOp);
    
    ac.incrementalAlterConfigs(altConf);
    
    Thread.sleep(5000);

    altConf = new HashMap<ConfigResource,Collection<AlterConfigOp>>();
    altConfOp = new ArrayList<AlterConfigOp>();
    aco1 = new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, originalRetention), AlterConfigOp.OpType.SET);

    altConfOp.add(aco1);
    altConf.put(new ConfigResource(ConfigResource.Type.TOPIC, topic), altConfOp);
    
    ac.incrementalAlterConfigs(altConf);

【问题讨论】:

  • 不保证日志清理器在任何时间段内都运行过

标签: apache-kafka purge kafka-topic


【解决方案1】:

如 cmets 中所述,主题保留限制是下限,Kafka 不保证何时删除超过限制的记录。

删除记录的正确方法是使用 DeleteRecords API。

通过 AdminClient,您可以使用deleteRecords()。这将删除所有偏移量低于提供的偏移量的记录。您可以使用listOffsets() 查找分区的结束偏移量并将其传递给deleteRecords() 以有效清除主题。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2013-04-23
    • 1970-01-01
    • 1970-01-01
    • 2016-02-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-09-23
    相关资源
    最近更新 更多