【问题标题】:How to get a Subscriber and Publisher from a broadcasted Akka stream?如何从广播的 Akka 流中获取订阅者和发布者?
【发布时间】:2015-07-02 21:51:42
【问题描述】:

在使用更复杂的图表时,我无法将发布者和订阅者从我的流程中移除。我的目标是提供发布者和订阅者的 API,并在内部运行 Akka 流。这是我的第一次尝试,效果很好。

val subscriberSource = Source.subscriber[Boolean]
val someFunctionSink = Sink.foreach(Console.println)

val flow = subscriberSource.to(someFunctionSink)

//create Reactive Streams Subscriber
val subscriber: Subscriber[Boolean] = flow.run()

//prints true
Source.single(true).to(Sink(subscriber)).run()

但是对于更复杂的广播图,我不确定如何获取订阅者和发布者对象?我需要部分图吗?

val subscriberSource = Source.subscriber[Boolean]
val someFunctionSink = Sink.foreach(Console.println)
val publisherSink = Sink.publisher[Boolean]

FlowGraph.closed() { implicit builder =>
  import FlowGraph.Implicits._

  val broadcast = builder.add(Broadcast[Boolean](2))

  subscriberSource ~> broadcast.in
  broadcast.out(0) ~> someFunctionSink
  broadcast.out(1) ~> publisherSink
}.run()

val subscriber: Subscriber[Boolean] = ???
val publisher: Publisher[Boolean] = ???

【问题讨论】:

    标签: scala akka-stream reactive-streams


    【解决方案1】:

    当您调用RunnableGraph.run() 时,流将运行,结果是该运行的“物化值”。

    在您的简单示例中,Source.subscriber[Boolean] 的具体化值为Subscriber[Boolean]。在您的复杂示例中,您希望将图形的多个组件的具体化值组合成一个元组 (Subscriber[Boolean], Publisher[Boolean]) 的具体化值。

    您可以通过将您对其物化值感兴趣的组件传递给FlowGraph.closed(),然后指定一个函数来组合物化值:

    import akka.stream.scaladsl._
    import org.reactivestreams._
    
    val subscriberSource = Source.subscriber[Boolean]
    val someFunctionSink = Sink.foreach(Console.println)
    val publisherSink = Sink.publisher[Boolean]
    
    val graph =
      FlowGraph.closed(subscriberSource, publisherSink)(Keep.both) { implicit builder ⇒
        (in, out) ⇒
          import FlowGraph.Implicits._
    
          val broadcast = builder.add(Broadcast[Boolean](2))
    
          in ~> broadcast.in
          broadcast.out(0) ~> someFunctionSink
          broadcast.out(1) ~> out
      }
    val (subscriber: Subscriber[Boolean], publisher: Publisher[Boolean]) = graph.run()
    

    请参阅Scaladocs for more information about the overloads of FlowGraph.closed

    Keep.both是函数(a, b) => (a, b)的缩写)

    【讨论】:

    • 感谢您提供的信息丰富的回答。让我失望的是 GraphApply 的大小,它有点像怪物。也没有意识到签名def closed[Mat, M1, M2](g1: Graph[Shape, M1], g2: Graph[Shape, M2])(combineMat: (M1, M2) ⇒ Mat)(buildBlock: FlowGraph.Builder[Mat] ⇒ (g1.Shape, g2.Shape) ⇒ Unit): RunnableGraph[Mat] 的具体化值可能是我需要的元组。
    • 如果有人试图复制第二个示例代码,请注意:您需要使用 fanoutPublisher 而不是 publisher 才能使其工作。我的猜测是,构建图表会以某种方式使用分配的一个订阅者插槽。
    • @ripla 你有使用fanoutPublisher 的工作示例吗?你的是我在互联网上找到的关于fanoutPublisher 的两个参考文献之一,我正在尝试实现它,但没有一个包含实际代码示例。我也在 Akka 用户组中发布了question,但到目前为止还没有运气。
    猜你喜欢
    • 2018-08-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-12-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多