【问题标题】:In scala, how do you transform a list of futures into a future that returns the first successful future? [duplicate]在 scala 中,如何将期货列表转换为返回第一个成功未来的未来? [复制]
【发布时间】:2020-05-10 05:11:26
【问题描述】:

所有未来都可能最终成功(有些可能会失败),但我们想要第一个成功的。并希望将结果表示为未来。如果列表中的所有期货都失败了,这个未来将失败。

【问题讨论】:

标签: scala future


【解决方案1】:

如上所示,documentationFuture.firstCompletedOf 已提供。

import scala.concurrent.{ExecutionnContext, Future }

def foo[T](f: => Seq[Future[T]])(implicit ec: ExecutionContext): Future[T] =
  Future.firstCompletedOf(f)

【讨论】:

  • 另一个答案说第一次完成可能会失败。
  • 是的,这样行不通。应该有一个 firstSuccess。
【解决方案2】:

RayRoestenburg 像这样返回第一个成功的

def firstSucceededOf[T](futures: List[Future[T]]): Future[T] = {
    val p = Promise[T]()
    val size = futures.size
    val failureCount = new AtomicInteger(0)

    futures foreach {
      _.onComplete {
        case Success(v) => p.trySuccess(v)
        case Failure(e) =>
          val count = failureCount.incrementAndGet
          if (count == size) p.tryFailure(e)
      }
    }
    p.future
  }

关键是要了解Promise.trySuccess 只完成了Promise 一次。这是一个有效的example

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util._
import java.util.concurrent.atomic.AtomicInteger
import Thread.sleep

object FirstSucceededOfExample extends App {
  def firstSucceededOf[T](futures: List[Future[T]]): Future[T] = {
    val p = Promise[T]()
    val size = futures.size
    val failureCount = new AtomicInteger(0)

    futures foreach {
      _.onComplete {
        case Success(v) => p.trySuccess(v)
        case Failure(e) =>
          val count = failureCount.incrementAndGet
          if (count == size) p.tryFailure(e)
      }
    }
    p.future
  }

  val futures = List(
    Future {sleep(2000); -11}, 
    Future {sleep(3000); -7}, 
    Future {42}
  )

  firstSucceededOf(futures)
    .andThen(v => println(v))

  sleep(1000)
}

哪个输出

Success(42)

如果所有期货都以失败告终

  val futures = List(
    Future(throw new RuntimeException("boom 2")),
    Future(throw new RuntimeException("boom 3")),
    Future(throw new RuntimeException("boom 1"))
  )

它返回最后一次完成的失败。


注意Future.firstCompletedOfnot sufficient,因为它返回第一个完成(作为成功或失败)不是第一个成功完成:

object FirstSucceededOfExample extends App {
  def foo[T](f: => Seq[Future[T]]): Future[T] =
    Future.firstCompletedOf(f)

  val futures = List(
    Future {sleep(2000); -11},
    Future {sleep(3000); -7},
    Future.failed(new RuntimeException("boom"))
  )

  foo(futures)
    .andThen(v => println(v))

  Thread.sleep(1000)
}

哪个输出

Failure(java.lang.RuntimeException: boom)

【讨论】:

  • 您可以Future { sleep(n) ; -11 } 不带括号。
猜你喜欢
  • 2019-01-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-05-12
  • 2023-03-03
  • 1970-01-01
  • 2017-02-18
  • 2016-09-18
相关资源
最近更新 更多