【问题标题】:Delete Messages from a Topic in Apache Kafka从 Apache Kafka 的主题中删除消息
【发布时间】:2018-04-17 21:54:43
【问题描述】:

所以我是使用 Apache Kafka 的新手,我正在尝试创建一个简单的应用程序,以便更好地理解 API。我知道这个问题在这里被问了很多,但是我怎样才能清除存储在主题上的消息/记录?

我看到的大多数答案都说要更改消息保留时间或删除并重新创建主题。这些都不是我的选择,因为我无权访问 server.properties 文件。我没有在本地运行 Kafka,它托管在服务器上。有没有办法在Java代码中做到这一点?

【问题讨论】:

    标签: java apache-kafka


    【解决方案1】:

    如果您正在寻找有选择地删除消息的方法,新的 AdminClient API(可从 Java 代码中使用)提供以下 deleteRecords 方法:

    https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/admin/AdminClient.html

    【讨论】:

    • 我使用 AdminUtils 来删除已经存在的主题并重新创建它。不完全是最好的解决方案,但我在使用你提到的 AdminClient 时遇到了问题,因为我只有 deleteRecordsBefore() 方法,我不确定该怎么做。
    • 如果您使用 deleteRecordsBefore(),则您在 AdminUtils 中使用旧的 Scala 客户端,而在较新版本中建议的方法是使用 Java 中的新 AdminClient API。您有一个 deleteRecords 方法,该方法为您提供了一种删除所有偏移量小于给定偏移量的消息的方法。
    • 你是对的。我显然使用了错误的 AdminClient,这就是为什么我找不到开始的方法,因此求助于使用 AdminUtils。我在不知不觉中从我的 IDE 中提取了错误的导入语句,kafka.admin.AdminClient 而不是 org.apache.kafka.clients.admin.AdminClient。不确定第一个到底是什么,但它引入了像 deleteRecordsBefore() 这样的旧方法。感谢您的帮助!
    • 有没有办法通过给它偏移量来删除一条消息?谢谢