【问题标题】:Why is Akka Streams swallowing my exceptions?为什么 Akka Streams 会吞下我的异常?
【发布时间】:2016-06-08 11:45:53
【问题描述】:

为什么会出现异常

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

object TestExceptionHandling {
  def main(args: Array[String]): Unit = {
    implicit val actorSystem = ActorSystem()
    implicit val materializer = ActorMaterializer()(defaultActorSystem)

    Source(List(1, 2, 3)).map { i =>
      if (i == 2) {
        throw new RuntimeException("Please, don't swallow me!")
      } else {
        i
      }
    }.runForeach { i =>
      println(s"Received $i")
    }
  }
}

默默地忽略?我可以看到在打印Received 1 后流停止了,但没有记录任何内容。请注意,问题一般不在于日志记录配置,因为如果我在 application.conf 文件中设置 akka.log-config-on-start = on 会看到很多输出。

【问题讨论】:

  • 您正在丢弃异常,因为您忽略了runForeach 的返回值。
  • @ViktorKlang 感谢您指出这一点,我刚刚更新了我的答案!

标签: scala exception-handling akka-stream


【解决方案1】:

我现在使用自定义 Supervision.Decider 确保正确记录异常,可以这样设置:

val decider: Supervision.Decider = { e =>
  logger.error("Unhandled exception in stream", e)
  Supervision.Stop
}

implicit val actorSystem = ActorSystem()
val materializerSettings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)
implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)

另外,正如Vikor Klang 所指出的,在上面给出的示例中,异常也可以通过“捕获”

Source(List(1, 2, 3)).map { i =>
  if (i == 2) {
    throw new RuntimeException("Please, don't swallow me!")
  } else {
    i
  }
}.runForeach { i =>
  println(s"Received $i")
}.onComplete {
  case Success(_) =>
    println("Done")
  case Failure(e) =>
    println(s"Failed with $e")
}

但是请注意,这种方法对您没有帮助

Source(List(1, 2, 3)).map { i =>
  if (i == 2) {
    throw new RuntimeException("Please, don't swallow me!")
  } else {
    i
  }
}.to(Sink.foreach { i =>
  println(s"Received $i")
}).run()

因为run() 返回Unit

【讨论】:

  • run() 仅返回 Unit 因为它默认保留“左侧”(Keep.left)的实体值。如果您使用过: toMat(Sink.foreach(...))(Keep.right) 那么它会再次起作用。
【解决方案2】:

当我开始使用 akk-streams 时,我也有类似的问题。 Supervision.Decider 有帮助,但并非总是如此。

不幸的是,它没有捕获ActionPublisher 中抛出的异常。我看到它已处理,ActorPublisher.onError 被调用但它没有达到Supervision.Decider。它适用于文档中提供的简单 Stream。

如果我使用Sink.actorRef,错误也不会到达actor。

为了实验,我尝试了以下示例

val stream = Source(0 to 5).map(100 / _)
stream.runWith(Sink.actorSubscriber(props))

在这种情况下,Decider 捕获了异常,但从未到达 Actor 订阅者。

总的来说,我认为这是不一致的行为。我不能使用一种机制来处理 Stream 中的错误。

我原来的 SO 问题:Custom Supervision.Decider doesn't catch exception produced by ActorPublisher

这是跟踪它的 akka 问题:https://github.com/akka/akka/issues/18359

【讨论】:

    【解决方案3】:

    我在 Akka Streams 吞下我的异常时遇到了不同的问题。我会把它贴在这里,因为这是谷歌的最高结果。

    在这种情况下,sourceSource[ByteString, Any]

    source.runWith(StreamConverters.fromOutputStream(() => outputStream))
    

    这将返回一个 Future[IOResult]。如果写入输出流失败(例如,源失败),那么 Future 仍将返回 Success。在这种情况下,您实际上必须检查 IOResult 是否有错误:

    source.runWith(StreamConverters.fromOutputStream(() => output)).
          map(ior => {
            if (!ior.wasSuccessful) 
              throw new RuntimeException(ior.getError)
          })
    

    这样的结果将是一个失败的未来,但有正确的例外。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2010-11-02
      • 2011-01-05
      • 1970-01-01
      • 1970-01-01
      • 2018-08-07
      • 2018-11-15
      • 2021-08-07
      相关资源
      最近更新 更多