【问题标题】:Dynamically merge Akka streams动态合并 Akka 流
【发布时间】:2019-11-16 02:30:01
【问题描述】:

我正在尝试通过以下方式使用 Akka 流构建 pub sub bus:

发布者为该主题添加一个源流,订阅者指定一个主题并获取该主题的所有内容。但是主题可能由多个发布者发布,发布者和订阅者都可以随时加入。

我的想法是合并所有来源,然后将过滤后的来源返回给订阅者。

但是,由于发布者可以随时加入,因此可以在订阅后添加源,并且订阅者需要从中获取数据,就像该主题的任何其他已发布数据一样。

有没有办法动态管理流到源的合并,以便以下内容保持:

publish(topic: String, messages: Source[T])
subscribe(topic: String): Source[T]

这样,无论何时添加发布者,主题的订阅者都会在订阅完成后将所有消息发布到与该主题相关的任何来源。

也很高兴听到有关替代方法的消息。

谢谢, Z

【问题讨论】:

    标签: scala merge stream akka akka-stream


    【解决方案1】:

    您可能想看看这个 Akka 文档:使用 MergeHubBroadcastHub 构建 dynamic pub-sub service

    下面是分别使用MergeHubBroadcastHub 作为动态扇入和扇出结的示例代码。

    想法是将MergeHubBroadcastHub 连接起来,通过Flow.fromSinkAndSource 以Flow 的形式形成发布-订阅通道:

    val (bfSink, bfSource) = MergeHub.source[String](perProducerBufferSize).
      toMat(BroadcastHub.sink[String](bufferSize))(Keep.both).
      run
    
    val busFlow: Flow[String, String, NotUsed] = Flow.fromSinkAndSource(bfSink, bfSource)
    

    请注意,上述 sn-p 中的 Keep.both 会从 MergeHub.source[T]BroadcastHub.sink[T] 生成一个物化值 (Sink[T, NotUsed], Source[T, NotUsed]) 的元组,它们具有以下方法签名:

    object MergeHub {
      def source[T](perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]] = // ...
      // ...
    }
    
    object BroadcastHub {
      def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]] = // ...
      // ...
    }
    

    下面是一个简单的 pub-sub 频道 busFlow 的示例代码(类似于 Akka 文档中的示例):

    import akka.actor.ActorSystem
    import akka.stream._
    import akka.stream.scaladsl._
    import akka.NotUsed
    
    implicit val system = ActorSystem("system")
    implicit val materializer = ActorMaterializer()
    implicit val ec = system.dispatcher
    
    val (bfSink, bfSource) = MergeHub.source[String](perProducerBufferSize = 32).
      toMat(BroadcastHub.sink[String](bufferSize = 256))(Keep.both).
      run
    
    // Optional: avoid building up backpressure when there is no subscribers
    bfSource.runWith(Sink.ignore)
    
    val busFlow: Flow[String, String, NotUsed] = Flow.fromSinkAndSource(bfSink, bfSource)
    

    测试busFlow

    Source(101 to 103).map(i => s"Batch(A)-$i").
      delay(2.seconds, DelayOverflowStrategy.backpressure).
      viaMat(busFlow)(Keep.right).
      to(Sink.foreach{ case s: String => println("Consumer(1)-" + s) }).
      run
    
    Source(104 to 105).map(i => s"Batch(B)-$i").
      viaMat(busFlow)(Keep.right).
      to(Sink.foreach{ case s: String => println("Consumer(2)-" + s) }).
      run
    
    // Consumer(2)-Batch(B)-104
    // Consumer(2)-Batch(B)-105
    // Consumer(1)-Batch(B)-104
    // Consumer(1)-Batch(B)-105
    // Consumer(1)-Batch(A)-101
    // Consumer(1)-Batch(A)-102
    // Consumer(2)-Batch(A)-101
    // Consumer(1)-Batch(A)-103
    // Consumer(2)-Batch(A)-102
    // Consumer(2)-Batch(A)-103
    

    作为发布-订阅通道,busFlow 的输入通过bfSink 发布给所有订阅者,而其输出通过bfSource 流向所有发布的元素。例如:

    val p1 = Source.tick[Int](0.seconds, 5.seconds, 5).map(_.toString)
    p1.runWith(bfSink)
    
    val p2 = Source.tick[Int](2.seconds, 10.seconds, 10).map(_.toString)
    p2.runWith(bfSink)
    
    val s1 = bfSource
    s1.runForeach(x => println(s"s1 --> $x"))
    
    val s2 = bfSource
    s2.runForeach(x => println(s"s2 --> $x"))
    
    // s1 --> 5
    // s2 --> 5
    // s1 --> 10
    // s2 --> 10
    // s2 --> 5
    // s1 --> 5
    // s2 --> 5
    // s1 --> 5
    // s1 --> 10
    // s2 --> 10
    // s2 --> 5
    // s1 --> 5
    // ...
    

    其他可能感兴趣的相关主题包括用于流完成控制的KillSwitch 和用于将流元素从给定生产者路由到一组动态消费者的PartitionHub

    【讨论】:

    • 感谢 Leo,但这对我想要实现的目标没有用处。我将发布一个示例来说明我很快所做的事情。
    • @Zohar Etzioni,也许示例代码没有足够清楚地说明发布-订阅功能。请查看扩展答案。
    • 是否可以将表达式 p1.runWith(bfSink) 的结果映射到未来?
    • 很遗憾,不,bfSink 定义为 Sink[T, NotUsed],不返回具体化结果。
    • 我也是这么想的。这是一个问题,因为我的 pub 子总线是一个 grpc 服务,所以发布需要返回一个未来。有没有办法用广播集线器解决它?我的 source.queue 解决方案有什么问题吗?
    【解决方案2】:

    这就是我最终要做的。发布者和订阅者都可以出现和消失,无论订阅者何时加入以及发布者何时加入,订阅者都应该能够看到其订阅的所有已发布消息(按主题),无论订阅时哪些发布者处于活动状态制成。欢迎评论。

    def main(args: Array[String]): Unit = {
       val actorSystem = ActorSystem("test")
       val materializerSettings = ActorMaterializerSettings(actorSystem)
       implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)
       implicit val ec: ExecutionContext = actorSystem.dispatcher
    
       val (queue, pub) = Source.queue[Int](100, akka.stream.OverflowStrategy.dropHead).toMat(Sink.asPublisher(true))(Keep.both).run()
    
       val p1 = Source.tick[Int](0.seconds, 5.seconds, 5)
    
       p1.runForeach(x=> {queue.offer(x)})
    
       val p2= Source.tick[Int](2.seconds,10.seconds, 10)
       p2.runForeach(x=> queue.offer(x))
    
       val s1 = Source.fromPublisher(pub)
       s1.runForeach(x=> println(s"s1 =======> ${x}"))
    
       val s2 = Source.fromPublisher(pub)
       s2.runForeach(x=> println(s"s2 =======> ${x}"))
    }
    

    【讨论】:

      猜你喜欢
      • 2022-10-24
      • 1970-01-01
      • 2020-12-06
      • 2017-09-24
      • 2018-12-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多