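【Posted】: 2016-07-16 06:22:39
【Question】:
I am trying to use a supervision strategy to recover from failures. It works when I write the Flow stage with map, but when I use a GraphStage the exception is never caught by the decider and the whole pipeline fails.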
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Attributes, FlowShape, Inlet, Outlet, Supervision}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

object test extends App {
  // Resume on IllegalArgumentException, stop the stream on anything else.
  val stageSupervisionDecider: Supervision.Decider = {
    case cEx: IllegalArgumentException =>
      println("Supervision Catch")
      Supervision.Resume
    case _ => Supervision.Stop
  }

  implicit val system = ActorSystem("system")
  // The decider is installed at the materializer level.
  implicit val materializer = ActorMaterializer(
    ActorMaterializerSettings(system)
      .withSupervisionStrategy(stageSupervisionDecider)
  )

  Source(Vector(1, 2, 3, 4, 5, 6, 7))
    .via(new FailFlow)
    .runWith(Sink.foreach(println))
}

class FailFlow extends GraphStage[FlowShape[Int, Int]] {
  val in = Inlet[Int]("FailFlow.In")
  val out = Outlet[Int]("FailFlow.Out")

  override def shape: FlowShape[Int, Int] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val m = grab(in)
          // Even values throw; this exception never reaches the decider.
          if (m % 2 == 0)
            throw new IllegalArgumentException("illegal value")
          else
            push(out, m)
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })
    }
  }
}
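For comparison, the map-based variant that does work is not shown in the question. A minimal sketch of what it might look like follows; `MapVariant` and `failingMap` are hypothetical names introduced here, and the decider is copied from the code above.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}

object MapVariant {
  // Same decider as in the question: resume on IllegalArgumentException.
  val decider: Supervision.Decider = {
    case _: IllegalArgumentException =>
      println("Supervision Catch")
      Supervision.Resume
    case _ => Supervision.Stop
  }

  // Hypothetical map-based counterpart of FailFlow: throws on even values.
  val failingMap = Flow[Int].map { m =>
    if (m % 2 == 0) throw new IllegalArgumentException("illegal value")
    else m
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("system")
    implicit val materializer: ActorMaterializer = ActorMaterializer(
      ActorMaterializerSettings(system).withSupervisionStrategy(decider)
    )

    Source(Vector(1, 2, 3, 4, 5, 6, 7))
      .via(failingMap)
      .runWith(Sink.foreach(println))
      // Shut the actor system down once the stream has finished.
      .onComplete(_ => system.terminate())(system.dispatcher)
  }
}
```

With Supervision.Resume the failing element is dropped, so in this variant only the odd values should reach the sink.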
Any ideas?
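One possible direction, offered as a sketch rather than a confirmed fix: a custom GraphStage may have to consult the supervision decider itself, by reading it from `inheritedAttributes` and catching the exception inside `onPush`. The names `SupervisedFailFlow` and `SupervisedDemo` are hypothetical. Whether a materializer-level strategy is visible through `inheritedAttributes` may depend on the Akka version, so this sketch attaches the strategy to the stage explicitly with `ActorAttributes.supervisionStrategy`.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Attributes, FlowShape, Inlet, Outlet, Supervision}
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.util.control.NonFatal

// Hypothetical variant of FailFlow that applies the decider by hand.
class SupervisedFailFlow extends GraphStage[FlowShape[Int, Int]] {
  val in = Inlet[Int]("SupervisedFailFlow.In")
  val out = Outlet[Int]("SupervisedFailFlow.Out")

  override def shape: FlowShape[Int, Int] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Read the decider from the attributes; fall back to stopping.
      private val decider: Supervision.Decider =
        inheritedAttributes.get[SupervisionStrategy]
          .map(_.decider)
          .getOrElse(Supervision.stoppingDecider)

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val m = grab(in)
          try {
            if (m % 2 == 0) throw new IllegalArgumentException("illegal value")
            push(out, m)
          } catch {
            case NonFatal(ex) =>
              decider(ex) match {
                case Supervision.Stop => failStage(ex)
                // Resume or Restart: drop the element and request the next one.
                case _ => pull(in)
              }
          }
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}

object SupervisedDemo {
  val decider: Supervision.Decider = {
    case _: IllegalArgumentException => Supervision.Resume
    case _ => Supervision.Stop
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("system")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    Source(Vector(1, 2, 3, 4, 5, 6, 7))
      .via(Flow.fromGraph(new SupervisedFailFlow)
        .withAttributes(ActorAttributes.supervisionStrategy(decider)))
      .runWith(Sink.foreach(println))
      .onComplete(_ => system.terminate())(system.dispatcher)
  }
}
```

The try/catch wraps only the per-element work, so an exception is turned into a supervision decision instead of escaping the handler and failing the stage.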
【Discussion】:
Tags: scala akka akka-stream typesafe