【问题标题】:Akka Stream Graph parallelisationAkka 流图并行化
【发布时间】:2016-11-17 14:59:15
【问题描述】:

我创建了一个Graph,其中包含一个Balance。这个Balance 将负载分配到 5 个Flows 上。我期望会发生的事情是我的Flow 的每个实例都将在单独的Thread 上运行。然而,事实并非如此。 当我打印Thread 名称时,我注意到所有Flows 都在同一个Thread 上执行。

我使用的代码是:

RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
val in = Source(1 to 10)
  val out = Sink.ignore

  val bal = builder.add(Balance[Int](5))
  val merge = builder.add(Merge[Int](5))

  val f1, f2, f3, f4, f5 = Flow[Int].map(x => {
    println(Thread.currentThread())
    x
  }).async

  in ~> bal ~> f1 ~> merge ~> out
  bal ~> f2 ~> merge
  bal ~> f3 ~> merge
  bal ~> f4 ~> merge
  bal ~> f5 ~> merge

  ClosedShape
})

这个输出:

线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

我的期望是输出会是这样的:

线程[Stream_PoC-akka.actor.default-dispatcher-1,5,main]

线程[Stream_PoC-akka.actor.default-dispatcher-2,5,main]

线程[Stream_PoC-akka.actor.default-dispatcher-3,5,main]

线程[Stream_PoC-akka.actor.default-dispatcher-4,5,main]

线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

线程[Stream_PoC-akka.actor.default-dispatcher-1,5,main]

线程[Stream_PoC-akka.actor.default-dispatcher-2,5,main]

线程[Stream_PoC-akka.actor.default-dispatcher-3,5,main]

线程[Stream_PoC-akka.actor.default-dispatcher-4,5,main]

线程[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

如何更改此代码示例以使Flows 并行执行?

【问题讨论】:

  • 这个link 可能是相关的
  • 谢谢,我已经看过这个页面很多次了。然而,他们在那里所做的并不是我想要完成的。

标签: multithreading scala akka akka-stream


【解决方案1】:

异步指令不保证您的阶段将在单独的线程中执行。只要这些阶段在时间上不重叠,它们就可能在同一个线程上运行。

对于您的具体情况,执行的步骤可能如下:

  • merge 请求第一个入口的元素
  • 平衡通过第一个流程为元素提供服务
  • 合并请求第二个入口的元素
  • 平衡通过第二个流程服务元素

现在,如果您按以下方式更改余额

val bal = builder.add(Balance[Int](5, waitForAllDownstreams = true))

您将按照步骤强制生成 5 个线程

  • 合并请求第一个入口的元素
  • 合并请求第二个入口的元素
  • 合并请求第三个入口的元素
  • 合并请求第 4 个入口的元素
  • merge 请求第 5 个入口的元素
  • balance 开始为所有流中的元素提供服务

【讨论】:

  • 这听起来完全符合我的需要。有机会我会试试这个。谢谢!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-11-16
  • 1970-01-01
  • 2020-12-06
相关资源
最近更新 更多