您可能想看看这个 Akka 文档:使用 MergeHub 和 BroadcastHub 构建 dynamic pub-sub service。
下面是分别使用MergeHub 和BroadcastHub 作为动态扇入和扇出结的示例代码。
想法是将MergeHub 与BroadcastHub 连接起来,通过Flow.fromSinkAndSource 以Flow 的形式形成发布-订阅通道:
val (bfSink, bfSource) = MergeHub.source[String](perProducerBufferSize).
toMat(BroadcastHub.sink[String](bufferSize))(Keep.both).
run
val busFlow: Flow[String, String, NotUsed] = Flow.fromSinkAndSource(bfSink, bfSource)
请注意,上述 sn-p 中的 Keep.both 会从 MergeHub.source[T] 和 BroadcastHub.sink[T] 生成一个物化值 (Sink[T, NotUsed], Source[T, NotUsed]) 的元组,它们具有以下方法签名:
object MergeHub {
def source[T](perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]] = // ...
// ...
}
object BroadcastHub {
def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]] = // ...
// ...
}
下面是一个简单的 pub-sub 频道 busFlow 的示例代码(类似于 Akka 文档中的示例):
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.NotUsed
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val (bfSink, bfSource) = MergeHub.source[String](perProducerBufferSize = 32).
toMat(BroadcastHub.sink[String](bufferSize = 256))(Keep.both).
run
// Optional: avoid building up backpressure when there is no subscribers
bfSource.runWith(Sink.ignore)
val busFlow: Flow[String, String, NotUsed] = Flow.fromSinkAndSource(bfSink, bfSource)
测试busFlow:
Source(101 to 103).map(i => s"Batch(A)-$i").
delay(2.seconds, DelayOverflowStrategy.backpressure).
viaMat(busFlow)(Keep.right).
to(Sink.foreach{ case s: String => println("Consumer(1)-" + s) }).
run
Source(104 to 105).map(i => s"Batch(B)-$i").
viaMat(busFlow)(Keep.right).
to(Sink.foreach{ case s: String => println("Consumer(2)-" + s) }).
run
// Consumer(2)-Batch(B)-104
// Consumer(2)-Batch(B)-105
// Consumer(1)-Batch(B)-104
// Consumer(1)-Batch(B)-105
// Consumer(1)-Batch(A)-101
// Consumer(1)-Batch(A)-102
// Consumer(2)-Batch(A)-101
// Consumer(1)-Batch(A)-103
// Consumer(2)-Batch(A)-102
// Consumer(2)-Batch(A)-103
作为发布-订阅通道,busFlow 的输入通过bfSink 发布给所有订阅者,而其输出通过bfSource 流向所有发布的元素。例如:
val p1 = Source.tick[Int](0.seconds, 5.seconds, 5).map(_.toString)
p1.runWith(bfSink)
val p2 = Source.tick[Int](2.seconds, 10.seconds, 10).map(_.toString)
p2.runWith(bfSink)
val s1 = bfSource
s1.runForeach(x => println(s"s1 --> $x"))
val s2 = bfSource
s2.runForeach(x => println(s"s2 --> $x"))
// s1 --> 5
// s2 --> 5
// s1 --> 10
// s2 --> 10
// s2 --> 5
// s1 --> 5
// s2 --> 5
// s1 --> 5
// s1 --> 10
// s2 --> 10
// s2 --> 5
// s1 --> 5
// ...
其他可能感兴趣的相关主题包括用于流完成控制的KillSwitch 和用于将流元素从给定生产者路由到一组动态消费者的PartitionHub。