【问题标题】:Using supervision strategy with GraphStage doesn't work对 GraphStage 使用监督策略不起作用
【发布时间】:2016-07-16 06:22:39
【问题描述】:

我正在尝试使用监督策略进行恢复,当我使用 map 编写 Flow 阶段时它可以工作,但如果我使用的是图形阶段,它永远不会被捕获,并且整个管道失败

    object  test extends App{

      val stageSupervisionDecider: Supervision.Decider = {
        case cEx: IllegalArgumentException =>
          println("Supervision Catch")
          Supervision.Resume
        case _ => Supervision.Stop
      }

      implicit val system = ActorSystem("system")

      implicit val materializer = ActorMaterializer(
        ActorMaterializerSettings(system)
          .withSupervisionStrategy(stageSupervisionDecider)
      )

      Source(Vector(1,2,3,4,5,6,7))
        .via(new FailFlow)
        .runWith(Sink.foreach(println))
    }


    class FailFlow extends GraphStage[FlowShape[Int, Int]] {

      val in = Inlet[Int]("FailFlow.In")
      val out = Outlet[Int]("FailFlow.Out")

      override def shape: FlowShape[Int, Int] = FlowShape.of(in, out)

      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
        new GraphStageLogic(shape) {
          setHandler(in, new InHandler {
            override def onPush(): Unit = {
              val m = grab(in)
              if(m % 2 == 0)
                throw new IllegalArgumentException("illegal value")
              else
              push(out,m)
            }
          })

          setHandler(out, new OutHandler {
            override def onPull(): Unit = {
                pull(in)
            }
          })
        }
      }
    }

有什么想法吗?

【问题讨论】:

    标签: scala akka akka-stream typesafe


    【解决方案1】:

    根据documentation(大红框):

    ZipWith、GraphStage junction、ActorPublisher 源和 ActorSubscriber 接收器组件尚不支持监督策略属性

    【讨论】:

      猜你喜欢
      • 2018-03-17
      • 1970-01-01
      • 2015-11-14
      • 2013-01-14
      • 2016-01-31
      • 2019-03-12
      • 1970-01-01
      • 2017-03-10
      • 2013-03-30
      相关资源
      最近更新 更多