【问题标题】:Akka Streams (Scala): Filtering out exceptionsAkka Streams (Scala):过滤异常
【发布时间】:2021-08-07 10:38:15
【问题描述】:

我的 akka 流管道中的一个步骤是在接收到无效输入时引发异常的转换。我想丢弃这些有问题的输入。所以,我想出了以下解决方案:

...
.map( input => Try( transformation( input ) ).toOption )
.filter( _.nonEmpty )
.map( _.get )
...

这需要 3 个步骤,实际上只是一个平面地图。

有没有更直接的 akka 方式来做到这一点?

【问题讨论】:

    标签: scala akka-stream flatmap


    【解决方案1】:

    您可以使用监督策略。取自文档:

    val decider: Supervision.Decider = {
      case _: ArithmeticException => Supervision.Resume
      case _                      => Supervision.Stop
    }
    
    val flow = Flow[Int]
      .filter(100 / _ < 50)
      .map(elem => 100 / (5 - elem))
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
    

    您可以配置 Decider 以执行您需要的任何操作。如果您需要为所有异常跳过该元素,请使用

     case _: Throwable => Supervision.Resume
    

    看看https://doc.akka.io/docs/akka/current/stream/stream-error.html

    【讨论】:

      【解决方案2】:

      如果您想按照示例代码中指示的方式静默丢弃异常,以下是一些减少步骤的方法:

      // A dummy transformation
      def transformation(i: Int): Int = 100 / i
      
      // #1: Use `collect`
      Source(List(5, 2, 0, 1)).
        map(input => Try(transformation(input)).toOption).
        collect{ case x if x.nonEmpty => x.get }.
        runForeach(println)
        // Result: 20, 50, 100
      
      // #2: Use `mapConcat`
      Source(List(5, 2, 0, 1)).
        mapConcat(input => List(Try(transformation(input)).toOption).flatten).
        runForeach(println)
        // Result: 20, 50, 100
      

      请注意,Akka Source/Flow 没有 flatMap,尽管 mapConcat(和 flatMapConcat)确实以某种类似的方式起作用。

      【讨论】:

        猜你喜欢
        • 2016-07-22
        • 2017-12-23
        • 2020-02-07
        • 1970-01-01
        • 1970-01-01
        • 2018-10-24
        • 2015-07-14
        • 2016-06-08
        • 2021-09-08
        相关资源
        最近更新 更多