【问题标题】:Kafka 0.10.1.0 change offset timeKafka 0.10.1.0 更改偏移时间
【发布时间】:2017-02-28 09:47:18
【问题描述】:

在 2 个 logstash 实例之间使用 Kafka 集群设置 Elasticsearch 管道。 我需要将某个主题的偏移量重置回 12 小时并重新启动消费者。

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list  kfkserver:9092 --topic topicname --time 1488153601000

返回topicname:0:3730858

1488153601000

如何设置偏移时间?

【问题讨论】:

  • 不能设置偏移时间。也许这有帮助:cwiki.apache.org/confluence/display/KAFKA/… 添加一个工具来操作偏移量是 WIP。现在,您需要使用 KafkaConsumer 并构建自己的工具来操作已提交的偏移量。
  • 好的,但即使没有基于时间的偏移量。如何将偏移量移回分区?
  • 如前所述,没有用于此目的的工具。您需要在代码中手动通过KafkaConsumer#seek() 执行此操作。

标签: apache-kafka offset consumer


【解决方案1】:

如果您使用的是 0.10.x 并且没有 0.11 中添加的出色偏移管理工具,则可以使用 kafka-console-consumer.sh 更改消费者组的偏移。但这仅适用于数字偏移量,不适用于时间戳。

首先,停止使用该消费者的任何正在运行的进程。干净关机是最好的。然后,运行如下所示的命令:

bin/kafka-console-consumer.sh --bootstrap-server :9092 \
    --topic my-topic \
    --partition 1 \
    --consumer-property group.id=my-consumer-group \
    --max-messages 0 \
    --offset 12345

--max-messages 0 在这里很重要;将其设置为任何其他值(包括 1)将消耗那么多消息,然后在该主题/分区中提交当前 latest offset。这一定是控制台消费者的错误。

接下来,使用 kafka-consumer-groups.sh 检查您的工作:

./kafka-consumer-groups.sh --bootstrap-server :9092 \
    --group my-consumer-group \
    --describe

【讨论】:

    猜你喜欢
    • 2022-01-12
    • 2019-07-30
    • 1970-01-01
    • 1970-01-01
    • 2017-05-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多