【问题标题】:Is there any way to start consuming kafka topics from a specific offset in Java API?有没有办法从 Java API 中的特定偏移量开始使用 kafka 主题?
【发布时间】:2020-08-16 22:51:26
【问题描述】:

我正在使用 Kafka Stream API。当我启动我的应用程序时,有时会出现间隙,我想从特定的偏移量开始使用。最早或最晚不是我想要的。

streamProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

我正在寻找的是这样一种场景,例如我在配置文件中设置偏移量或日期(以毫秒为单位),然后从该点开始使用。我想知道有没有办法做到这一点?

【问题讨论】:

  • 你调查过差距从何而来吗? “最新”应该为您提供之前没有使用过的最后一个偏移量,所以不应该有。
  • 好点。但是发生的事情是我的应用程序关闭的时间超过了设置的主题保留时间。所以偏移量不再有效。我认为最早将从可用的最早偏移量开始(因此得名),但是当我将其设置为最早时,它将偏移量设置为 INITIAL 并引发异常
  • 如果你不局限于kafka并且可以切换到pulsar,pulsar可以做到pulsar.apache.org/docs/en/client-libraries-java/…
  • @agilob 看起来不错,但这将是一次巨大的迁移
  • 那可能检查一下有kafka兼容模式的pulsar客户端是否支持

标签: java apache-kafka kafka-consumer-api apache-kafka-streams


【解决方案1】:

配置auto.offset.reset 仅在尚未提交偏移量的情况下对应用程序的首次启动有效。如果提交了偏移量,应用程序将始终从提交的偏移量恢复处理。

在 Kafka Streams 中,没有明确设置起始偏移量的 API。消费者 API 将允许通过 Consumer#seek() 进行此操作。

对于 Kafka Streams,获得所需行为的一种方法是停止应用程序,使用 bin/kafka-consmer.group.sh(或者更好的 bin/kafka-streams-application-reset.sh),并提交所需的起始偏移量。如果您之后启动应用程序,它将获取已提交的偏移量并从那里开始处理。

【讨论】:

    猜你喜欢
    • 2018-10-25
    • 2019-09-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-07-20
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多