【发布时间】:2018-06-28 06:03:52
【问题描述】:
我有一个队列需要使用 akka 流图进行广播和合并。 enter image description here
我找到了图形演示和队列演示。并且不知道如何组合它们。谁能帮我吗?谢谢
这是图表演示
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder:
GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val in = Source(1 to 10)
val out = Sink.ignore
val bcast = builder.add(Broadcast[Int](2))
val merge = builder.add(Merge[Int](2))
val f1, f2, f3, f4 = Flow[Int].map(_ + 10)
in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
bcast ~> f4 ~> merge
ClosedShape
})
这里是队列演示
val bufferSize = 5
val elementsToProcess = 3
val queue = Source
.queue[Int](bufferSize, OverflowStrategy.backpressure)
.throttle(elementsToProcess, 3.second)
.map(x ⇒ x * x)
.toMat(Sink.foreach(x ⇒ println(s"completed $x")))(Keep.left)
.run()
val source = Source(1 to 10)
implicit val ec = system.dispatcher
source.mapAsync(1)(x ⇒ {
queue.offer(x).map {
case QueueOfferResult.Enqueued ⇒ println(s"enqueued $x")
case QueueOfferResult.Dropped ⇒ println(s"dropped $x")
case QueueOfferResult.Failure(ex) ⇒ println(s"Offer failed
${ex.getMessage}")
case QueueOfferResult.QueueClosed ⇒ println("Source Queue closed")
}
}).runWith(Sink.ignore)
我想运行一个返回队列的图表,以便我可以为其提供元素。谢谢
【问题讨论】:
标签: akka akka-stream