【问题标题】:Store Message Offset in Kafka using KafkaUtils.createDirectStream使用 KafkaUtils.createDirectStream 在 Kafka 中存储消息偏移量
【发布时间】:2019-06-08 02:59:43
【问题描述】:

如果我使用 KafkaUtils.createDirectStream 读取消息,如何在 Kafka 中存储消息偏移量。 每次应用程序宕机时,Kafka 都会丢失偏移值。然后它正在读取 auto.offset.reset 中提供的值(这是最新的),并且在应用程序的停止-启动间隔内无法读取消息。

【问题讨论】:

    标签: apache-spark apache-kafka spark-streaming-kafka


    【解决方案1】:

    您可以通过手动提交偏移量来避免这种情况。设置enable.auto.commit为false,操作成功后使用下面的代码提交kafka中的offset。

      var offsetRanges = Array[OffsetRange]()
    
              val valueStream = stream.transform {
                rdd =>
                  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
                  rdd
              }.map(_.value())
    //operation
            stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    

    你也可以阅读这个文档,它会让你更好地理解偏移管理https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/

    【讨论】:

    • 感谢您的回复 Rishi。我在我的代码中实现了上述内容,但我得到了原因:java.io.NotSerializableException:org.apache.spark.streaming.kafka010.DirectKafkaInputDStream 的对象可能作为 RDD 操作关闭的一部分被序列化。这是因为 DStream 对象是从闭包中引用的。请重写此 DStream 中的 RDD 操作以避免这种情况。已强制执行此操作,以避免 Spark 任务因不必要的对象而膨胀。
    • 如果我使用 KafkaUtils.createDirectStream 读取消息,我可以使用 CommitSync 吗
    猜你喜欢
    • 2020-08-28
    • 2014-09-22
    • 1970-01-01
    • 2017-04-29
    • 2021-02-14
    • 2016-03-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多