【问题标题】:Structured streaming - Metrics in Grafana结构化流 - Grafana 中的指标
【发布时间】:2018-06-01 18:57:13
【问题描述】:

我正在使用结构化流从 Kafka 读取数据并创建各种聚合指标。我已经使用metrics.properties 启用了 Graphite sink。我已经看到旧 Spark 版本中的应用程序具有流相关的指标。我没有看到与结构化流相关的流相关指标。我究竟做错了什么?

例如 - 无法找到未处理的批次或正在运行的批次或最后完成的批次总延迟。

我通过设置启用了流式指标:

SparkSession.builder().config("spark.sql.streaming.metricsEnabled",true)

即便如此,我也只获得了 3 个指标:

  • driver.spark.streaming.inputrate
  • driver.spark.streaming.latency
  • driver.spark.streaming.processingrate

这些指标之间存在差距。而且它在应用程序启动后才开始出现。如何将广泛的流相关指标获取到 grafana?

我检查了StreamingQueryProgress。我们只能使用这个以编程方式创建自定义指标。有没有办法可以使用 Spark 流式传输已经发送到我提到的接收器的指标?

【问题讨论】:

    标签: apache-spark apache-spark-sql graphite spark-structured-streaming


    【解决方案1】:

    您仍然可以找到其中一些指标。实际启动流式传输工具的查询有两种方法 - lastProgressrecentProgress

    它们会公开处理的行数、批处理的持续时间、批处理中的输入行数等详细信息。还有一个名为json 的方法可以一次性获取所有这些信息,可能用于发送到某些指标收集器。

    【讨论】:

    • 我是否在发送指标的石墨中找到它?即使将 spark.sql.streaming.metricsEnabled 设置为 true,我也找不到它。如何确保像您提到的那样发送其他指标?
    • query.sparkSession.streams.addListener(new StreamingQueryListener() { override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { logger.info("Query started: " + queryStarted.id+" for QUERY NAME " +query.name) } override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { logger.info("查询终止:" + queryTerminated.id+" for QUERY NAME" +query.name) } } logger.info("recentProgress "+ query.recentProgress) logger.info("progress"+ query.lastProgress)
    猜你喜欢
    • 2018-07-20
    • 1970-01-01
    • 2021-03-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-04-27
    • 2017-05-04
    相关资源
    最近更新 更多