【问题标题】:Is there a way to delete all the data from a topic or delete the topic before every run?有没有办法从主题中删除所有数据或在每次运行之前删除主题?
【发布时间】:2013-07-17 20:13:43
【问题描述】:

有没有办法从一个主题中删除所有数据或在每次运行之前删除该主题?

我可以修改 KafkaConfig.scala 文件以更改 logRetentionHours 属性吗?有没有办法在消费者阅读后立即删除消息?

我正在使用生产者从某处获取数据并将数据发送到消费者消费的特定主题,我可以在每次运行时从该主题中删除所有数据吗?我每次在主题中只想要新数据。有没有办法以某种方式重新初始化主题?

【问题讨论】:

标签: apache-kafka apache-zookeeper


【解决方案1】:

清理主题数据有两种解决方案

  1. 将 zookeeper 的 dataDir 路径“dataDir=/dataPath”更改为其他路径 值,删除kafka日志文件夹并重启zookeeper和kafka 服务器

  2. 从 Zookeeper 服务器运行 zkCleanup.sh

【讨论】:

    【解决方案2】:

    我在集成测试运行后使用下面的实用程序进行清理。

    它使用最新的AdminZkClient api。旧的 api 已被弃用。

    import javax.inject.Inject
    import kafka.zk.{AdminZkClient, KafkaZkClient}
    import org.apache.kafka.common.utils.Time
    
    class ZookeeperUtils @Inject() (config: AppConfig) {
    
      val testTopic = "users_1"
    
      val zkHost = config.KafkaConfig.zkHost
      val sessionTimeoutMs = 10 * 1000
      val connectionTimeoutMs = 60 * 1000
      val isSecure = false
      val maxInFlightRequests = 10
      val time: Time = Time.SYSTEM
    
      def cleanupTopic(config: AppConfig) = {
    
        val zkClient = KafkaZkClient.apply(zkHost, isSecure, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, time)
        val zkUtils = new AdminZkClient(zkClient)
    
        val pp = new Properties()
        pp.setProperty("delete.retention.ms", "10")
        pp.setProperty("file.delete.delay.ms", "1000")
        zkUtils.changeTopicConfig(testTopic , pp)
        //    zkUtils.deleteTopic(testTopic)
    
        println("Waiting for topic to be purged. Then reset to retain records for the run")
        Thread.sleep(60000L)
    
        val resetProps = new Properties()
        resetProps.setProperty("delete.retention.ms", "3000000")
        resetProps.setProperty("file.delete.delay.ms", "4000000")
        zkUtils.changeTopicConfig(testTopic , resetProps)
    
      }
    
    
    }
    

    有一个选项删除主题。但是,它标志着删除的主题。 Zookeeper 稍后会删除该主题。由于这可能会非常长,我更喜欢retention.ms 方法

    【讨论】:

      【解决方案3】:

      从 kafka 2.3.0 版本开始,还有一种软删除 Kafka 的替代方法(不推荐使用旧方法)。

      将retention.ms 更新为1 秒(1000 毫秒),然后在一分钟后再次将其设置为默认设置,即7 天(168 小时,604,800,000 毫秒)

      软删除:- (rentention.ms=1000)(使用 kafka-configs.sh)

      bin/kafka-configs.sh --zookeeper 192.168.1.10:2181 --alter --entity-name kafka_topic3p3r --entity-type topics  --add-config retention.ms=1000
      Completed Updating config for entity: topic 'kafka_topic3p3r'.
      

      设置为默认值:- 7 天(168 小时,retention.ms= 604800000)

      bin/kafka-configs.sh --zookeeper 192.168.1.10:2181 --alter --entity-name kafka_topic3p3r --entity-type topics  --add-config retention.ms=604800000
      

      【讨论】:

        【解决方案4】:

        我使用这个脚本:

        #!/bin/bash
        topics=`kafka-topics --list --zookeeper zookeeper:2181`
        for t in $topics; do 
            for p in retention.ms retention.bytes segment.ms segment.bytes; do
                kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --config ${p}=100
            done
        done
        sleep 60
        for t in $topics; do 
            for p in retention.ms retention.bytes segment.ms segment.bytes; do
                kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --delete-config ${p}
            done
        done
        

        【讨论】:

          【解决方案5】:

          以下是用于清空和删除 Kafka 主题的脚本,假设 localhost 作为 zookeeper 服务器并且 Kafka_Home 设置为安装目录:

          下面的脚本将通过将保留时间设置为 1 秒然后删除配置来清空主题:

          #!/bin/bash
          echo "Enter name of topic to empty:"
          read topicName
          /$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --add-config retention.ms=1000
          sleep 5
          /$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --delete-config retention.ms
          

          完全删除主题,您必须停止任何适用的 kafka 代理并从 kafka 日志目录(默认:/tmp/kafka-logs)中删除它的目录,然后运行此从 zookeeper 中删除主题的脚本。为了验证它是否已从 zookeeper 中删除, ls /brokers/topics 的输出不应再包含主题:

          #!/bin/bash
          echo "Enter name of topic to delete from zookeeper:"
          read topicName
          /$Kafka_Home/bin/zookeeper-shell localhost:2181 <<EOF
          rmr /brokers/topics/$topicName
          ls /brokers/topics
          quit
          EOF
          

          【讨论】:

          • 只有在睡眠的 5 秒内发生保留检查时,这才有效。请确保您睡觉,直到这里指定的检查确定通过:grep "log.retention.check.interval" $Kafka_Home/config/server.properties
          • 我想编辑答案,因为第一个命令中有一个小错误。但不允许编辑一个字符。实际上不是--add config,而是--add-config
          【解决方案6】:

          作为一种肮脏的解决方法,您可以调整每个主题的运行时保留设置,例如bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --config retention.bytes=1retention.bytes=0 也可能有效)

          过了一会儿,kafka 应该释放空间。与重新创建主题相比,不确定这是否有任何影响。

          ps。最好在 kafka 完成清理后恢复保留设置。

          你也可以使用retention.ms来持久化历史数据

          【讨论】:

            【解决方案7】:
            1. 停止 ZooKeeper 和 Kafka
            2. 在 server.properties 中,更改 log.retention.hours 值。您可以评论log.retention.hours并添加log.retention.ms=1000。它只会在 Kafka Topic 上保留一秒钟的记录。
            3. 启动 zookeeper 和 kafka。
            4. 在消费者控制台上检查。当我第一次打开控制台时,记录就在那里。但是当我再次打开控制台时,记录被删除了。
            5. 稍后,您可以将log.retention.hours 的值设置为您想要的数字。

            【讨论】:

              【解决方案8】:

              用 kafka 0.10 测试

              1. stop zookeeper & Kafka server,
              2. then go to 'kafka-logs' folder , there you will see list of kafka topic folders, delete folder with topic name
              3. go to 'zookeeper-data' folder , delete data inside that.
              4. start zookeeper & kafka server again.
              

              注意:如果您要删除 kafka-logs 中的主题文件夹,但不是从 zookeeper-data 文件夹中删除,那么您会看到主题仍然存在。

              【讨论】:

                【解决方案9】:

                暂时不支持。看看这个JIRA issue“添加删除主题支持”。

                手动删除:

                1. 关闭集群
                2. 清理 kafka 日志目录(由 kafka config 文件中的 log.dir 属性指定)以及 zookeeper 数据
                3. 重启集群

                对于任何给定的主题,您可以做的是

                1. 停止卡夫卡
                2. 清理特定于分区的 kafka 日志,kafka 以“logDir/topic-partition”格式存储其日志文件,因此对于名为“MyTopic”的主题,分区 id 0 的日志将存储在 /tmp/kafka-logs/MyTopic-0 中,其中 @987654328 @ 由log.dir 属性指定
                3. 重启卡夫卡

                这是NOT 一个很好的推荐方法,但它应该可以工作。 在 Kafka 代理配置文件中,log.retention.hours.per.topic 属性用于定义 The number of hours to keep a log file before deleting it for some specific topic

                另外,有没有办法在消费者阅读后立即删除消息?

                来自Kafka Documentation

                Kafka 集群会在可配置的时间段内保留所有已发布的消息(无论它们是否已被使用)。例如,如果将日志保留时间设置为两天,则在消息发布后的两天内,它可以被使用,之后将被丢弃以释放空间。就数据大小而言,Kafka 的性能实际上是恒定的,因此保留大量数据不是问题。

                事实上,每个消费者保留的唯一元数据是消费者在日志中的位置,称为“偏移量”。这个偏移量由消费者控制:通常消费者在读取消息时会线性增加偏移量,但实际上位置是由消费者控制的,它可以按照自己喜欢的任何顺序消费消息。例如,消费者可以重置为较旧的偏移量以重新处理。

                为了找到在 Kafka 0.8 Simple Consumer example 中读取的起始偏移量,他们说

                Kafka 包含两个常量来提供帮助,kafka.api.OffsetRequest.EarliestTime() 在日志中找到数据的开头并从那里开始流式传输,kafka.api.OffsetRequest.LatestTime() 只会流式传输新消息。

                您还可以在此处找到用于管理消费者端偏移量的示例代码。

                    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                                 long whichTime, String clientName) {
                    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
                    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
                    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
                    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
                    OffsetResponse response = consumer.getOffsetsBefore(request);
                
                    if (response.hasError()) {
                        System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
                        return 0;
                    }
                    long[] offsets = response.offsets(topic, partition);
                    return offsets[0];
                }
                

                【讨论】:

                • 我相信 JIRA 问题的正确链接是issues.apache.org/jira/browse/KAFKA-330
                • 主题仍会显示在此处,因为它已在 zookeeper 中列出。您必须递归删除 brokers/topics/&lt;topic_to_delete&gt; 下的所有内容以及日志才能摆脱它。
                • 根据问题链接可以删除0.8.1版本以后的话题。您可以通过kafka-run-class.sh kafka.admin.DeleteTopicCommand查看详细帮助。
                • 更新:从 kafka 0.8.2 开始,命令改为:kafka-run-class.sh kafka.admin.TopicCommand --delete --topic [topic_to_delete] --zookeeper localhost:2181
                • 几乎所有解决方案都说连同kafka-log删除删除as well the zookeeper data。这个地点在哪里?
                【解决方案10】:

                对于 brew 用户

                如果您像我一样使用brew 并浪费大量时间搜索臭名昭著的kafka-logs 文件夹,不要再害怕了。 (请让我知道这是否适用于您以及 Homebrew、Kafka 等的多个不同版本 :))

                您可能会在以下位置找到它:

                地点:

                /usr/local/var/lib/kafka-logs


                如何实际找到该路径

                (这对您通过 brew 安装的每个应用程序都有帮助)

                1) brew services list

                kafka 开始 matbhz /Users/matbhz/Library/LaunchAgents/homebrew.mxcl.kafka.plist

                2) 打开并阅读您在上面找到的plist

                3)找到定义server.properties位置的行打开它,在我的例子中:

                • /usr/local/etc/kafka/server.properties

                4) 查找log.dirs 行:

                log.dirs=/usr/local/var/lib/kafka-logs

                5) 转到该位置并删除所需主题的日志

                6) 使用brew services restart kafka重启Kafka

                【讨论】:

                  【解决方案11】:

                  在从 kafka 集群中手动删除主题时,您可以查看 https://github.com/darrenfu/bigdata/issues/6 大多数解决方案中遗漏的一个重要步骤是删除 ZK 中的/config/topics/&lt;topic_name&gt;

                  【讨论】:

                    【解决方案12】:

                    正如我在这里提到的Purge Kafka Queue

                    在 Kafka 0.8.2 中测试,快速入门示例:首先,在 config 文件夹下的 server.properties 文件中添加一行:

                    delete.topic.enable=true
                    

                    然后,您可以运行以下命令:

                    bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
                    

                    【讨论】:

                    • 顺便说一句,添加选项后不需要重启Kafka服务器,以防有人疑惑。
                    【解决方案13】:

                    关于主题及其分区的所有数据都存储在tmp/kafka-logs/ 中。而且它们是以topic-partionNumber的格式存储的,所以如果你想删除一个主题newTopic,你可以:

                    • 停止卡夫卡
                    • 删除文件rm -rf /tmp/kafka-logs/newTopic-*

                    【讨论】:

                      【解决方案14】:

                      我们几乎尝试了其他答案所描述的内容,并取得了中等程度的成功。 真正对我们有用的是类命令(Apache Kafka 0.8.1)

                      sh kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic yourtopic --zookeeper localhost:2181

                      【讨论】:

                      • 在 0.8.1 中试过这个。命令返回“删除成功!”但是它不会删除日志文件夹中的分区。
                      • 在 0.8.2.1 (homebrew) 上试过,它给出了这个错误。 Error: Could not find or load main class kafka.admin.DeleteTopicCommand
                      • 从新的 kafka (0.8.2) 开始,它是 sh kafka-run-class.sh kafka.admin.TopicCommand --delete --topic [topic_for_delete] --zookeeper localhost:2181 。确保 delete.topic.enable 为 true。
                      猜你喜欢
                      • 1970-01-01
                      • 1970-01-01
                      • 1970-01-01
                      • 2010-10-10
                      • 2020-08-12
                      • 2019-06-23
                      • 1970-01-01
                      • 2022-11-21
                      • 1970-01-01
                      相关资源
                      最近更新 更多