【问题标题】:reset previous offset for a kafka consumer重置 kafka 消费者的先前偏移量
【发布时间】:2017-05-24 21:02:47
【问题描述】:

我想重置与给定消费者对应的先前偏移量。

原因:我编写应用程序的 spring-boot 消费者代码使用“最早”作为自动偏移重置的值。由于偏移量现在已存储在 __consumer_offsets 中,因此将 auto-offset-reset 的值更改为 latest 不起作用。

注意:我使用的是高于 0.9 的 kafka 版本。不确定删除消费者是否会有所帮助,因为我知道偏移量现在存储在主题 __consumer_offsets 中。

【问题讨论】:

  • 可以调用KafkaConsumer.seek 来重置给定分区的偏移量。这就是你想要的吗?

标签: apache-kafka spring-kafka


【解决方案1】:

如果你的监听器实现了ConsumerSeekAware,你可以寻找消费者。见the documentation

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

使用回调提供:

void seek(String topic, int partition, long offset);

【讨论】:

    猜你喜欢
    • 2018-02-03
    • 1970-01-01
    • 2021-11-10
    • 1970-01-01
    • 1970-01-01
    • 2021-09-21
    • 1970-01-01
    • 1970-01-01
    • 2019-04-04
    相关资源
    最近更新 更多