【问题标题】:how to manage offset read from kafka with spark structured stream如何使用 spark 结构化流管理从 kafka 读取的偏移量
【发布时间】:2019-05-15 08:35:13
【问题描述】:

我有一个 spark 结构化流式作业,需要从 kafka 主题读取数据并进行一些聚合。该作业需要每天重新启动,但是当它重新启动时,如果我设置startingOffsets="latest",我将丢失重新启动时间之间的数据。如果我设置startingOffsets="earliest",那么该作业将从主题中读取所有数据,但不会从最后一个流作业离开的位置读取。谁能帮助我如何配置以在最后一个流作业离开的位置设置偏移量?

我正在使用 Spark 2.4.0kafka 2.1.1,我尝试为写作作业设置检查点位置,但似乎 Spark 没有检查 kafka 消息的偏移量,以便它不断检查最后一个偏移量或第一个偏移量取决于startingOffsets。

这是我的 spark 从 kafka 读取的配置:

val df = spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", host)
         .option("subscribe", topic)
         .option("startingOffsets", offset)
         .option("enable.auto.commit", "false")
         .load()

例如,kafka 主题有 10 条消息,偏移量从 1 到 10,spark 刚刚处理完消息 5,然后重新启动。如何让 spark 继续从消息 5 而不是从 1 或 11 读取?

【问题讨论】:

  • 检查点旨在处理这些情况。删除 .option("startingOffsets", offset) 改为使用 checkpointLocation 与 hdfs 兼容的位置。

标签: scala spark-structured-streaming


【解决方案1】:

似乎通过一些代码,我可以获取我需要的偏移量并将其保存到一些可靠的存储中,例如 cassandra。然后当火花流开始时,我只需要读取保存的偏移量并将其填充到startingOffsets。 这是帮助我获得所需偏移量的代码

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener() {
         override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
            println("Query started:" + queryStarted.id)
         }

         override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
            println("Query terminated" + queryTerminated.id)
         }

         override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
            println("Query made progress")
            println("Starting offset:" + queryProgress.progress.sources(0).startOffset)
            println("Ending offset:" + queryProgress.progress.sources(0).endOffset)
            //Logic to save these offsets
            // the logic to save the offset write in here
         }
      })

【讨论】:

    猜你喜欢
    • 2019-10-03
    • 2021-03-18
    • 2018-09-28
    • 1970-01-01
    • 2019-06-28
    • 2021-05-01
    • 2017-04-04
    • 1970-01-01
    • 2019-01-29
    相关资源
    最近更新 更多