【发布时间】:2020-05-30 07:56:43
【问题描述】:
根据文档[1],我一直在尝试在 Akka Stream 中并行化一个流,但由于某种原因,我没有得到预期的结果。
我按照文档中列出的步骤进行操作,我认为我没有遗漏任何内容。然而,我的流的计算都是依次发生的。
我在这里错过了什么?
[1]https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Source}
object ScalaParallell extends App {
implicit val system = ActorSystem("QuickStart")
def longRunningComputation(x: Int): Int = {
println(s"Computing 1 ${x}")
Thread.sleep(10000)
println(s"Computation 1 ${x} done")
x
}
def longRunningComputation2(x: Int): Int = {
println(s"Computing 2 ${x}")
Thread.sleep(10000)
println(s"Computation 2 ${x} done")
x
}
val processor: Flow[Int, Int, NotUsed] =
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
// prepare graph elements
val balance = b.add(Balance[Int](2))
val merge = b.add(Merge[Int](2))
val f = Flow[Int].map(longRunningComputation)
val f2 = Flow[Int].map(longRunningComputation2)
// connect the graph
balance.out(0) ~> f.async ~> merge.in(0)
balance.out(1) ~> f2.async ~> merge.in(1)
// expose ports
FlowShape(balance.in, merge.out)
})
// Wire it all up.
val xs = List(1,2,3)
val source: Source[Int, NotUsed] = Source(xs)
source.via(processor).runForeach(println)
Thread.sleep(5000)
}
示例输出
Computing 2 1
Computation 2 1 done
Computing 2 2
1
Computation 2 2 done
Computing 2 3
2
Computation 2 3 done
3
我希望看到两个计算同时发生。例如:
Computing 1 1
Computing 1 2
Computation 1 2 done
Computing 1 3
Computation 1 1 done
Computing 2 4
1
2
..
【问题讨论】:
-
但是你的代码甚至没有调用
longRunningComputation2。 -
对,我一定是忘记放回去了。我相应地更新了输出和代码。 (不过也不起作用。)
标签: scala akka akka-stream