【问题标题】:Future call in for comprehension in Scala - how do I process them sequentially?未来需要在 Scala 中理解 - 我如何按顺序处理它们?
【发布时间】:2026-02-08 22:05:02
【问题描述】:

我有一个类似这样的代码:

for (n <- 1 to 1000) {
  someFuture map {
    // some other stuff
}

这是一段基本的代码,运行良好。但是,somefuture 对数据库进行了一些查询,并且数据库无法并行接收多个查询,这是之前发生的情况(它会产生许多线程来执行 somefuture,正如人们所期望的那样)。

理想情况下,我想按顺序进行(即当 n=1 时调用 someFuture,进行一些处理,当 n=2 时调用 someFuture,进行一些处理等)。我考虑过使用一些阻塞方法(来自Await),但这发生在演员内部,所以阻塞不是一个好主意。另一个想法是为这个特定的未来调用创建一个固定的线程池,但听起来有点矫枉过正。我应该怎么做?

更新:我找到了this answer,它建议按照我的想法创建一个固定的线程池。不过,这是正确的做法吗?

【问题讨论】:

    标签: scala future


    【解决方案1】:

    您想要映射或平面映射一个未来。

    scala> val f = Future(42)
    f: scala.concurrent.Future[Int] = Future(Success(42))
    
    scala> (1 to 10).foldLeft(f)((f,x) => f.map(i => i + 1))
    res1: scala.concurrent.Future[Int] = Future(<not completed>)
    
    scala> res1
    res2: scala.concurrent.Future[Int] = Future(Success(52))
    
    scala> (1 to 10).foldLeft(f)((f,i) => {
         | println(i)
         | f.map(x => x+i) })
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    res4: scala.concurrent.Future[Int] = Future(<not completed>)
    
    scala> res4
    res5: scala.concurrent.Future[Int] = Future(Success(97))
    

    【讨论】:

    • 此代码不是堆栈安全的。如果期货数量很大,您将获得*Error
    • @simpadjo 你尝试了什么?对我来说,(1 to Int.MaxValue) 会耗尽堆,而不是堆栈。
    • 这是一篇很好的文章,涵盖了期货堆栈安全问题alexn.org/blog/2017/01/30/…。我自己也在生产中遇到过。
    • 我现在无法运行您的代码,但我认为您在构建序列时甚至在使用期货之前就得到了 OOM。
    • @simpadjo 重要的是你正在尝试什么。此外,未来的时间表会立即(人们抱怨)和映射的未来在完成时运行。正交地,你可以将任意序列表示为带有 var 的循环,但重复映射的概念是相同的。
    【解决方案2】:

    一种方法是将消息发送给处理数据的参与者。 由于演员一一处理消息,您将按顺序而不是并行执行查询。

    for (n <- 1 to 1000) {
      someFuture map {
          x => actor ! x
        }
    }
    

    【讨论】:

    • 查询发生在someFuture,因此mapped Future 里面的内容并不重要,对吧?
    • 是的,如果查询是在 someFuture 中运行的,那么这意味着所有的查询执行都已经被触发,甚至在它到达 scala 的 for comprehension 块之前。因此,在您的情况下,您必须在 for理解块之前将查询发送给您的演员。
    【解决方案3】:

    处理此问题的理想长期方法可能是使用执行连接池的数据库访问层。大多数框架(如 play 或 slick)都有一些首选的处理方式,或者如果你想要独立的东西,DBCP 可能是一个不错的选择。我认为其中大多数应该有一种“自然”的方式来将连接数限制为固定大小并在池中没有可用连接时阻塞,这会限制您的并行度。

    除了引入类似的其他依赖项之外,使用您提到的线程池执行上下文绝对是要走的路。这不是矫枉过正。这很常见,而且比任何其他处理方式都不会那么麻烦。

    【讨论】:

    • 我做过多个 EC,所以我不想说它不可行,但是对于链式执行的简单用例,我看不出比 future.map 或类似,这意味着在未来完成时运行它。工作的分配方式可能会有很大差异,具体取决于用例。