【问题标题】:Pool of workers with Akka Streams使用 Akka Streams 的工人池
【发布时间】:2016-03-25 13:13:18
【问题描述】:

akka streams documentation 中所述,我尝试创建一个工人池(流):

def balancer[In, Out](worker: Flow[In, Out, NotUsed], workerCount: Int): Flow[In, Out, NotUsed] = {
    import GraphDSL.Implicits._

    Flow.fromGraph(GraphDSL.create() { implicit b =>
      val balancer = b.add(Balance[In](workerCount))
      val merge = b.add(Merge[Out](workerCount))

      for (_ <- 1 to workerCount) {
        balancer ~> worker ~> merge
      }
      FlowShape(balancer.in, merge.out)
    })
  }

然后我使用这个函数并行运行一个流:

def main(args: Array[String]) {
    val system = ActorSystem()
    implicit val mat = ActorMaterializer.create(system)

    val flow = Flow[Int].map(e => {
      println(e)
      Thread.sleep(1000) // 1 second
      e
    })

    Source(Range.apply(1, 10).toList)
      .via(balancer(flow, 3))
      .runForeach(e => {})
  }

我得到了预期的输出1, 2, 3, 4, 5, 6, 7, 8, 9,但数字以每秒 1 次的速度出现(无并行性)。我做错了什么?

【问题讨论】:

  • 执行上下文呢?如果您使用一个固定大小的线程池,这是正常的
  • 也就是说默认上下文的大小是1?您能否详细说明配置执行上下文的首选方式是什么?
  • No default context 不是固定的,你可能是在导入全局隐式上下文,这会依赖太多的东西比如版本,你可以试试implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
  • 是熔断的缘故。如果禁用自动熔断,它可以工作。我还没有发布答案,因为我还找不到正确配置异步边界的方法。我还在调查中
  • 我已经创建了一个问题:github.com/akka/akka/issues/20146

标签: scala akka akka-stream


【解决方案1】:

该部分中的文档已过时,将在下一个版本中修复。基本上,您只需要在流程本身上调用.async。通过这样做,您可以在流周围绘制一个“盒子”(您可以将其想象为一个具有一个输入和输出端口的盒子),这将防止在该盒子上融合。通过这样做,基本上所有的工人都将成为专门的演员。图的其余部分(广播和合并阶段)将共享另一个 Actor(它们不会在单独的 Actor 上运行,异步框仅保护流,外部的东西仍将被融合)。

【讨论】:

  • 我相信应该是这样,但我也相信不是这样。 for (i &lt;- 1 to workerCount) { balancer ~&gt; worker.async ~&gt; merge } 似乎不起作用。
  • 正如我在您的票证中指出的那样(玩了之后,我自己也搞糊涂了)事实证明,异步阶段的默认缓冲区大小是 16,而 balance 最终将所有消息发送到一个阶段因为它报告仍然有缓冲空间。如果您发送更多消息(如 100 条)或将工作阶段的缓冲区大小设置为 1,您将看到所需的结果。
  • 是的,对,设置waitForAllDownstreams = true 也可以帮上一点忙。我认为(我还没有真正检查过),正如你所说的第一个下游报告,所有消息都被发送到它。使用waitForAllDownstreams 接缝分布更好
  • 不仅如此,还有融合的工作原理:首先,第一个 Request(16) 消息到达融合岛的合并+平衡。然后平衡阶段被拉动(它拉动数字的来源,但让我们忽略它)并将一个元素推到输出端口。由于还有需求,我们还没有处理来自另一个来源的 Request(16)(我们仍然在前一个的接收块中!)它只是再次拉动平衡,再次将新元素运送到端口,并且很快。 Fusing 对在暂停之前采取多少此类同步步骤有上限(自我消息)。
  • 在下一个版本(2.4.3)中,可以通过配置设置融合岛的事件数。如果将其设置为 1,则将再次发生预期的行为。虽然这会影响性能,因此不建议这样做,但这完成了观察结果发生的原因。
【解决方案2】:

正如 Endre Varga 所指出的,流程本身应标记为.async

但即便如此,行为也不是确定性的,因为异步阶段的默认缓冲区大小为 16,并且平衡器可能会将所有消息发送到同一个工作线程。

因此,balancer ~&gt; worker.async.addAttributes(Attributes.inputBuffer(1, 1)) ~&gt; merge 将导致所需的行为。

有关项目成员给出的答案,请参阅: https://github.com/akka/akka/issues/20146#issuecomment-201381356

【讨论】:

    猜你喜欢
    • 2018-03-29
    • 2018-02-06
    • 2019-04-13
    • 1970-01-01
    • 2016-09-07
    • 1970-01-01
    • 2018-02-22
    • 1970-01-01
    • 2022-01-10
    相关资源
    最近更新 更多