【问题标题】:Akka Stream connect to multiple sinksAkka Stream 连接到多个接收器
【发布时间】:2023-12-25 19:07:01
【问题描述】:

我在 akka 流中实现了一个自定义组件,它将元素作为输入,根据一个键对它们进行分组和合并,然后通过十几个出口中的一个将它们发送出去。您可以将此组件视为一种 GroupBy 组件,它不会将流划分为子流,而是实际流。除了对传入的元素进行分区之外,它还将它们合并为一个元素,即组件内部发生了一些缓冲,因此 1 个元素输入并不一定意味着 1 个元素通过出口输出。

以下是所述组件的简化实现。

class CustomGroupBy[A,B](k: Int, f: A => Int) extends GraphStage[FlowShape[B, B]] {

  val in = Inlet[A]("CustomGroupBy.in")
  val outs = (0 until k).map(i => Outlet[B](s"CustomGroupBy.$i.out"))

  override val shape = new AmorphousShape(scala.collection.immutable.Seq(in), outs)

  /* ... */
}

我现在将该组件的每个出口连接到不同的 Sink 并组合所有这些 sink 的具体化值。

我用图形 DSL 尝试了一些东西,但还没有完全成功。有人会好心地为我提供一个 sn-p 来做到这一点或为我指明正确的方向吗?

提前致谢!

【问题讨论】:

  • 您能否给出一个简短的代码示例说明为什么 Graph DSL 不适合您?我想说,如果你在图中连接了所有端口,它应该可以工作。

标签: scala akka akka-stream reactive-streams


【解决方案1】:

您很可能需要内置的broadcast 阶段。用法示例可参考here:

val bcast = builder.add(Broadcast[Int](2))

in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
            bcast ~> f4 ~> merge

【讨论】:

  • 感谢您的回答!不幸的是,我不想要广播,因为我计划可能有许多传出流,并且为每个流进行广播和过滤似乎很昂贵。你的经验是它不贵吗?你能推荐点别的吗?
  • 不客气。不幸的是,我唯一的经验是broadcast。复制你的Source 可能更容易,然后你可以实现 N 个流...
【解决方案2】:

您可能想要akka.stream.scaladsl.Partition[T](outputPorts: Int, partitioner: T ⇒ Int) stage

编辑:

要连接所有端口并保留具体化的值,您必须将阶段作为参数提供给GraphDSL.create 方法。

这允许您为物化值定义组合器,并将阶段添加到您的GraphDSLBuilder,作为最后一个参数的参数。请注意,这个重载的 create 方法不采用可变参数,因此可能无法以这种方式处理 14 个不同的阶段。

假设您的阶段有一些名称,以下是我将如何实现它,在 3 个输出的情况下:

val runnable = RunnableGraph.fromGraph(
  GraphDSL.create(
    source, customGroupBy, sink1, sink2, sink3)(combiner) {  //the combiner is the function to combine the materialized values
      implicit b => //this is the builder, needed as implicit to make the connections 
      (src, cgb, s1, s2, s3) => //here are the stages added to the builder
      import GraphDSL.Implicits._

      src.out ~> cgb.in
      List(s1, s2, s3).map(_.in).zip(cgb.outlets).foreach{
        case (in, out) => in ~> out
      }

      ClosedShape
    }
  )
)

请记住,如果您不需要某个阶段的具体化值,您可以通过 val cgb = b.add(customGroupBy) 将其添加到 DSL 中

【讨论】:

  • 是的,这似乎是我想要的。不过,由于我正在做的合并,我必须自己编写,但作为一个黑盒,组件看起来就像一个分区阶段。我的问题确实是将输出连接到接收器,然后结合每个接收器的物化值。
  • 不错!谢谢!有没有办法在传递接收器之前以某种方式组合接收器以在其中创建或创建它们以摆脱 22 个最大参数的限制?
  • 让我澄清一下我之前的问题。我的每个接收器都实现了一个 ActorRef(对于一个 ActorSubscriber)。除了等待所有这些都被终止之外,我不需要具体化 ActorRefs,所以我知道工作已经完成。换句话说,如果我能以某种方式知道处理是通过其他方式完成的,我可能不需要接收器的具体化值。
  • 如果你的materializedValue 是ActorRef,它是同步创建的,所以你的流的物化确保了actors的创建。如果它是Future[ActorRef],它会变得更加复杂。一种丑陋的方法是将你的接收器列表分成更少的AmorphousShapes(使用GraphDSL.create),每个都有几个输入,但只有一个物化值在它里面的所有接收器都完成时完成,然后把那些(更少)形状进入您的主要GraphDSL.create。它真的很丑,但它确实有效。
  • 有趣。没有其他方法可以结合汇的物化价值吗?也许解决方案是在 GraphDSL.create 之外进行,例如创建一个 actor 对所有 sink 的具体化 actor 进行终止监视,然后使用该 actor 形成一个“主”接收器,当它监视的所有 actor 都终止时完成?