【问题标题】:Most idiomatic way to mix synchronous, asynchronous, and parallel computation in a scala for comprehension of futures在 Scala 中混合同步、异步和并行计算以理解未来的最惯用方式
【发布时间】:2025-12-20 05:55:01
【问题描述】:

假设我有 4 个未来的计算要做。前两个可以并行完成,但第三个必须在前两个之后完成(即使前两个的值没有在第三个中使用——将每个计算视为执行某些 db 操作的命令)。最后,在前 3 次计算之后必须进行第 4 次计算。此外,还有一个副作用,可以在前 3 次完成后开始(将其视为启动周期性可运行文件)。在代码中,这可能如下所示:

for {
  _ <- async1     // not done in parallel with async2 :( is there
  _ <- async2     // any way of achieving this cleanly inside of for?
  _ <- async3
  _ =  sideEffect // do I need "=" here??
  _ <- async4
} yield ()

cmets 表明我对代码质量的怀疑:

  1. 在 for 理解中并行执行两个操作的最简洁方法是什么?
  2. 有没有一种方法可以在没有这么多“_”字符的情况下实现此结果(也没有分配命名引用,至少在 sideEffect 的情况下)
  3. 最干净、最惯用的方法是什么?

【问题讨论】:

    标签: scala


    【解决方案1】:

    您可以使用zip 组合两个期货,包括zip 本身的结果。您最终会得到包含元组的元组,但是如果您对 Tuple2 使用中缀表示法,则很容易将它们分开。下面我为简洁定义了一个同义词~(这是解析器组合库所做的,除了它的~ 是一个与Tuple2 类似的不同类)。

    作为替代_ = 的副作用,您可以将其移至yield,或使用大括号和分号将其与以下语句组合。我仍然认为_ = 更惯用,至少就for 中的副作用声明而言是惯用的。

    val ~ = Tuple2
    
    for {
      a ~ b ~ c <- async1 zip
                   async2 zip
                   async3
      d <- { sideEffect; async4 }
    } yield (a, b, c, d)
    

    【讨论】:

    • 整洁。学习了一种新的 ~ 语法。 zip 的问题在于故障处理。来自文档,If 'this' future fails, the resulting future is failed with the throwable stored in 'this'. Otherwise, if 'that' future fails, the resulting future is failed with the throwable stored in 'that'. 相反,顺序应该无关紧要,投掷物应该累积在 Seq[Throwable] 中的 Failure 中。
    • @kfer38 也许,但这不是Try(期货成功/失败的底层封装)的工作方式。这不仅仅是 zip 的问题,还有Future.sequenceflatMap 的许多用途等问题。您必须重写TryFuturePromise 以获得所需的行为。相反,我更喜欢它返回首先发生的失败的行为 - 这将允许在出现错误时更快地完成。我同意顺序无关紧要。
    • 设计一个允许程序员指定快速失败的单子行为或错误累积的 API 仍然存在一些有趣的悬而未决的问题——例如参见 my question herethis mailing list thread
    【解决方案2】:

    for-comprehensions 表示一元操作,一元操作是有序的。 monad 的超类 applicative,其计算不依赖于先前计算的结果,因此可以并行运行。

    Scalaz 有一个 |@| 运算符用于组合应用程序,因此您可以使用 (future1 |@| future2)(proc(_, _)) 并行调度两个期货,然后对它们的结果运行“proc”,而不是 for {a &lt;- future1; b &lt;- future2(a)} yield b 的顺序计算(或只是future1 flatMap future2)。

    stdlib Futures 上已经有一个名为 .zip 的方法,它可以并行组合 Futures,而 scalaz impl 确实使用了这个方法:https://github.com/scalaz/scalaz/blob/scalaz-seven/core/src/main/scala/scalaz/std/Future.scala#L36 并且 .zip 和 for-comprehensions 可以混合使用以具有并行和连续的部分,视情况而定。 因此,只需使用 stdlib 语法,您上面的示例就可以写成:

    for {
      _ <- async1 zip async2
      _ <- async3
      _ =  sideEffect
      _ <- async4
    } yield ()
    

    或者,写成没有理解:

    async1 zip async2 flatMap (_=> async3) flatMap {_=> sideEffect; async4}
    

    【讨论】:

    • 在 scalaz 中可以这样写: (async1 |@| async2).tupled >> async3 >> {sideEffect; async4}
    【解决方案3】:

    仅供参考,让两个期货并行运行并仍然通过理解来处理它们非常简单。使用zip 的建议解决方案当然可以,但是我发现当我想处理几个期货并在它们都完成后做某事时,并且我有两个或更多彼此独立的,我会做一些事情像这样:

    val f1 = async1
    val f2 = async2
    //First two futures now running in parallel
    
    for {
      r1 <- f1     
      r2 <- f2     
      _ <- async3
      _ =  sideEffect 
      _ <- async4
    } yield {
      ...
    }
    

    现在 for 理解的结构方式在检查 f2 的完成状态之前肯定会等待 f1,但是这两个期货背后的逻辑是同时运行的。这比一些建议要简单一些,但仍然可以满足您的需求。

    【讨论】:

      【解决方案4】:

      您的代码看起来已经结构化减去并行计算期货。

      1. 使用辅助函数,最好写一个代码生成器打印出来 所有元组案例的助手
      2. 据我所知,你需要为结果命名或赋值_
      3. 示例代码

      带有帮助器的示例代码。

      import scala.concurrent.Future
      import scala.concurrent.ExecutionContext.Implicits.global
      
      object Example {
        def run: Future[Unit] = {
          for {
            (a, b, c) <- par(
              Future.successful(1),
              Future.successful(2),
              Future.successful(3)
            )
            constant = 100
            (d, e) <- par(
              Future.successful(a + 10),
              Future.successful(b + c)
            )
          } yield {
            println(constant)
            println(d)
            println(e)
          }
        }
      
        def par[A,B](a: Future[A], b: Future[B]): Future[(A, B)] = {
          for {
            a <- a
            b <- b
          } yield (a, b)
        }
      
        def par[A,B,C](a: Future[A], b: Future[B], c: Future[C]): Future[(A, B, C)] = {
          for {
            a <- a
            b <- b
            c <- c
          } yield (a, b, c)
        }
      }
      Example.run
      

      编辑:

      为 1 到 20 个期货生成代码:https://gist.github.com/nanop/c448db7ac1dfd6545967#file-parhelpers-scala

      parPrinter 脚本:https://gist.github.com/nanop/c448db7ac1dfd6545967#file-parprinter-scala

      【讨论】:

      • a zip b 已经和你的par(a, b) 做了差不多的事情。其他人的口味也可能不同,但我个人发现阴影在例如a &lt;- a 非常不愉快。
      • @TravisBrown 好点! zip 似乎没有输出 3 个或更多元组。我以为你会带来无形的。如果类型相关,则可以改用Future.sequence。此外,发布的 par 助手不处理失败,但可以通过创建 Promise 并使用 tryComplete 来完成。
      • a &lt;- a 中的future 可以很容易地重命名,并变成a &lt;- fa,因为代码都已生成。命名许多变量变得很烦人,所以这里没有完成
      • zip 很适合两个期货!太糟糕了 zipped 不能像处理列表那样处理 Future 对象的元组。
      • 我想知道 scala 类型系统是否能够支持zipped 用于任意长度的异构元组。