【问题标题】:Akka Stream ParallelismAkka 流并行
【发布时间】:2020-05-30 07:56:43
【问题描述】:

根据文档[1],我一直在尝试在 Akka Stream 中并行化一个流,但由于某种原因,我没有得到预期的结果。

我按照文档中列出的步骤进行操作,我认为我没有遗漏任何内容。然而,我的流的计算都是依次发生的。

我在这里错过了什么?

[1]https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html

import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Source}

object ScalaParallell extends App {

  implicit val system = ActorSystem("QuickStart")

  def longRunningComputation(x: Int): Int = {
    println(s"Computing 1 ${x}")
    Thread.sleep(10000)
    println(s"Computation 1 ${x} done")
    x
  }
  def longRunningComputation2(x: Int): Int = {
    println(s"Computing 2 ${x}")
    Thread.sleep(10000)
    println(s"Computation 2 ${x} done")
    x
  }

  val processor: Flow[Int, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      // prepare graph elements
      val balance = b.add(Balance[Int](2))
      val merge = b.add(Merge[Int](2))
      val f = Flow[Int].map(longRunningComputation)
      val f2 = Flow[Int].map(longRunningComputation2)


      // connect the graph
      balance.out(0) ~> f.async ~> merge.in(0)
      balance.out(1) ~> f2.async ~> merge.in(1)

      // expose ports
      FlowShape(balance.in, merge.out)
    })


  // Wire it all up.
  val xs = List(1,2,3)
  val source: Source[Int, NotUsed] = Source(xs)
  source.via(processor).runForeach(println)


  Thread.sleep(5000)
}

示例输出

Computing 2 1
Computation 2 1 done
Computing 2 2
1
Computation 2 2 done
Computing 2 3
2
Computation 2 3 done
3

我希望看到两个计算同时发生。例如:

Computing 1 1
Computing 1 2
Computation 1 2 done
Computing 1 3
Computation 1 1 done
Computing 2 4
1
2
..

【问题讨论】:

  • 但是你的代码甚至没有调用longRunningComputation2
  • 对,我一定是忘记放回去了。我相应地更新了输出和代码。 (不过也不起作用。)

标签: scala akka akka-stream


【解决方案1】:

尝试删除longRunningComputationlongRunningComputation2 中的Thread.sleep 并将xs 设置为更长的值,例如1 to 100,然后您将能够观察到并行处理。不知道为什么,但是阻塞 Thread.sleep 在 akka 中绝对被认为是反模式

【讨论】:

  • 我尝试从更大的范围(1 到 100k)开始,这似乎立即触发了并行性。谢谢回复。没想到。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-11-16
  • 1970-01-01
  • 2020-12-06
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多