【发布时间】:2019-06-08 02:59:43
【问题描述】:
如果我使用 KafkaUtils.createDirectStream 读取消息,如何在 Kafka 中存储消息偏移量。 每次应用程序宕机时,Kafka 都会丢失偏移值。然后它正在读取 auto.offset.reset 中提供的值(这是最新的),并且在应用程序的停止-启动间隔内无法读取消息。
【问题讨论】:
标签: apache-spark apache-kafka spark-streaming-kafka
如果我使用 KafkaUtils.createDirectStream 读取消息,如何在 Kafka 中存储消息偏移量。 每次应用程序宕机时,Kafka 都会丢失偏移值。然后它正在读取 auto.offset.reset 中提供的值(这是最新的),并且在应用程序的停止-启动间隔内无法读取消息。
【问题讨论】:
标签: apache-spark apache-kafka spark-streaming-kafka
您可以通过手动提交偏移量来避免这种情况。设置enable.auto.commit为false,操作成功后使用下面的代码提交kafka中的offset。
var offsetRanges = Array[OffsetRange]()
val valueStream = stream.transform {
rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map(_.value())
//operation
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
你也可以阅读这个文档,它会让你更好地理解偏移管理https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/
【讨论】: