【问题标题】:How to combine akka stream source queue with graph?如何将akka流源队列与图结合起来?
【发布时间】:2018-06-28 06:03:52
【问题描述】:

我有一个队列需要使用 akka 流图进行广播和合并。 enter image description here

我找到了图形演示和队列演示。并且不知道如何组合它们。谁能帮我吗?谢谢

这是图表演示

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: 
GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val in = Source(1 to 10)
  val out = Sink.ignore

  val bcast = builder.add(Broadcast[Int](2))
  val merge = builder.add(Merge[Int](2))

  val f1, f2, f3, f4 = Flow[Int].map(_ + 10)

  in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
  bcast ~> f4 ~> merge
  ClosedShape
})

这里是队列演示

val bufferSize = 5
val elementsToProcess = 3

val queue = Source
  .queue[Int](bufferSize, OverflowStrategy.backpressure)
  .throttle(elementsToProcess, 3.second)
  .map(x ⇒ x * x)
  .toMat(Sink.foreach(x ⇒ println(s"completed $x")))(Keep.left)
  .run()

val source = Source(1 to 10)

implicit val ec = system.dispatcher
source.mapAsync(1)(x ⇒ {
  queue.offer(x).map {
    case QueueOfferResult.Enqueued    ⇒ println(s"enqueued $x")
    case QueueOfferResult.Dropped     ⇒ println(s"dropped $x")
    case QueueOfferResult.Failure(ex) ⇒ println(s"Offer failed 
${ex.getMessage}")
    case QueueOfferResult.QueueClosed ⇒ println("Source Queue closed")
  }
}).runWith(Sink.ignore)

我想运行一个返回队列的图表,以便我可以为其提供元素。谢谢

【问题讨论】:

    标签: akka akka-stream


    【解决方案1】:

    您的val queue 是“队列”(通过toMat 组合器成为RunnableGraph)运行的结果。您的g 也是RunnableGraph(您可以调用它)。 为这样的图提供一个元素意味着定义一个Source,它将元素传递到下游。您可以组合的是构成这样一个可运行图的不同组件。它需要一个Source 和一个Sink,并且在两者之间可能有任意数量的Flow 组件。我建议您通过 the official documentation for akka streams 了解它们的一般工作原理,并特别查看自定义图表部分。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-09-02
      • 2020-01-23
      • 1970-01-01
      • 2021-11-12
      • 1970-01-01
      • 1970-01-01
      • 2021-08-23
      相关资源
      最近更新 更多