【发布时间】:2018-09-24 18:37:59
【问题描述】:
我的 Spring Boot 项目有一个演示 Kafka Streams API 的应用程序。我可以使用命令
使用主题customer 中的所有消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic customer --from-beginning
Kafka Streams API 中使用 KStream 或 KTable 消费消息的类似命令是什么?我试过了
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
properties.put("auto.offset.reset", "earliest");
两者都不起作用。我确实创建了一个测试用例来使用KafkaConsumer 而不是Streams,它没有用。代码上传到Github供参考。任何帮助都会很棒。
【问题讨论】:
-
自动偏移重置仅适用于您因某种原因失去偏移位置的情况。您是否尝试过使用 seek() 或使用消费者组命令行工具来更改偏移位置?要考虑的另一件事是为什么要回到起点。如果只是重新开始,那么流重置工具或将应用程序 id 更新为新值怎么样?这里有很多选择,但你必须决定为什么要改变位置。
-
我不认为我们可以将 seek 方法与流 api 一起使用。
-
除此之外还有其他几个选项。这不是你需要的吗?你读过confluent.io/blog/… 吗?请更具体。
-
我读到了。它解释了如何重置应用程序。这不是我要找的东西。 seek 方法是 KafkaConsumer 类的一部分。我正在寻找 Kafka Streams API 中的替代方法
-
工具
bin/kafka-streams-application-reset.sh也允许从 v1.1 开始搜索。参照。 cwiki.apache.org/confluence/display/KAFKA/…
标签: apache-kafka kafka-consumer-api apache-kafka-streams