【问题标题】:Await for a Sequence of Futures with timeout without failing on TimeoutException等待具有超时的期货序列,而不会在 TimeoutException 上失败
【发布时间】:2022-01-02 18:54:38
【问题描述】:

我有一系列相同类型的 scala Futures。

我想在有限的时间后得到整个序列的结果,而有些期货可能成功,有些可能失败,有些尚未完成,未完成的期货应视为失败。

我不想按顺序使用 Await 每个未来。

我确实看过这个问题:Scala waiting for sequence of futures 并尝试从那里使用解决方案,即:

  private def lift[T](futures: Seq[Future[T]])(implicit ex: ExecutionContext) =
    futures.map(_.map { Success(_) }.recover { case t => Failure(t) })

  def waitAll[T](futures: Seq[Future[T]])(implicit ex: ExecutionContext) =
    Future.sequence(lift(futures))

  futures: Seq[Future[MyObject]] = ...
  val segments = Await.result(waitAll(futures), waitTimeoutMillis millis)

但我仍然收到 TimeoutException,我猜是因为某些期货尚未完成。 该答案还指出,

现在 Future.sequence(lifted) 将在每个未来完成时完成,并使用 Try 表示成功和失败。

但我希望我的未来在超时之后完成,而不是在序列中的每个未来都完成时。我还能做什么?

【问题讨论】:

  • 我认为这对于纯 scala 期货很难做到,因为在未来的 api 中没有“本机”超时处理。等待的超时不会这样做。我认为您应该研究允许此类处理的更完整的“异步”库。如果您有兴趣,我可以在 monix 中为您提供一个示例
  • @IvanStanislavciuc 是的,请提供一个示例,其中包含您认为最好的任何依赖项,我正在寻找高效的代码,我的期货清单将非常大

标签: scala future


【解决方案1】:

如果我使用原始的Future(而不是一些内置此功能的 IO monad,或者没有一些 Akka 实用程序),我会一起破解实用程序,例如:

// make each separate future timeout
object FutureTimeout {
  // separate EC for waiting
  private val timeoutEC: ExecutorContext = ...

  private def timeout[T](delay: Long): Future[T] = Future {
    blocking {
      Thread.sleep(delay)
    }
    throw new Exception("Timeout")
  }(timeoutEC)

  def apply[T](fut: Future[T], delat: Long)(
    implicit ec: ExecutionContext
  ): Future[T] = Future.firstCompletedOf(Seq(
    fut,
    timeout(delay)
  ))
}

然后

Future.sequence(
  futures
    .map(FutureTimeout(_, delay))
    .map(Success(_))
    .recover { case e => Failure(e) }
)

由于每个未来最多会在delay 之后终止,因此我们可以在那之后将它们收集到一个结果中。

您必须记住,无论您如何触发超时,您都无法保证超时的Future 停止执行。它可以在某个地方的某个线程上运行,只是你不会等待结果。 firstCompletedOf 只是让这场比赛更加明确。

其他一些实用程序(例如 Cats Effect IO)允许您取消计算(例如在这样的比赛中使用),但您仍然必须记住 JVM 不能任意“杀死”正在运行的线程,因此取消会在一个计算阶段完成之后和下一个计算开始之前发生(例如在.maps 或.flatMaps 之间)。

如果您不害怕添加外部部门,还有其他(更可靠,因为 Thread.sleep 只是暂时的丑陋黑客)方法来超时 Future,例如 Akka utils。另见其他问题like this

【讨论】:

  • 是的,这就是我所说的native 超时。您需要阻止整个线程来触发“超时”异常。我认为这不应该在生产系统中运行。这可能会耗尽您的执行上下文,并且业务逻辑将停止工作,因为它等待太多超时。
  • 我同意这就是我添加免责声明的原因(如果不允许我使用 Akka 或 Cats IO 或其他什么)。可能大部分时间都可以,除非你会产生太多等待线程。但是 Akka 及其 Scheduler 应该能够帮助处理这种情况,类似于 IO monads。但我希望这些问题主要来自那些坚持原始Futures 并且由于公司/项目的政策而无法迁移到其他任何东西的人。
  • @MateuszKubuszok 感谢您的解决方案,但就像您说的那样,为我的每个未来创建另一个线程似乎效率很低。我可以集成任何其他依赖项,所以如果你能提供一个最有效和最简单的外部库解决方案,那将是最受欢迎的!
  • 如果您想要较少侵入性的更改,请查看akka.pattern.after。或任何支持IO.racePairtimer.sleep(..) 之类的IO monad。
【解决方案2】:

这是使用monix的解决方案

import monix.eval.Task
import monix.execution.Scheduler

val timeoutScheduler = Scheduler.singleThread("timeout") //it's safe to use single thread here because timeout tasks are very fast

def sequenceDiscardTimeouts[T](tasks: Task[T]*): Task[Seq[T]] = {
  Task
    .parSequence(
      tasks
        .map(t =>
          t.map(Success.apply) // Map to success so we can collect the value
            .timeout(500.millis)
            .executeOn(timeoutScheduler) //This is needed to run timesouts in dedicated scheduler that won't be blocked by "blocking"/io work if you have any
            .onErrorRecoverWith { ex =>
              println("timed-out")
              Task.pure(Failure(ex)) //It's assumed that any error is a timeout. It's possible to "catch" just timeout exception here
            }
        )
    )
    .map { res =>
      res.collect { case Success(r) => r }
    }
}

测试代码

implicit val mainScheduler = Scheduler.fixedPool(name = "main", poolSize = 10)


def slowTask(msg: String) = {
  Task.sleep(Random.nextLong(1000).millis) //Sleep here to emulate a slow task
    .map { _ =>
      msg
    }
}


val app = sequenceDiscardTimeouts(
  slowTask("1"),
  slowTask("2"),
  slowTask("3"),
  slowTask("4"),
  slowTask("5"),
  slowTask("6")
)

val started: Long = System.currentTimeMillis()
app.runSyncUnsafe().foreach(println)
println(s"Done in ${System.currentTimeMillis() - started} millis")

这将为每次运行打印不同的输出,但应该如下所示

timed-out
timed-out
timed-out
3
4
5
Done in 564 millis

请注意使用两个单独的调度程序。这是为了确保即使main 调度程序忙于业务逻辑也会触发超时。您可以通过减少主调度程序的poolSize 来测试它。

【讨论】:

    猜你喜欢
    • 2015-06-03
    • 1970-01-01
    • 2020-10-31
    • 2013-06-30
    • 1970-01-01
    • 2021-12-31
    • 2022-12-15
    • 1970-01-01
    • 2021-01-10
    相关资源
    最近更新 更多