【问题标题】:Read from kafka in a Spark batch job (fromOffset untilOffset)在 Spark 批处理作业中从 kafka 读取(fromOffset untilOffset)
【发布时间】:2018-05-17 15:03:09
【问题描述】:

我在question 上看到,我们可以使用org.apache.spark.streaming.kafka.KafkaUtils#createRDD 在 spark 批处理作业中从 Kafka 读取消息,但是这种方法需要一个偏移范围,需要一个“从偏移量”和“直到偏移量”。我从org.apache.spark.streaming.kafka.KafkaCluster#getLatestLeaderOffsets 方法中获得了“从偏移量”,但我怎样才能获得直到偏移量?我正在使用kafka-2.1.1-0.9.0.1

【问题讨论】:

  • 理想情况下,最新的偏移量应该是“直到偏移量”,否则,没有什么可以去“直到”
  • 无论如何,如果您对每个批次使用相同的消费者组,那么您的偏移量将由 Kafka 内部维护。您只需将起始偏移量设置为最早的偏移量,然后选择要使用多少条消息,以及何时提交偏移量。注意:我说的是常规的 Kafka API,而不是 Spark
  • 最新的偏移量是下一条要读取的消息的偏移量。它返回存储在 Zookeeper 中的相同偏移量。这就是为什么我一开始就使用它的原因。最早的偏移量返回 0。在我的处理结束时,我将直到偏移量保存在 kafka(和 Zookeeper)中。
  • 选择要读取的消息数量的问题是我们可以通过不知道kafka中有多少消息来获得超出范围的偏移量。有时我们阅读的信息比我们想要的要少。
  • 你是对的。最新的偏移量应该用于“直到偏移量”。我使用 org.apache.spark.streaming.kafka.KafkaCluster#getConsumerOffsets 传递消费者组 ID 的方法获取“来自偏移量”。感谢您的帮助。

标签: scala apache-spark apache-kafka kafka-consumer-api


【解决方案1】:

您可以使用 GetOffsetShell 获取任何主题的最新偏移量

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic myTopic --time -1

这将返回:

myTopic:12341:47841

这意味着 47841 是主题 myTopic

的最新偏移量

【讨论】:

    猜你喜欢
    • 2016-10-27
    • 2021-04-22
    • 2021-10-18
    • 1970-01-01
    • 2018-10-26
    • 1970-01-01
    • 2014-02-13
    • 1970-01-01
    • 2017-08-11
    相关资源
    最近更新 更多