【发布时间】:2023-12-25 19:07:01
【问题描述】:
我在 akka 流中实现了一个自定义组件,它将元素作为输入,根据一个键对它们进行分组和合并,然后通过十几个出口中的一个将它们发送出去。您可以将此组件视为一种 GroupBy 组件,它不会将流划分为子流,而是实际流。除了对传入的元素进行分区之外,它还将它们合并为一个元素,即组件内部发生了一些缓冲,因此 1 个元素输入并不一定意味着 1 个元素通过出口输出。
以下是所述组件的简化实现。
class CustomGroupBy[A,B](k: Int, f: A => Int) extends GraphStage[FlowShape[B, B]] {
val in = Inlet[A]("CustomGroupBy.in")
val outs = (0 until k).map(i => Outlet[B](s"CustomGroupBy.$i.out"))
override val shape = new AmorphousShape(scala.collection.immutable.Seq(in), outs)
/* ... */
}
我现在将该组件的每个出口连接到不同的 Sink 并组合所有这些 sink 的具体化值。
我用图形 DSL 尝试了一些东西,但还没有完全成功。有人会好心地为我提供一个 sn-p 来做到这一点或为我指明正确的方向吗?
提前致谢!
【问题讨论】:
-
您能否给出一个简短的代码示例说明为什么 Graph DSL 不适合您?我想说,如果你在图中连接了所有端口,它应该可以工作。
标签: scala akka akka-stream reactive-streams