【问题标题】:How to discard other Futures if the critical Future is finished in Scala?如果关键的未来在 Scala 中完成,如何丢弃其他期货?
【发布时间】:2020-08-18 20:08:02
【问题描述】:

假设我有三个远程调用来构建我的页面。其中一个 (X) 对页面至关重要,另外两个 (A, B) 仅用于增强体验。

因为criticalFutureX 太重要了,不受futureAfutureB 的影响,所以我希望所有远程调用的总延迟不超过X。

这意味着,如果criticalFutureX 完成,我想丢弃futureAfutureB

val criticalFutureX = ...
val futureA = ...
val futureB = ...

// the overall latency of this for-comprehension depends on the longest among X, A and B
for {
  x <- criticalFutureX
  a <- futureA
  b <- futureB
} ...

在上面的例子中,尽管它们是并行执行的,但总体延迟取决于 X、A 和 B 中最长的,这不是我想要的。

Latencies:
X: |----------|
A: |---------------|
B: |---|

O: |---------------| (overall latency)

firstCompletedOf,但它不能用来明确地说“在criticalFutureX完成的情况下”。

有没有类似下面的东西?

val criticalFutureX = ...
val futureA = ...
val futureB = ...

for {
  x <- criticalFutureX
  a <- futureA // discard when criticalFutureX finished
  b <- futureB // discard when criticalFutureX finished
} ...

X: |----------|
A: |-----------... discarded
B: |---|

O: |----------| (overall latency)

【问题讨论】:

标签: scala asynchronous future


【解决方案1】:

你可以通过承诺来实现这一目标


  def completeOnMain[A, B](main: Future[A], secondary: Future[B]) = {
    val promise = Promise[Option[B]]()
    main.onComplete {
      case Failure(_) =>
      case Success(_) => promise.trySuccess(None)
    }
    secondary.onComplete {
      case Failure(exception) => promise.tryFailure(exception)
      case Success(value)     => promise.trySuccess(Option(value))
    }
    promise.future
  }

一些测试代码

  private def runFor(first: Int, second: Int) = {

    def run(millis: Int) = Future {
      Thread.sleep(millis);
      millis
    }

    val start = System.currentTimeMillis()
    val combined = for {
      _ <- Future.unit
      f1 = run(first)
      f2 = completeOnMain(f1, run(second))
      r1 <- f1
      r2 <- f2
    } yield (r1, r2)

    val result = Await.result(combined, 10.seconds)
    println(s"It took: ${System.currentTimeMillis() - start}: $result")
  }

  runFor(3000, 4000)
  runFor(3000, 1000)

生产

It took: 3131: (3000,None)
It took: 3001: (3000,Some(1000))

【讨论】:

    【解决方案2】:

    使用 Scala 标准库 Futures 很难高效、可靠和安全地完成此类任务。没有办法中断尚未完成的Future,这意味着即使您选择忽略其结果,它仍然会继续运行并浪费内存和 CPU 时间。而且即使有方法可以中断正在运行的Future,也无法确保分配的资源(网络连接、打开的文件等)会被正确释放。

    我想指出,Ivan Stanislavciuc 给出的实现有一个 bug:如果 main Future 失败,那么 promise 将永远不会完成,这不太可能是你想要的。

    因此,我强烈建议研究现代并发效果系统,例如 ZIO 或猫效果。这些不仅更安全、更快捷,而且更容易。这是一个没有此错误的 ZIO 实现:

    import zio.{Exit, Task}
    import Function.tupled
    
    def completeOnMain[A, B](
      main: Task[A], secondary: Task[B]): Task[(A, Exit[Throwable, B])] =
      (main.forkManaged zip secondary.forkManaged).use {
        tupled(_.join zip _.interrupt)
      }
    

    Exit 是一种描述secondary 任务如何结束的类型,即。 e.通过成功返回 B 或由于错误(Throwable 类型)或由于中断。

    请注意,这个函数可以被赋予一个更复杂的签名,告诉你更多关于正在发生的事情,但我想在这里保持简单。

    【讨论】:

      猜你喜欢
      • 2023-03-03
      • 2021-09-21
      • 1970-01-01
      • 1970-01-01
      • 2015-05-17
      • 2016-08-09
      • 1970-01-01
      • 1970-01-01
      • 2020-05-10
      相关资源
      最近更新 更多