【问题标题】:Is there a way to read from specific offset in a Kafka stream from a Spark streaming job?有没有办法从 Spark 流作业中读取 Kafka 流中的特定偏移量?
【发布时间】:2019-09-13 15:34:20
【问题描述】:

我正在尝试使用以下方法将我的 Spark 流式传输作业的偏移量提交到 Kafka:

OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

            // some time later, after outputs have completed
              ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);

正如我从这个问题中得到的:

Spark DStream from Kafka always starts at beginning

这很好用,正在提交偏移量。但是,问题在于这是异步的,这意味着即使在下线又发送了两个偏移量提交之后,Kafka 仍可能保留之前的两个偏移量提交。如果此时消费者崩溃,我将其恢复,它会开始读取已处理的消息。

现在,来自其他来源,例如此处的 cmets 部分:

https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o

我知道没有办法从 Spark 流式作业同步提交偏移量,(尽管如果我使用 Kafka 流式传输有一个)。人们宁愿建议将偏移量保留在您将计算的最终结果保存在流上的数据库中。

现在,我的问题是: 如果我确实将当前读取的偏移量存储在我的数据库中,那么下次我如何从该偏移量开始读取流?

【问题讨论】:

    标签: java apache-kafka spark-streaming


    【解决方案1】:

    我研究并找到了我的问题的答案,因此我将其发布在这里,以供可能面临同样问题的其他人使用:

    • 制作一个 Map 对象,以 org.apache.kafka.common.TopicPartition 作为键,Long 作为值。 TopicPartition 构造函数有两个参数,主题名称和您将从中读取的分区。 Map 对象的值是您要从中读取流的偏移量的长表示形式。

      映射起始偏移量 = 新的 HashMap(); 起始偏移量.put(new TopicPartition("topic_name", 0), 3332980L);

    • 将流内容读入适当的 JavaInputStream,并将之前创建的 Map 对象作为参数提供给 ConsumerStrategies.Subscribe() 方法。

      最终 JavaInputDStream> 流 = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topics, kafkaParams, startingOffset));

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-02-15
      • 2020-08-16
      • 1970-01-01
      • 2018-10-25
      • 1970-01-01
      • 2020-02-18
      • 2018-09-28
      • 2021-02-05
      相关资源
      最近更新 更多