【问题标题】:Akka-streams: how to get flow names in metrics reported by kamon-akkaAkka-streams:如何在 kamon-akka 报告的指标中获取流名称
【发布时间】:2017-04-09 02:54:24
【问题描述】:

我一直在尝试为 Akka 流设置一些工具。得到它的工作,但是,即使我命名了属于流的所有流,我仍然在指标中得到这种名称:flow-0-0-unknown-operation

我正在尝试做的一个简单示例:

val myflow = Flow[String].named("myflow").map(println)

Source.via(myflow).to(Sink.ignore).run()

我基本上想查看为“myflow”创建的具有正确名称的 Actor 的指标。

这甚至可能吗?我错过了什么吗?

【问题讨论】:

标签: scala akka-stream akka-monitoring kamon


【解决方案1】:

我在我的项目中遇到了这个挑战,我通过使用 Kamon + Prometheus 解决了这个问题。但是我必须创建一个 Akka Stream Flow,我可以设置它的名称 metricName 并使用 val kamonThroughputGauge: Metric.Gauge 从中导出度量值。

class MonitorProcessingTimerFlow[T](interval: FiniteDuration)(metricName: String = "monitorFlow") extends GraphStage[FlowShape[T, T]] {
  val in = Inlet[T]("MonitorProcessingTimerFlow.in")
  val out = Outlet[T]("MonitorProcessingTimerFlow.out")

  Kamon.init()
  val kamonThroughputGauge: Metric.Gauge = Kamon.gauge("akka-stream-throughput-monitor")
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
    // mutable state
    var open = false
    var count = 0
    var start = System.nanoTime
    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        try {
          push(out, grab(in))
          count += 1
          if (!open) {
            open = true
            scheduleOnce(None, interval)
          }
        } catch {
          case e: Throwable => failStage(e)
        }
      }
    })
    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        pull(in)
      }
    })

    override protected def onTimer(timerKey: Any): Unit = {
      open = false
      val duration = (System.nanoTime - start) / 1e9d
      val throughput = count / duration
      kamonThroughputGauge.withTag("name", metricName).update(throughput)
      count = 0
      start = System.nanoTime
    }
  }
  override def shape: FlowShape[T, T] = FlowShape[T, T](in, out)
}

然后我创建了一个使用MonitorProcessingTimerFlow 导出指标的简单流:

implicit val system = ActorSystem("FirstStreamMonitoring")
val source = Source(Stream.from(1)).throttle(1, 1 second)
/** Simulating workload fluctuation: A Flow that expand the event to a random number of multiple events */
val flow = Flow[Int].extrapolate { element =>
  Stream.continually(Random.nextInt(100)).take(Random.nextInt(100)).iterator
}
val monitorFlow = Flow.fromGraph(new MonitorProcessingTimerFlow[Int](5 seconds)("monitorFlow"))
val sink = Sink.foreach[Int](println)

val graph = source
  .via(flow)
  .via(monitorFlow)
  .to(sink)
graph.run()

application.conf进行正确配置:

kamon.instrumentation.akka.filters {
  actors.track {
    includes = [ "FirstStreamMonitoring/user/*" ]
  }
}

我可以在名称为 name="monitorFlow" 的 prometheus 控制台上看到吞吐量指标:

【讨论】:

    猜你喜欢
    • 2015-07-31
    • 2016-10-20
    • 2019-04-13
    • 1970-01-01
    • 2022-01-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-09-08
    相关资源
    最近更新 更多