【问题标题】:scala futures - how to get result or failure from both futuresscala期货 - 如何从两个期货中获得结果或失败
【发布时间】:2019-09-01 17:50:21
【问题描述】:

我正在使用for 并行运行 2 个期货。我想知道在所有情况下哪个成功,哪个失败(都应该运行直到完成,结果或失败状态)。目前我只能检索组合成功结果

我从这里进行了检查,但这还不够,因为当一个失败时我没有获得成功状态,也没有在两者都失败的情况下都失败failure in Scala future's for comprehension

case class TaggedException(context:String, val throwable: Throwable) extends Exception(throwable.getMessage)

val f1 = Future {...}.recoverWith {case e:Throwable => Future.Failed(new TaggedException("first one failed", e))}
val f2 = Future {...}.recoverWith {case e: Throwable => Future.Failed(new TaggedException("second one failed", e))}

val combinedResult = for {
  r1 <- f1
  r2 <- f2
} yield (r1,r2)

combinedResult.onFailure {
case e : TaggedException => ... // if both fail I only get the first line in the for
// in case where single fails I only know fail status without the success of the second
}

我正在努力避免这种混乱:

var countCompleted = 0 ... or some other atomic way to count 
f1 onComplete {
  case Success(value) => {
    ... countCompleted increment ... 
    // handle success
    if both completed {
       // handle returning a status
    }
  }
  case Failure(error) => {
    ... countCompleted increment ... 
    // handle failure
    if both completed {
       // handle returning a status
    }
  }
}

f2 onComplete {
  case Success(value) => {
    ... countCompleted increment ... 
    // handle success
    if both completed {
       // handle returning a status
    }
  }
  case Failure(error) => {
    ... countCompleted increment ... 
    // handle failure
    if both completed {
       // handle returning a status
    }
  }
}

编辑:另一个版本 - 这是一种有效的方法吗?

def toFutureTry[A](future: Future[A]):Future[Try[A]] = future.map(Success(_)).recover {case t: Throwable => Failure(t)}

    val fa: Future[Try[Blah]] = toFutureTry(f1)
    val fb: Future[Try[Foo]] = toFutureTry(f2)

    val combinedRes = for {
      ra <- fa
      rb <- fb
    } yield (ra,rb)

    combinedRes.onComplete {
      case Success(successRes: (Try[Blah], Try[Foo])) => // all of these cases are success or fails
      case Failure(f: Throwable) => // i think it is unused right?
    }

