【问题标题】:Akka Streams (Scala): Filtering out exceptionsAkka Streams (Scala):过滤异常
【发布时间】:2021-08-07 10:38:15
【问题描述】:
我的 akka 流管道中的一个步骤是在接收到无效输入时引发异常的转换。我想丢弃这些有问题的输入。所以,我想出了以下解决方案:
...
.map( input => Try( transformation( input ) ).toOption )
.filter( _.nonEmpty )
.map( _.get )
...
这需要 3 个步骤,实际上只是一个平面地图。
有没有更直接的 akka 方式来做到这一点?
【问题讨论】:
标签:
scala
akka-stream
flatmap
【解决方案1】:
您可以使用监督策略。取自文档:
val decider: Supervision.Decider = {
case _: ArithmeticException => Supervision.Resume
case _ => Supervision.Stop
}
val flow = Flow[Int]
.filter(100 / _ < 50)
.map(elem => 100 / (5 - elem))
.withAttributes(ActorAttributes.supervisionStrategy(decider))
您可以配置 Decider 以执行您需要的任何操作。如果您需要为所有异常跳过该元素,请使用
case _: Throwable => Supervision.Resume
看看https://doc.akka.io/docs/akka/current/stream/stream-error.html
【解决方案2】:
如果您想按照示例代码中指示的方式静默丢弃异常,以下是一些减少步骤的方法:
// A dummy transformation
def transformation(i: Int): Int = 100 / i
// #1: Use `collect`
Source(List(5, 2, 0, 1)).
map(input => Try(transformation(input)).toOption).
collect{ case x if x.nonEmpty => x.get }.
runForeach(println)
// Result: 20, 50, 100
// #2: Use `mapConcat`
Source(List(5, 2, 0, 1)).
mapConcat(input => List(Try(transformation(input)).toOption).flatten).
runForeach(println)
// Result: 20, 50, 100
请注意,Akka Source/Flow 没有 flatMap,尽管 mapConcat(和 flatMapConcat)确实以某种类似的方式起作用。