【发布时间】:2016-03-25 13:13:18
【问题描述】:
如akka streams documentation 中所述,我尝试创建一个工人池(流):
def balancer[In, Out](worker: Flow[In, Out, NotUsed], workerCount: Int): Flow[In, Out, NotUsed] = {
import GraphDSL.Implicits._
Flow.fromGraph(GraphDSL.create() { implicit b =>
val balancer = b.add(Balance[In](workerCount))
val merge = b.add(Merge[Out](workerCount))
for (_ <- 1 to workerCount) {
balancer ~> worker ~> merge
}
FlowShape(balancer.in, merge.out)
})
}
然后我使用这个函数并行运行一个流:
def main(args: Array[String]) {
val system = ActorSystem()
implicit val mat = ActorMaterializer.create(system)
val flow = Flow[Int].map(e => {
println(e)
Thread.sleep(1000) // 1 second
e
})
Source(Range.apply(1, 10).toList)
.via(balancer(flow, 3))
.runForeach(e => {})
}
我得到了预期的输出1, 2, 3, 4, 5, 6, 7, 8, 9,但数字以每秒 1 次的速度出现(无并行性)。我做错了什么?
【问题讨论】:
-
执行上下文呢?如果您使用一个固定大小的线程池,这是正常的
-
也就是说默认上下文的大小是1?您能否详细说明配置执行上下文的首选方式是什么?
-
No default context 不是固定的,你可能是在导入全局隐式上下文,这会依赖太多的东西比如版本,你可以试试
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10)) -
是熔断的缘故。如果禁用自动熔断,它可以工作。我还没有发布答案,因为我还找不到正确配置异步边界的方法。我还在调查中
-
我已经创建了一个问题:github.com/akka/akka/issues/20146
标签: scala akka akka-stream