【问题标题】:How do I delete a Kafka Consumer Group to reset offsets?如何删除 Kafka 消费者组以重置偏移量?
【发布时间】:2017-07-22 17:17:17
【问题描述】:

我想删除一个 Kakfa 消费者组,这样当应用程序创建消费者并订阅一个主题时,它可以从主题数据的开头开始。

这是使用当前最新的 Confluent Platform 3.1.2 的单节点开发虚拟机,该平台使用 Kafka 0.10.1.1。

我尝试了正常的语法:

sudo /usr/bin/kafka-consumer-groups --new-consumer --bootstrap-server localhost:9092 --delete --group my_consumer_group

我得到错误:

Option [delete] is only valid with [zookeeper]. Note that there's no need to delete group metadata for the new consumer as the group is deleted when the last committed offset for that group expires.

如果我尝试 zookeeper 变体:

sudo /usr/bin/kafka-consumer-groups --zookeeper localhost:2181 --delete --group my_consumer_group

我明白了:

Delete for group my_consumer_group failed because group does not exist.

如果我使用“旧”消费者列出,我看不到我的消费者组(或任何其他消费者组)

sudo /usr/bin/kafka-consumer-groups --zookeeper localhost:2181 --list

如果我使用“新”消费者列出,我可以看到我的消费者组,但显然我无法删除它:

sudo /usr/bin/kafka-consumer-groups --new-consumer --bootstrap-server localhost:9092 --list

【问题讨论】:

  • 这个开箱即用没有解决方案
  • shell 只能删除基于 ZK 的消费者组。对于新消费者组,您不需要删除组,因为 Kafka 会自动删除过期组。如果您确实要重置此类组的偏移量,请将offsets.retention.minutes 设置为较小的值。
  • seekToBeginning 仅在 Java API 中提供。它绝对不在 Python API 中。即使在 Java API 中,它也不太理想。 Kafka 0.10.2 以更简洁的方式解决了我的底层场景。

标签: apache-kafka apache-zookeeper


【解决方案1】:

这可以通过 Kafka 1.1.x 完成。来自文档:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group

【讨论】:

    【解决方案2】:

    在 Kafka 0.11(或 Confluent 3.3)中,您可以重置任何现有消费者组的偏移量,而无需删除主题。事实上,您也可以将偏移更改为任何绝对偏移值或时间戳或任何相对位置。

    这些新功能都是通过kafka-consumer-groups 命令行工具上的新--reset-offsets 标志添加的。

    在此处查看 KIP-122 详细信息https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling

    【讨论】:

      【解决方案3】:

      如果使用Java客户端,可以先获取起始偏移量。

      TopicPartition partition = new TopicPartition("YOUR_TOPIC", YOUR_PARTITION);
      Map<TopicPartition, Long> map = consumer.beginningOffsets(Collections.singleton(partition));
      

      以及消费者用来开始处理的偏移量,(如果不删除消费者组)。

      Long committedOffset = consumer.committed(partition).offset();
      

      现在,如果您认为从committedOffset 开始可以,只需轮询记录。 如果你想要开始偏移, consumer.seek(partition, map.get(partition));

      【讨论】:

        【解决方案4】:

        使用 Kafka 0.10.2 升级到刚刚发布的 Confluent Platform 3.2 解决了我的基本问题。当我删除主题时,偏移信息现在已正确重置。所以当我创建同名主题时,消费者从新数据的开头开始。

        我仍然无法使用kafka-consumer-groups 工具删除新样式消费者组,但我的根本问题已解决。

        在 Kafka 0.10.2 之前,有黑客攻击,但没有解决这个问题的干净方法。

        【讨论】:

        • 在我删除并重新创建主题后,我仍然可以看到大量“汇总最近的偏移量” - 这些是否以某种方式被清除了?
        【解决方案5】:

        您也可以在不删除整个消费者组的情况下重置单个主题的偏移量:

        bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --topic my-topic --reset-offsets --to-earliest --execute
        

        【讨论】:

          猜你喜欢
          • 2018-02-03
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2019-04-04
          • 1970-01-01
          • 2019-05-01
          • 2018-10-06
          • 2018-05-03
          相关资源
          最近更新 更多