【发布时间】:2020-08-16 22:51:26
【问题描述】:
我正在使用 Kafka Stream API。当我启动我的应用程序时,有时会出现间隙,我想从特定的偏移量开始使用。最早或最晚不是我想要的。
streamProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
我正在寻找的是这样一种场景,例如我在配置文件中设置偏移量或日期(以毫秒为单位),然后从该点开始使用。我想知道有没有办法做到这一点?
【问题讨论】:
-
你调查过差距从何而来吗? “最新”应该为您提供之前没有使用过的最后一个偏移量,所以不应该有。
-
好点。但是发生的事情是我的应用程序关闭的时间超过了设置的主题保留时间。所以偏移量不再有效。我认为最早将从可用的最早偏移量开始(因此得名),但是当我将其设置为最早时,它将偏移量设置为 INITIAL 并引发异常
-
如果你不局限于kafka并且可以切换到pulsar,pulsar可以做到pulsar.apache.org/docs/en/client-libraries-java/…
-
@agilob 看起来不错,但这将是一次巨大的迁移
-
那可能检查一下有kafka兼容模式的pulsar客户端是否支持
标签: java apache-kafka kafka-consumer-api apache-kafka-streams