【问题标题】:Spark streaming checkpoint火花流检查点
【发布时间】:2018-08-26 16:21:49
【问题描述】:

我正在使用 Spark Kafka 直接流式传输从 Kafka 读取消息。我想实现零消息丢失,重新启动 spark 后,它必须从 Kafka 读取丢失的消息。我正在使用检查点来保存所有读取偏移量,以便下次 spark 将从存储的偏移量开始读取。这是我的理解。

我使用了下面的代码。我停止了我的火花,向卡夫卡推送了一些信息。重新启动未从 Kafka 读取丢失消息的火花后。 Spark 读取来自 kafka 的最新消息。如何读取来自 Kafka 的遗漏消息?

val ssc = new StreamingContext(spark.sparkContext, Milliseconds(6000))
ssc.checkpoint("C:/cp")

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("test")
val ssc = new StreamingContext(spark.sparkContext, Milliseconds(50))
val msgStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

注意:应用程序日志显示 auto.offset.reset 为 none 而不是 latest。为什么?

WARN KafkaUtils: overriding auto.offset.reset to none for executor

SBT

scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
val connectorVersion = "2.0.7"
val kafka_stream_version = "1.6.3"

窗口:7

【问题讨论】:

    标签: scala apache-spark apache-kafka spark-streaming scala-streams


    【解决方案1】:

    如果您想阅读错过的消息,请尝试提交过程而不是检查点。

    请理解,Spark 无法读取带有属性的旧消息:

    "auto.offset.reset" -> "latest"
    

    试试这个:

    val kafkaParams = Map[String, Object](
     //...
     "auto.offset.reset" -> "earliest",
     "enable.auto.commit" -> (false: java.lang.Boolean)
     //...
    )
    
    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    
      //Your processing goes here
    
      //Then commit after completing your process.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
    

    希望这会有所帮助。

    【讨论】:

      【解决方案2】:

      我宁愿建议不要依赖检查点,而是可以使用外部数据存储来保存已处理的 Kafka 消息偏移量。请点击链接以获得一些见解。 https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/

      【讨论】:

        猜你喜欢
        • 2016-09-23
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2014-01-14
        • 2017-12-16
        相关资源
        最近更新 更多