【问题标题】:akka-stream graph with cycle with drop-everything feedback branch doesn't complete when materialized具有循环的 akka-stream 图和 drop-everything 反馈分支在具体化时未完成
【发布时间】:2016-09-21 01:51:37
【问题描述】:

我创建了一个小示例图,其中输入元素被传递到输出,并被发送回反馈循环,该循环丢弃所有内容(使用过滤器)。

我希望获得与身份 Flow[T] 相同的行为,因为反馈分支会丢弃所有内容。

相反,输入元素按预期发出,但物化从未完成。

我做错了吗?这应该发生吗?输入流完成后,广播的反馈输出不应该完成吗?

我猜这个问题类似于here 描述的先有鸡还是先有蛋的场景?

我正在使用 akka-stream-experimental 2.0.3

干杯

object Test extends App {
  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()

  val flow = Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val dropEverything = b.add(Flow[Int].filter(_ => false))
    val input = b.add(Merge[Int](2))
    val bcast = b.add(Broadcast[Int](2))

    input                         ~> bcast
    input.in(1) <~ dropEverything <~ bcast.out(1)

    FlowShape(input.in(0), bcast.out(0))
  })

  val result = Source.single(42).via(flow).runWith(Sink.foreach(println))

  try {
    // prints 42 but the stream doesn't terminate and the await timeouts
    println(Await.result(result, 5.seconds))
  } finally {
    system.terminate()
  }
}

【问题讨论】:

    标签: scala akka akka-stream


    【解决方案1】:

    已回答here。循环永远不会完成,因为 MergeBroadcast 正在等待对方完成。

    您可以将其更改为 val input = b.add(Merge[Int](2, eagerComplete = true)) 以防止这种情况发生。

    或者,您可以尝试val dropEverything = b.add(Flow[Int].take(1).filter(_ =&gt; false)),其中n 是从输入到进程的元素数,在本例中为1

    【讨论】:

      猜你喜欢
      • 2018-10-27
      • 2021-05-21
      • 2016-05-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-12-02
      相关资源
      最近更新 更多