【问题标题】:Structured Streaming Kafka Source Offset Storage结构化流式 Kafka 源偏移存储
【发布时间】:2017-09-25 12:00:32
【问题描述】:

我正在为 Kafka (Integration guide) 使用结构化流式传输源,如上所述,它不会提交任何偏移量。

我的目标之一是监控它(检查它是否落后等)。即使它不提交偏移量,它也会通过不时查询 kafka 并检查下一个要处理的偏移量来处理它们。根据文档,偏移量被写入 HDFS,因此在发生故障时可以恢复,但问题是:

它们存储在哪里? 如果没有提交偏移量,是否有任何方法可以监控火花流(结构化)的 kafka 消耗(从程序外部;所以 kafka cli 或类似的,每条记录附带的偏移量不适合用例) ?

干杯

【问题讨论】:

    标签: apache-spark apache-kafka spark-streaming offset spark-structured-streaming


    【解决方案1】:

    用于 kafka 的结构化流将偏移量保存到结构下方的 HDFS。

    checkpointLocation 设置示例如下。

    .writeStream.
    .....
      option("checkpointLocation", "/tmp/checkPoint")
    .....
    

    在这种情况下,kafka 的结构化流式传输保存在路径下方

    /tmp/checkPoint/offsets/$'batchid'
    

    保存的文件包含以下格式。

    v1
    {"batchWatermarkMs":0,"batchTimestampMs":$'timestamp',"conf":{"spark.sql.shuffle.partitions":"200"}}
    {"Topic1WithPartiton1":{"0":$'OffsetforTopic1ForPartition0'},"Topic2WithPartiton2":{"1":$'OffsetforTopic2ForPartition1',"0":$'OffsetforTopic2ForPartition1'}}
    

    例如。

    v1
    {"batchWatermarkMs":0,"batchTimestampMs":1505718000115,"conf":{"spark.sql.shuffle.partitions":"200"}}
    {"Topic1WithPartiton1":{"0":21482917},"Topic2WithPartiton2":{"1":103557997,"0":103547910}}
    

    所以,我认为监控偏移滞后,需要开发具有以下功能的自定义工具。

    • 从 HDFS 的偏移量中读取。
    • 将偏移量写入 Kafka __offset 主题。

    这样,已经存在的偏移滞后监控工具可以监控结构化流中 kafka 的偏移滞后。

    【讨论】:

      【解决方案2】:

      方法一: 如果您配置了checkpointLocation(HDFS/S3 等),请转到路径,您将找到两个目录offsetscommits。偏移量保存当前偏移量,而提交具有最后提交的偏移量。您可以导航到提交目录并打开最新修改的文​​件,您可以在其中找到最后提交的偏移量。而 offsets 目录中的最新文件包含消耗的偏移量信息。

      方法二: 您还可以使用以下配置进行监控:

      class CustomStreamingQueryListener extends StreamingQueryListener with AppLogging {
      
        override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
          logDebug(s"Started query with id : ${event.id}," +
            s" name: ${event.name},runId : ${event.runId}")
        }
      
        override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
          val progress = event.progress
          logDebug(s"Streaming query made progress: ${progress.prettyJson}")
        }
      
        override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
          logDebug(s"Stream exited due to exception : ${event.exception},id : ${event.id}, " +
            s"runId: ${event.runId}")
        }
      
      }
      

      并将其添加到您的流配置中。

      spark.streams.addListener(new CustomStreamingQueryListener())
      

      【讨论】:

        【解决方案3】:

        注意事项:

        监控:现成的监控可以在 Spark 作业的 Streaming 选项卡中找到。您可以查看当前正在处理的批次是什么,以及有多少批次在排队以检查延迟。

        检查主题的最大和最小偏移量:你有 cli 来检查这些。可以从存在 kafka 代理的服务器上使用:

        kafka-run-class \
        kafka.tools.GetOffsetShell \
        --broker-list your_broker1:port,your_broker2:port,your_broker3:port \
        --topic your_topic \
        --time -2
        

        如果您与 Grafana 集成,可以获得更详细的信息

        【讨论】:

        • 感谢您的评论。但是: - 用于监控 在已经对 Spark Structured Streaming 进行的监控中,没有像 Spark Streaming 中那样带有队列的“批处理”或“记录”之类的东西。或者至少我在 UI 中看不到它。
        • 感谢您的评论。但是: - 对于 kafka 工具,问题在于该工具不支持 SSL。它有一个 PR,但还没有合并(或发布)。另外,这会给我主题的偏移量,但不会给我的消费者滞后多少,因为这是 kafka 中的另一个工具,但问题是 Spark Structured Streaming 不会提交偏移量(选项'auto.commit.offset ' 不支持)因此偏移量仅在我无法找到的某些文件夹中使用 spark 处理。
        • 是的,你是对的,火花不会保存偏移量。我们只是将它们保存到 mysql 数据库中。或者您可以写入文件。但是,这不在结构化流中。您至少应该能够在程序运行时获得偏移量,然后保存它。
        • 是的,确实如此。我想知道我是否可以避免这种情况。根据文档,Sparks 使用 WAL 将偏移量保存到 HDFS(我猜只有启用了检查点),但我没能看到在哪里。谢谢!
        • 如果我们使用 dstreams,则不使用 WAL。它仅适用于旧实现
        猜你喜欢
        • 2019-10-03
        • 2018-09-28
        • 1970-01-01
        • 2019-03-11
        • 1970-01-01
        • 1970-01-01
        • 2021-09-11
        • 1970-01-01
        • 2021-08-23
        相关资源
        最近更新 更多