【问题标题】:Akka Stream Graph recover issueAkka Stream Graph 恢复问题
【发布时间】:2017-01-25 16:56:04
【问题描述】:

我创建了一个图表来并行化具有相同输入的两个流。流产生 Future[Option[Entity]]。如果 flowA 失败,我想返回 Future[None] 但似乎没有调用恢复

    val graph: Flow[Input, (Future[Option[Entity]], Future[Option[Entity]]), NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val broadcast = builder.add(Broadcast[Input](2))
    val zip = builder.add(Zip[Future[Option[Entity]], Future[Option[Entity]]])

    val flowAwithRecovery = flowA.recover{ case t: Throwable =>
      logger.error(t, "Error retrieving output from flowA. Resuming without them.")
      Future.successful(None)
    }

    broadcast.out(0) ~> flowAwithRecovery ~> zip.in0
    broadcast.out(1) ~> flowB ~> zip.in1

    FlowShape(broadcast.in, zip.out)
  })

当我运行图表并且 flowA 返回一个 Failed Future 时,recover 不会被执行。作为一种解决方法,我在处理结束时恢复 Future,但我想在设计图表时加入这种逻辑。

【问题讨论】:

    标签: scala akka akka-stream


    【解决方案1】:

    当异常从上游传播时,recover 组合器开始发挥作用。 Future.failed 不是一个例外,而是一个有效的元素。 你需要类似的东西

    flowA.map(_.recover{ case t: Throwable =>
          logger.error(t, "Error retrieving output from flowA. Resuming without them.")
          None
        })
    

    换一种说法,您真的需要在流程中传递Futures 吗?在构建 flowAflowB 并让它们只生成 Option[Entity] 时,您可能最好使用 mapAsync

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-03-08
      • 2020-06-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多