【发布时间】:2016-11-17 14:59:15
【问题描述】:
我创建了一个Graph,其中包含一个Balance。这个Balance 将负载分配到 5 个Flows 上。我期望会发生的事情是我的Flow 的每个实例都将在单独的Thread 上运行。然而,事实并非如此。
当我打印Thread 名称时,我注意到所有Flows 都在同一个Thread 上执行。
我使用的代码是:
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
val in = Source(1 to 10)
val out = Sink.ignore
val bal = builder.add(Balance[Int](5))
val merge = builder.add(Merge[Int](5))
val f1, f2, f3, f4, f5 = Flow[Int].map(x => {
println(Thread.currentThread())
x
}).async
in ~> bal ~> f1 ~> merge ~> out
bal ~> f2 ~> merge
bal ~> f3 ~> merge
bal ~> f4 ~> merge
bal ~> f5 ~> merge
ClosedShape
})
这个输出:
线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
我的期望是输出会是这样的:
线程[Stream_PoC-akka.actor.default-dispatcher-1,5,main]
线程[Stream_PoC-akka.actor.default-dispatcher-2,5,main]
线程[Stream_PoC-akka.actor.default-dispatcher-3,5,main]
线程[Stream_PoC-akka.actor.default-dispatcher-4,5,main]
线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
线程[Stream_PoC-akka.actor.default-dispatcher-1,5,main]
线程[Stream_PoC-akka.actor.default-dispatcher-2,5,main]
线程[Stream_PoC-akka.actor.default-dispatcher-3,5,main]
线程[Stream_PoC-akka.actor.default-dispatcher-4,5,main]
线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
如何更改此代码示例以使Flows 并行执行?
【问题讨论】:
-
这个link 可能是相关的
-
谢谢,我已经看过这个页面很多次了。然而,他们在那里所做的并不是我想要完成的。
标签: multithreading scala akka akka-stream