暂时不支持。看看这个JIRA issue“添加删除主题支持”。
手动删除:
- 关闭集群
- 清理 kafka 日志目录(由 kafka config 文件中的
log.dir 属性指定)以及 zookeeper 数据
- 重启集群
对于任何给定的主题,您可以做的是
- 停止卡夫卡
- 清理特定于分区的 kafka 日志,kafka 以“logDir/topic-partition”格式存储其日志文件,因此对于名为“MyTopic”的主题,分区 id 0 的日志将存储在
/tmp/kafka-logs/MyTopic-0 中,其中 @987654328 @ 由log.dir 属性指定
- 重启卡夫卡
这是NOT 一个很好的推荐方法,但它应该可以工作。
在 Kafka 代理配置文件中,log.retention.hours.per.topic 属性用于定义 The number of hours to keep a log file before deleting it for some specific topic
另外,有没有办法在消费者阅读后立即删除消息?
来自Kafka Documentation:
Kafka 集群会在可配置的时间段内保留所有已发布的消息(无论它们是否已被使用)。例如,如果将日志保留时间设置为两天,则在消息发布后的两天内,它可以被使用,之后将被丢弃以释放空间。就数据大小而言,Kafka 的性能实际上是恒定的,因此保留大量数据不是问题。
事实上,每个消费者保留的唯一元数据是消费者在日志中的位置,称为“偏移量”。这个偏移量由消费者控制:通常消费者在读取消息时会线性增加偏移量,但实际上位置是由消费者控制的,它可以按照自己喜欢的任何顺序消费消息。例如,消费者可以重置为较旧的偏移量以重新处理。
为了找到在 Kafka 0.8 Simple Consumer example 中读取的起始偏移量,他们说
Kafka 包含两个常量来提供帮助,kafka.api.OffsetRequest.EarliestTime() 在日志中找到数据的开头并从那里开始流式传输,kafka.api.OffsetRequest.LatestTime() 只会流式传输新消息。
您还可以在此处找到用于管理消费者端偏移量的示例代码。
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}