【问题标题】:Purge Kafka Topic清除 Kafka 主题
【发布时间】:2013-04-23 11:00:43
【问题描述】:

有没有办法清除 kafka 中的话题?

我将一条太大的消息推送到本地机器上的 kafka 消息主题中,现在出现错误:

kafka.common.InvalidMessageSizeException: invalid message size

在这里增加fetch.size 并不理想,因为我实际上不想接受那么大的消息。

【问题讨论】:

    标签: apache-kafka purge


    【解决方案1】:

    更新:这个答案与 Kafka 0.6 相关。对于 Kafka 0.8 及更高版本,请参阅@Patrick 的回答。

    是的,停止kafka并手动删除相应子目录中的所有文件(在kafka数据目录中很容易找到)。 kafka重启后topic为空。

    【讨论】:

    • 这需要关闭 Broker,并且充其量只是一个 hack。 Steven Appleyard 的回答绝对是最好的。
    • @MaasSql 我同意。 :) 这个答案已有两年之久,大约是 0.6 版。 “更改主题”和“删除主题”功能已在稍后实现。
    • Steven Appleyard 的回答和这个回答一样老套。
    • 让应用程序以受支持的方式删除自己的数据远比关闭应用程序并删除您认为的所有数据文件然后重新打开它要简单得多。
    【解决方案2】:

    以下是我删除名为MyTopic 的主题的步骤:

    1. 描述主题,不要使用代理 ID
    2. 为列出的每个代理 ID 停止 Apache Kafka 守护程序。
    3. 连接到每个代理,并删除主题数据文件夹,例如rm -rf /tmp/kafka-logs/MyTopic-0。对其他分区和所有副本重复此操作
    4. 删除主题元数据:zkCli.sh 然后rmr /brokers/MyTopic
    5. 为每台停止的机器启动 Apache Kafka 守护进程

    如果您错过了第 3 步,则 Apache Kafka 将继续报告主题为存在(例如,当您运行 kafka-list-topic.sh 时)。

    使用 Apache Kafka 0.8.0 测试。

    【讨论】:

    • 在 0.8.1 ./zookeeper-shell.sh localhost:2181./kafka-topics.sh --list --zookeeper localhost:2181
    • 这会删除主题,而不是其中的数据。这需要停止代理。这充其量是一个hack。 Steven Appleyard 的回答绝对是最好的。
    • 这是当时唯一的方法。
    • 在 Kafka 0.8.2.1 上为我工作,虽然 zookeeper 中的 topis 位于 /brokers/topics/
    • 这可能是从 0.9 开始的问题,因为偏移量是在另一个主题中管理的,正在处理先前偏移量的消费者 - 可能会看到错误 - 不过还没有尝试过。
    【解决方案3】:

    Thomas 的建议很棒,但不幸的是,旧版本的 Zookeeper(例如 3.3.6)中的 zkCli 似乎不支持 rmr。例如比较modern Zookeeperversion 3.3 中的命令行实现。

    如果您遇到旧版本的 Zookeeper,一个解决方案是使用客户端库,例如用于 Python 的 zc.zk。对于不熟悉 Python 的人,您需要使用 pipeasy_install 安装它。然后启动一个 Python shell (python) 你可以这样做:

    import zc.zk
    zk = zc.zk.ZooKeeper('localhost:2181')
    zk.delete_recursive('brokers/MyTopic') 
    

    甚至

    zk.delete_recursive('brokers')
    

    如果您想从 Kafka 中删除所有主题。

    【讨论】:

    • 这会将数据留在经纪人身上。您需要将此解决方案与paramiko 之类的东西结合起来,以通过 SSH 连接到每个代理并清理实际的主题数据
    【解决方案4】:

    最简单的方法是将各个日志文件的日期设置为早于保留期。然后经纪人应该在几秒钟内为您清理并删除它们。这提供了几个优点:

    1. 无需关闭代理,这是一个运行时操作。
    2. 避免出现无效偏移异常的可能性(更多内容见下文)。

    根据我使用 Kafka 0.7.x 的经验,删除日志文件并重新启动代理可能会导致某些消费者出现无效的偏移异常。之所以会发生这种情况,是因为代理会在零处重新启动偏移量(在没有任何现有日志文件的情况下),并且之前从主题消费的消费者将重新连接以请求特定的 [曾经有效的] 偏移量。如果此偏移量恰好落在新主题日志的范围之外,则不会造成任何伤害,并且消费者会在开始或结束时恢复。但是,如果偏移量落在新主题日志的范围内,代理会尝试获取消息集但失败,因为偏移量与实际消息不一致。

    这也可以通过在 zookeeper 中为该主题清除消费者偏移来缓解。但是,如果您不需要原始主题并且只想删除现有内容,那么只需“触摸”一些主题日志就比停止代理、删除主题日志和清除某些 Zookeeper 节点更容易、更可靠.

    【讨论】:

    • 如何“将单个日志文件的日期设置为早于保留期”?谢谢
    【解决方案5】:

    要清除队列,您可以删除主题:

    bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
    

    然后重新创建它:

    bin/kafka-topics.sh --create --zookeeper localhost:2181 \
        --replication-factor 1 --partitions 1 --topic test
    

    【讨论】:

    • 记得在文件config/server.properties中添加行delete.topic.enable=true,因为上述命令打印的警告是Note: This will have no impact if delete.topic.enable is not set to true.
    • 这并不总是即时的。有时它只会标记为删除,实际删除将在以后发生。
    • 如果有人对此方法感兴趣,请考虑使用接受的答案。然而,也可以使用这种方法。但是,请记住,您也会丢失分配给每个代理的分区。因此,当您重新创建主题时,您可能会预计会产生一些开销,具体取决于集群的配置。另一个缺点是,如果你有活跃的消费者并且auto.create.topics.enable 设置为true,你可能会得到错误配置的主题。
    【解决方案6】:

    使用您的应用程序组清理来自特定主题的所有消息(GroupName 应与应用程序 kafka 组名称相同)。

    ./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group

    【讨论】:

    • 这种方法有问题(在0.8.1.1测试)。如果应用订阅了两个(或更多)主题:topic1 和 topic2,并且控制台消费者清理了 topic1,不幸的是它还会删除 topic2 的不相关消费者偏移量,这会导致重播来自 topic2 的所有消息。
    • 这不会清除/清理主题。另外,与kafka-consumer-groups --reset-offsets 相比,这将花费太长时间
    【解决方案7】:

    暂时将主题的保留时间更新为一秒:

    kafka-topics.sh \
      --zookeeper <zkhost>:2181 \
      --alter \
      --topic <topic name> \
      --config retention.ms=1000
    

    在较新的 Kafka 版本中,您还可以使用 kafka-configs --entity-type topics 进行操作

    kafka-configs.sh \
      --zookeeper <zkhost>:2181 \
      --entity-type topics \
      --alter \
      --entity-name <topic name> \
      --add-config retention.ms=1000
    

    然后等待清除生效(持续时间取决于主题的大小)。清除后,恢复之前的 retention.ms 值。

    【讨论】:

    • 这是一个很好的答案,但您能否添加说明如何开始检查主题的当前retention.ms 值?
    • 我不确定是否要检查当前配置,但我相信将其重置为默认值如下:bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms
    • 或者取决于版本:--delete-config retention.ms
    • @GregDubicki,您可以在 .properties 文件中查看当前的日志保留情况
    • 似乎从 0.9.0 开始,使用 kafka-topics.sh 更改配置已被弃用。新选项是使用 kafka-configs.sh 脚本。 e.g. kafka-configs.sh --zookeeper &lt;zkhost&gt;:2181 --alter --entity-type topics --entity-name &lt;topic name&gt; --add-config retention.ms=1000 这也允许您检查当前的保留期,例如kafka-configs --zookeeper :2181 --describe --entity-type 主题 --entity-name
    【解决方案8】:

    在 Kafka 0.8.2 中测试,快速入门示例: 首先,在config文件夹下的server.properties文件中添加一行:

    delete.topic.enable=true
    

    然后,您可以运行以下命令:

    bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
    

    然后重新创建它,让客户端继续针对空主题进行操作

    【讨论】:

      【解决方案9】:

      虽然接受的答案是正确的,但该方法已被弃用。现在应该通过kafka-configs 完成主题配置。

      kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic
      

      通过该方法设置的配置可以用命令显示

      kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic
      

      【讨论】:

      • 还值得补充:kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --delete-config retention.ms --entity-name MyTopic
      【解决方案10】:

      除了更新retention.ms 和retention.bytes,我注意到主题清理策略应该是“delete”(默认),如果是“compact”,它将保留更长时间的消息,即,如果它是“compact” ,您还必须指定delete.retention.ms

      ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1

      还必须监视最早/最新的偏移量应该相同以确认这是否成功发生,也可以检查 du -h /tmp/kafka-logs/test-topic-3-100-*

      ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}' 26599762

      ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}' 26599762

      另一个问题是,你必须首先获得当前配置,所以你记得在删除成功后恢复: ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics

      【讨论】:

        【解决方案11】:

        kafka 没有用于清除/清理主题(队列)的直接方法,但可以通过删除该主题并重新创建它来做到这一点。

        首先确保 sever.properties 文件有,如果没有,则添加 delete.topic.enable=true

        然后,删除主题 bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic

        然后重新创建它。

        bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2
        

        【讨论】:

          【解决方案12】:

          有时,如果您的集群已饱和(分区太多,或使用加密主题数据,或使用 SSL,或控制器位于坏节点上,或连接不稳定,则需要很长时间清除所述主题。

          我遵循这些步骤,特别是如果您使用 Avro。

          1:使用 kafka 工具运行:

          kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>
          

          2:运行:

          kafka-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic &lt;topic-name&gt; --new-consumer --from-beginning

          3:一旦主题为空,将主题保留设置回原始设置。

          kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>
          

          希望这对某人有所帮助,因为它不容易做广告。

          【讨论】:

            【解决方案13】:

            另一种相当手动的清除主题的方法是:

            在经纪人中:

            1. 停止 kafka 代理
              sudo service kafka stop
            2. 删除所有分区日志文件(应该在所有代理上完成)
              sudo rm -R /kafka-storage/kafka-logs/&lt;some_topic_name&gt;-*

            在动物园管理员中:

            1. 运行 zookeeper 命令行界面
              sudo /usr/lib/zookeeper/bin/zkCli.sh
            2. 使用 zkCli 删除主题元数据
              rmr /brokers/topic/&lt;some_topic_name&gt;

            再次在经纪人中:

            1. 重启代理服务
              sudo service kafka start

            【讨论】:

            • 您需要停止并从每个具有副本的代理中删除文件,这意味着您在执行此操作时可能会导致客户端停机
            • 你是对的,这个只是让你真正看到Kafka存储和管理一些东西的位置。但这种蛮力方法绝对不适用于生产运行系统。
            【解决方案14】:

            来自卡夫卡 1.1

            清除主题

            bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --add-config retention.ms=100
            

            等待至少 1 分钟,以确保 kafka 清除主题 删除配置,然后进入默认值

            bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --delete-config retention.ms
            

            【讨论】:

            • 我想你有一个额外的箭头。在我的身上,我能够运行bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --add-config rentention.ms=100
            【解决方案15】:
            ./kafka-topics.sh --describe --zookeeper zkHost:2181 --topic myTopic
            

            这应该给retention.ms 配置。然后您可以使用上面的 alter 命令更改为 1 秒(稍后恢复为默认值)。

            Topic:myTopic   PartitionCount:6        ReplicationFactor:1     Configs:retention.ms=86400000
            

            【讨论】:

              【解决方案16】:

              在 Java 中,使用新的 AdminZkClient 代替已弃用的 AdminUtils

                public void reset() {
                  try (KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200_000,
                      5000, 10, Time.SYSTEM, "metricGroup", "metricType")) {
              
                    for (Map.Entry<String, List<PartitionInfo>> entry : listTopics().entrySet()) {
                      deleteTopic(entry.getKey(), zkClient);
                    }
                  }
                }
              
                private void deleteTopic(String topic, KafkaZkClient zkClient) {
              
                  // skip Kafka internal topic
                  if (topic.startsWith("__")) {
                    return;
                  }
              
                  System.out.println("Resetting Topic: " + topic);
                  AdminZkClient adminZkClient = new AdminZkClient(zkClient);
                  adminZkClient.deleteTopic(topic);
              
                  // deletions are not instantaneous
                  boolean success = false;
                  int maxMs = 5_000;
                  while (maxMs > 0 && !success) {
                    try {
                      maxMs -= 100;
                      adminZkClient.createTopic(topic, 1, 1, new Properties(), null);
                      success = true;
                    } catch (TopicExistsException ignored) {
                    }
                  }
              
                  if (!success) {
                    Assert.fail("failed to create " + topic);
                  }
                }
              
                private Map<String, List<PartitionInfo>> listTopics() {
                  Properties props = new Properties();
                  props.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
                  props.put("group.id", "test-container-consumer-group");
                  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              
                  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                  Map<String, List<PartitionInfo>> topics = consumer.listTopics();
                  consumer.close();
              
                  return topics;
                }
              

              【讨论】:

              • 你不需要 Zookeeper。使用AdminClientKafkaAdminClient
              【解决方案17】:

              按照@steven appleyard 的回答,我在 Kafka 2.2.0 上执行了以下命令,它们对我有用。

              bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --describe
              
              bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --add-config retention.ms=1000
              
              bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --delete-config retention.ms
              

              【讨论】:

              • 这似乎与其他答案重复
              【解决方案18】:

              这里有很多很棒的答案,但其中我没有找到关于 docker 的答案。我花了一些时间弄清楚在这种情况下使用代理容器是错误的(显然!!!)

              ## this is wrong!
              docker exec broker1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
              
              Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
                      at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:258)
                      at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
                      at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
                      at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:254)
                      at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:112)
                      at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826)
                      at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:280)
                      at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
                      at kafka.admin.TopicCommand.main(TopicCommand.scala)
              

              根据我的撰写文件,我应该使用 zookeeper:2181 而不是 --zookeeper localhost:2181

              ## this might be an option, but as per comment below not all zookeeper images can have this script included
              docker exec zookeper1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
              

              正确的命令是

              docker exec broker1 kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name dev_gdn_urls --add-config retention.ms=12800000
              

              希望它能节省一些人的时间。

              另外,请注意,消息不会立即被删除,它会在日志段关闭时发生。

              【讨论】:

              • 你可以执行到代理就好了。问题是localhost:2181... 例如您误解了 Docker 网络功能。另外,并不是所有的 Zookeeper 容器都有kafka-topics,所以最好不要这样使用。最新的 Kafka 安装允许 --bootstrap-servers 更改主题而不是 --zookeeper
              • 不过,执行到 Zookeeper 容器中似乎是错误的。我的观点是来自 Kafka 容器的you can use --zookeeper zookeeper:2181`。甚至从 server.properties 文件中 grep 出 Zookeeper 行
              • @cricket_007 嘿,真的谢谢你,我更正了答案,如果那里还有问题,请告诉我
              【解决方案19】:

              以下命令可用于删除 kafka 主题中的所有现有消息:

              kafka-delete-records --bootstrap-server <kafka_server:port> --offset-json-file delete.json
              

              delete.json 文件的结构应该如下:

              { “分区”:[ { “主题”:“富”, “分区”:1, “偏移”:-1 } ], “版本”:1 }

              其中 offset :-1 将删除所有记录 (此命令已经用kafka 2.0.1测试过

              【讨论】:

                【解决方案20】:

                您是否考虑过让您的应用只使用一个新的重命名主题? (即与原始主题名称相似但末尾附加“1”的主题)。

                这也会为您的应用提供一个全新的主题。

                【讨论】:

                • 但这给 Kafka 管理员留下了难以处理的问题,并且所有其他使用相同主题的客户端都需要更新
                • 是的,生产者和消费者需要连接到新主题。通常主题数据会过期(根据您的保留设置)并被清除,所以我认为 Kafka 管理员不需要在这里做任何工作
                • 1) 它需要为所有客户端更改代码。在具有多个客户的企业环境中,这实际上并不可行。 2)集群有一个主题限制(尽管在几千个数量级)。绝对应该定期删除空的、废弃的主题。 3)创建一个新主题并不能真正回答问题
                【解决方案21】:

                如果您想在 Java 应用程序中以编程方式执行此操作,您可以使用 AdminClient 的 API deleteRecords。使用 AdminClient 可以删除分区和偏移级别的记录。

                根据JavaDocs,0.11.0.0 或更高版本的代理支持此操作。

                这是一个简单的例子:

                String brokers = "localhost:9092";
                String topicName = "test";
                TopicPartition topicPartition = new TopicPartition(topicName, 0);
                RecordsToDelete recordsToDelete = RecordsToDelete.beforeOffset(5L);
                
                Map<TopicPartition, RecordsToDelete> topicPartitionRecordToDelete = new HashMap<>();
                topicPartitionRecordToDelete.put(topicPartition, recordsToDelete);
                
                // Create AdminClient
                final Properties properties = new Properties();
                properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
                AdminClient adminClient = AdminClient.create(properties);
                
                try {
                  adminClient.deleteRecords(topicPartitionRecordToDelete).all().get();
                } catch (InterruptedException e) {
                  e.printStackTrace();
                } catch (ExecutionException e) {
                  e.printStackTrace();
                } finally {
                  adminClient.close();
                }
                

                【讨论】:

                  【解决方案22】:

                  这里是删除主题的命令,如果您使用的是confluentinc/cp-kafka 容器。

                  docker exec -it <kafka-container-id> kafka-topics --zookeeper zookeeper:2181 --delete --topic <topic-name>
                  

                  成功响应:

                  Topic <topic-name> is marked for deletion.
                  Note: This will have no impact if delete.topic.enable is not set to true.
                  

                  【讨论】:

                  • # sudo vim server.properties /kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic flow
                  【解决方案23】:
                  # you have to enable this on config
                  sudo echo "delete.topic.enable=true" >> /opt/kafka/config/server.properties 
                  sudo systemctl stop kafka 
                  sudo systemctl start kafka 
                  # purge the topic
                  /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic flows
                  
                  # create the topic
                  # /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:2181 --replication-factor 1 --partitions 1 --topic Test
                  # list the topic
                  # /opt/kafka/bin/kafka-console-consumer.sh  localhost:9092 --topic flows --from-beginning
                  

                  【讨论】:

                    【解决方案24】:

                    user644265 在此 answer 中建议的临时减少主题保留时间的解决方法仍然有效,但最新版本的 kafka-configs 将警告但 --zookeeper 选项已被弃用:

                    警告:--zookeeper 已弃用,将在未来版本的 Kafka 中删除

                    请改用--bootstrap-server;例如

                    kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --add-config retention.ms=100
                    

                    kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --delete-config retention.ms
                    

                    【讨论】:

                      猜你喜欢
                      • 2016-12-15
                      • 2021-12-10
                      • 2019-06-20
                      • 1970-01-01
                      • 1970-01-01
                      • 1970-01-01
                      • 2017-10-17
                      • 2014-08-08
                      • 2019-07-07
                      相关资源
                      最近更新 更多