【发布时间】:2020-12-08 14:25:50
【问题描述】:
我是 spark 新手,正在阅读一些有关监控 spark 应用程序的内容。基本上,我想知道在给定的触发时间和查询进度中,spark 应用程序处理了多少条记录。我知道 'lastProgress' 提供了所有这些指标,但是当我将 awaitTermination 与 'lastProgress' 一起使用时,它总是返回 null。
val q4s = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.load()
.writeStream
.outputMode("append")
.option("checkpointLocation", checkpoint_loc)
.trigger(Trigger.ProcessingTime("10 seconds"))
.format("console")
.start()
println("Query Id: "+ q4s.id.toString())
println("QUERY PROGRESS.........")
println(q4s.lastProgress);
q4s.awaitTermination();
输出:
Query Id: efd6bc15-f10c-4938-a1aa-c81fdb2b33e3
QUERY PROGRESS.........
null
如何在使用 awaitTermination 时获得查询进度,或者如何在不使用 awaitTermination 的情况下保持查询持续运行?
提前致谢。
【问题讨论】:
标签: apache-spark spark-structured-streaming