【问题标题】:How to get progress of streaming query after awaitTermination?awaitTermination 后如何获取流式查询的进度?
【发布时间】:2020-12-08 14:25:50
【问题描述】:

我是 spark 新手,正在阅读一些有关监控 spark 应用程序的内容。基本上,我想知道在给定的触发时间和查询进度中,spark 应用程序处理了多少条记录。我知道 'lastProgress' 提供了所有这些指标,但是当我将 awaitTermination 与 'lastProgress' 一起使用时,它总是返回 null。

 val q4s = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", checkpoint_loc)
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .format("console")
  .start()

  println("Query Id: "+ q4s.id.toString())
  println("QUERY PROGRESS.........")
println(q4s.lastProgress);
q4s.awaitTermination();

输出:

Query Id: efd6bc15-f10c-4938-a1aa-c81fdb2b33e3
QUERY PROGRESS.........
null

如何在使用 awaitTermination 时获得查询进度,或者如何在不使用 awaitTermination 的情况下保持查询持续运行?

提前致谢。

【问题讨论】:

    标签: apache-spark spark-structured-streaming


    【解决方案1】:

    使用专用的可运行线程

    您可以创建一个专用线程连续打印您的流式查询的最后进度。

    首先,定义一个可运行的监控类,它每 10 秒(10000 毫秒)打印出最后一个进度:

    class StreamingMonitor(q: StreamingQuery) extends Runnable {
      def run {
        while(true) {
          println("Time: " + Calendar.getInstance().getTime())
          println(q.lastProgress)
          Thread.sleep(10000)
        }
      }
    }
    

    其次,将其实现到您的应用程序代码中,如下所示:

    val q4s: StreamingQuery = df.writeStream
      [...]
      .start()
    
    new Thread(new StreamingMonitor(q4s)).start()
    
    q4s.awaitTermination()
    

    循环查询状态

    您还可以对查询的状态使用 while 循环:

    val q4s: StreamingQuery = df.writeStream
      [...]
      .start()
    
    while(q4s.isActive) {
      println(q4s.lastProgress)
      Thread.sleep(10000)
    }
    
    q4s.awaitTermination()
    

    使用 StreamingQueryListener 的替代解决方案

    监控流式查询的另一种解决方案是使用StreamingQueryListener。同样,首先定义一个扩展 StreamingQueryListener 的类:

    import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
    import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
    
    
    class MonitorListener extends StreamingQueryListener {
    
      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { }
    
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        println(s"""numInputRows: ${event.progress.numInputRows}""")
        println(s"""processedRowsPerSecond: ${event.progress.processedRowsPerSecond}""")
      }
    
      override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = { }
    }
    

    然后用你的 SparkSession 注册它:

    spark.streams.addListener(new MonitorListener)
    

    【讨论】:

      【解决方案2】:

      您必须使用对流查询的引用启动一个单独的线程来监控(比如q4s)并定期拉取进度。

      启动查询的线程(Spark Structured Streaming 应用程序的主线程)通常是awaitTermination,因此它启动的流式查询的守护线程可以继续运行。

      【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-03-05
      • 1970-01-01
      相关资源
      最近更新 更多