【问题标题】:Multiple futures that may fail - returning both successes and failures?可能失败的多个期货 - 返回成功和失败?
【发布时间】:2018-09-22 09:17:07
【问题描述】:

我有一种情况,我需要并行运行一堆操作。

所有操作都有相同的返回值(比如Seq[String])。

可能某些操作可能失败,而其他操作成功返回结果。

我想返回成功的结果以及发生的任何异常,以便记录它们以进行调试。

在我去编写自己的类之前,是否有内置方法或通过任何库(cats/scalaz)的简单方法来执行此操作?

我正在考虑在自己的未来执行每个操作,然后检查每个未来,并返回一个 Seq[String] -> Seq[Throwable] 的元组,其中左值是成功的结果(展平/组合),右是发生的任何异常的列表.

有没有更好的办法?

【问题讨论】:

  • 致OP:您接受的解决方案不正确,它不会等待计算结果。如果此代码运行时您的未来未完成,您将永远看到结果。您要么必须等到它完成,要么像我建议的那样注册一个异步回调。
  • 在检查结果之前已经在使用 Future.await。谢谢。
  • 我在文档中看不到这个方法 (scala-lang.org/api/2.12.3/scala/concurrent/Future.html)。那是隐式转换吗?我实际上从未使用过 Scala 期货,而且文档中似乎没有与 Java 的 Future.get 相当的东西
  • 意思是等待。准备好了,对不起

标签: scala concurrency parallel-processing scalaz scala-cats


【解决方案1】:

使用您在评论中提到的Await.ready,通常会失去使用期货的大部分好处。相反,您可以使用普通的 Future 组合符来执行此操作。让我们做一个更通用的版本,它适用于任何返回类型;可以轻松添加扁平化Seq[String]s。

def successesAndFailures[T](futures: Seq[Future[T]]): Future[(Seq[T], Seq[Throwable])] = {
  // first, promote all futures to Either without failures
  val eitherFutures: Seq[Future[Either[Throwable, T]]] = 
    futures.map(_.transform(x => Success(x.toEither)))
  // then sequence to flip Future and Seq
  val futureEithers: Future[Seq[Either[Throwable, T]]] = 
    Future.sequence(eitherFutures)
  // finally, Seq of Eithers can be separated into Seqs of Lefts and Rights
  futureEithers.map { seqOfEithers =>
    val (lefts, rights) = seqOfEithers.partition(_.isLeft)
    val failures = lefts.map(_.left.get)
    val successes = rights.map(_.right.get)
    (successes, failures)
  }
}

Scalaz and Cats have separate to simplify the last step.

类型可以由编译器推断出来,它们只是为了帮助您了解逻辑。

【讨论】:

    【解决方案2】:

    在您的Future 上调用value 会返回一个Option[Try[T]]。如果Future 尚未完成,则OptionNone。如果它已经完成,那么很容易打开和处理。

    if (myFutr.isCompleted)
      myFutr.value.map(_.fold( err: Throwable  => //log the error
                             , ss: Seq[String] => //process results
                             ))
    else
     // do something else, come back later
    

    【讨论】:

    • 如果您 else 没有阻止未来或注册回调,您似乎经常会错过这段代码的结果
    • 使用回调是另一种方法。难点在于回调返回Unit,很难将结果数据带入主线程做进一步处理。
    • @jvwh 那么你至少应该循环直到完成。显然它不应该是主动等待,否则它的性能会很糟糕。您的解决方案没有描述当未来没有立即返回时如何取回结果。
    • 我提供了一小段代码摘录。我假设(也许天真地)OP 可以评估其在他的上下文中的适用性。
    【解决方案3】:

    听起来像是 Try 成语的一个很好的用例(它基本上类似于 Either monad)。

    来自doc的使用示例:

    import scala.util.{Success, Failure}
    
    val f: Future[List[String]] = Future {
      session.getRecentPosts
    }
    
    f onComplete {
      case Success(posts) => for (post <- posts) println(post)
      case Failure(t) => println("An error has occurred: " + t.getMessage)
    }
    

    它实际上比您所要求的要多一点,因为它是完全异步的。它适合您的用例吗?

    【讨论】:

      【解决方案4】:

      我会这样做:

      import scala.concurrent.{Future, ExecutionContext}
      import scala.util.Success
      
      def eitherify[A](f: Future[A])(implicit ec: ExecutionContext): Future[Either[Throwable, A]] = f.transform(tryResult => Success(tryResult.toEither))
      
      def eitherifyF[A, B](f: A => Future[B])(implicit ec: ExecutionContext): A => Future[Either[Throwable, B]] = { a => eitherify(f(a)) }
      
      // here we need some "cats" magic for `traverse` and `separate`
      // instead of `traverse` you can use standard `Future.sequence`
      // there is no analogue for `separate` in the standard library
      
      import cats.implicits._
      
      def myProgram[A, B](values: List[A], asyncF: A => Future[B])(implicit ec: ExecutionContext): Future[(List[Throwable], List[B])] = {
        val appliedTransformations: Future[List[Either[Throwable, B]]] = values.traverse(eitherifyF(asyncF))
        appliedTransformations.map(_.separate)
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2011-10-16
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多