【问题讨论】:

    标签: scala future


    【解决方案1】:

    您可以像这样组合transformzip

    val combinedResult: Future[(Try[T], Try[T])] =
      f1.transform(Success(_)).zip(f2.transform(Success(_)))
    

    那么你可以这样做:

    combinedResult map {
      case (Success(v1), Success(v2)) =>
      case (Success(v1), Failure(f2)) =>
      case (Failure(f1), Success(v2)) =>
      case (Failure(f1), Failure(f2)) =>
    }
    

    【讨论】:

      【解决方案2】:

      Future[A] 上使用flatMap 将无济于事,因为它总是会在其中一个产生的第一个故障时短路,而您确实想累积错误。

      使用Future.traverse 的解决方案将适用于任意多个Future[A] 实例:

      val f1 = Future.failed[Int](new Exception("42")).recoverWith {
        case e: Throwable => Future.failed(TaggedException("first one failed", e))
      }
      
      val f2 = Future(42).recoverWith {
        case e: Throwable =>
          Future.failed(TaggedException("second one failed", e))
      }
      
      val res: Future[List[Either[Throwable, Int]]] = 
        Future
         .traverse(List(f1, f2)) {
            eventualInt => eventualInt
             .map(i => Right(i))
             .recover { case err => Left(err) }
         }
      
      res.onComplete {
        case Failure(exception) =>
          println(exception)
        case Success(value) =>
          value.foreach {
            case Right(int) => println(s"Received num: $int")
            case Left(err) => println(s"Oh no, err: $err")
          }
      }
      
      Await.result(res, Duration.Inf)
      

      我们还可以使用 Validated 类型的猫的一点帮助:

      import cats.data.Validated.{Invalid, Valid}
      import scala.concurrent.ExecutionContext.Implicits.global
      import scala.concurrent.duration.Duration
      import scala.concurrent.{Await, Future}
      import cats.implicits._
      import scala.util.{Failure, Success}
      
      def main(args: Array[String]): Unit = {
        case class TaggedException(context: String, throwable: Throwable)
          extends Exception(throwable.getMessage)
      
        val f1 = Future.failed[Int](new Exception("42")).recoverWith {
          case e: Throwable => Future.failed(TaggedException("first one failed", e))
        }
      
        val f2 = Future(42).recoverWith {
          case e: Throwable => Future.failed(TaggedException("second one failed", e))
        }
      
        val res: Future[List[Validated[Throwable, Int]]] = 
          List(f1, f2)
           .traverse(eventualInt => eventualInt
                             .map(i => Valid(i))
                             .recover { case err => Invalid(err) })
      
        res.onComplete {
          case Failure(exception) =>
            println(exception)
          case Success(value) =>
            value.foreach {
              case Valid(int) => println(s"Received num: $int")
              case Invalid(err) => println(s"Oh no, err: $err")
            }
        }
      
        Await.result(res, Duration.Inf)
      }
      

      将产生:

      Oh no, err: TaggedException$3: 42
      Received num: 42
      

      【讨论】:

      • 并且没有 catz :)_
      • @AvnerBarr James 方法会奏效,因为他用Try[A] 包裹内部thunk,然后flatMap 不会短路。
      • 但由于函数调用,我得到了未来 - 我需要将尝试“插入”到其中
      • 如果你想使用 List[Future[A]]Future[List[A]],我可以提供一个具体的实现......但它会非常具体。
      • 这是一种有效的方法吗? : ``` def returnsAFuture() = Future {1/0} val f3 = returnsAFuture() def toFutureTry[T](future: Future[T]) = future.map(Success()).recover{case t : Throwable => Failure(t)} val sdf: Future[Try[Int] with Product with Serializable] = toFutureTry(f3) val successFail = f3.map{ Success() }.recover { case t:可抛出 => 失败(t)} ```
      【解决方案3】:

      使用 for-comprehensions,只要一行失败,代码就会停在那里并引发任何异常。如果r1 &lt;- f1 抛出 Throwable,r2 &lt;- f2 将永远不会被击中。

      我会亲自将它们都放入Either[Throwable, whatever],而不是将每个.recoverWith 放入Future.Failed(...)。这样,您不仅可以使用onFailure,还可以使用您获得的任何Left 值执行某些操作,并使用您获得的任何Right 值执行其他操作。或者你可以使用Try/Success/Failure... 取决于你想对错误做什么。

      我不知道您的具体用例,但如果您想单独处理每个成功或失败案例,您可以执行以下操作:

      import scala.concurrent.ExecutionContext.Implicits.global
      import scala.concurrent.Future
      import scala.util.{Failure, Success, Try}
      
      val f1 = Future(Try(1 / 1))
      val f2 = Future(Try(1 / 0))
      
      // for-comprehensions won't fall over when reading a Failure
      // as failures don't extend Throwable
      val combinedResult = for {
        r1 <- f1 // Success(1)
        r2 <- f2 // Failure(java.lang.ArithmeticException: / by zero)
      } yield List(r1,r2)
      
      combinedResult.map { // get inside Future
        f =>
          f.map { // get inside List
            case Success(a) => // do something with success
            case Failure(e: IndexOutOfBoundsException) => // do something with failure
            case Failure(e: ArithmeticException) => // do something with failure
            case Failure(e) => // do something with failure
          }
      }
      

      我个人不喜欢使用 onComplete;我更喜欢通过映射将数据尽可能长时间地保存在 Future 中。不过这只是个人喜好。

      【讨论】:

      • 你能举个例子吗?
      • 我用一个通用示例更新了答案或者如果您不想在以后处理异常,那么某些东西可能会很有帮助。
      • 不,这不起作用。理解的结果是一个失败的未来,所以,你没有什么可以映射的。
      • @Dima 不,不是,这个理解的结果是Future[List[Try[Int]]]
      • 哦,我错过了Try ...对不起
      【解决方案4】:

      ~~首先,你没有并行运行你的futures(为了理解会顺序运行它们)。~~

      更新上述不正确,如 cmets 中所述。我错过了在理解之外创建的未来。

      其次,正如您所注意到的,如果其中一个期货失败,另一个期货就会丢失。

      为了解决这样一个事实,即如果一个期货失败,另一个期货的结果就会丢失,您可以将您的未来值“提升”到Try

      val lifted1: Future[Try[Foo]] = f1.transform(Success(_))
      val lifted2: Future[Try[Bar]] = f1.transform(Success(_))
      

      现在,您可以执行以下操作:

       (lifted1 zip lifted2).map { 
         case (Success(foo), Success(bar)) => // both succeeded!
         case (Success(foo), Failure(t)) => // foo succeeded, bar failed with t
         case (Failure(t), Success(bar)) => // guess what!
         case (Failure(t1), Failure(t2)) => // oops
       }
      

      如果您发现自己经常这样做,您可能会发现它很有用,使用 lift 操作“拉皮条”您的未来:

       object FutureSyntax { 
         implicit class FutureOps[A](val fu: Future[A]) extends AnyVal {
            def liftToTry: Future[Try[A]] = fu.transform(Success(_))
         }
       }
      

      所以,现在如果你import FutureSyntax._,上面可以写成

       (f1.liftToTry zip f2.liftToTry).map { 
          case (Success(foo), Success(bar)) => ... 
          ...
       }
      

      你也可以用for-comprehension来写,只是会更冗长。在这种情况下,我不会选择理解:它们在处理一系列期货时很好,其中后面的顺序取决于前面的结果。对于处理独立的期货,zipsequence 通常是更好的选择。

      【讨论】:

      • 你没有并行运行你的期货(为了理解将顺序运行它们)。他在将它们用于理解之前急切地创建期货。那里没有顺序。
      • 是的,它们应该并行运行,因为它们是在循环之外创建的
      • 啊,是的。我错过了。哎呀。
      • 你可以这样做:fu.transform(Success(_)) 而不是fu.map(Success.apply).recover { case NonFatal(t) =&gt; Failure(t)
      • @ViktorKlang 我认为,您需要第二个参数,但 fu.transform(Success(_), identity) 也可以,是的。
      猜你喜欢
      • 2019-05-21
      • 1970-01-01
      • 1970-01-01
      • 2020-11-05
      • 1970-01-01
      • 1970-01-01
      • 2021-09-21
      • 1970-01-01
      • 2019-03-23
      相关资源
      最近更新 更多