【发布时间】:2018-05-17 15:03:09
【问题描述】:
我在question 上看到,我们可以使用org.apache.spark.streaming.kafka.KafkaUtils#createRDD 在 spark 批处理作业中从 Kafka 读取消息,但是这种方法需要一个偏移范围,需要一个“从偏移量”和“直到偏移量”。我从org.apache.spark.streaming.kafka.KafkaCluster#getLatestLeaderOffsets 方法中获得了“从偏移量”,但我怎样才能获得直到偏移量?我正在使用kafka-2.1.1-0.9.0.1
【问题讨论】:
-
理想情况下,最新的偏移量应该是“直到偏移量”,否则,没有什么可以去“直到”
-
无论如何,如果您对每个批次使用相同的消费者组,那么您的偏移量将由 Kafka 内部维护。您只需将起始偏移量设置为最早的偏移量,然后选择要使用多少条消息,以及何时提交偏移量。注意:我说的是常规的 Kafka API,而不是 Spark
-
最新的偏移量是下一条要读取的消息的偏移量。它返回存储在 Zookeeper 中的相同偏移量。这就是为什么我一开始就使用它的原因。最早的偏移量返回 0。在我的处理结束时,我将直到偏移量保存在 kafka(和 Zookeeper)中。
-
选择要读取的消息数量的问题是我们可以通过不知道kafka中有多少消息来获得超出范围的偏移量。有时我们阅读的信息比我们想要的要少。
-
你是对的。最新的偏移量应该用于“直到偏移量”。我使用
org.apache.spark.streaming.kafka.KafkaCluster#getConsumerOffsets传递消费者组 ID 的方法获取“来自偏移量”。感谢您的帮助。
标签: scala apache-spark apache-kafka kafka-consumer-api