【问题标题】:how to do sequential execution of Futures in scala如何在scala中顺序执行期货
【发布时间】:2013-12-06 01:53:07
【问题描述】:

在这种情况下,我需要使用迭代器,为每个项目调用一个函数 f(item) 并返回一个 Future[Unit]

但是,我需要让每个f(item) 调用按顺序执行,它们不能并行运行。

for(item <- it)
  f(item)

不会起作用,因为这会并行启动所有调用。

我该怎么做才能让它们按顺序跟随?

【问题讨论】:

  • 当您想按顺序运行代码时,使用期货有什么意义?
  • @drexin:同步使用现有的异步接口,或者当您有多个相关的长时间运行的计算时避免阻塞例如 Web 服务器上的线程。虽然这个遍历集合的特殊示例似乎有点不寻常。
  • 我的问题是针对这个具体案例的,应该更清楚。
  • 如果未来的结果形成下一个请求,它只能按顺序完成。异步和顺序是不同的东西,是可以一起使用的有效概念。从 ElasticSearch 中检索 100 万条记录是顺序期货的真实示例。

标签: scala


【解决方案1】:

如果您不介意非常本地化的var,您可以将异步处理(每个f(item))序列化如下(flatMap 进行序列化):

val fSerialized = {
  var fAccum = Future{()}
  for(item <- it) {
    println(s"Processing ${item}")
    fAccum = fAccum flatMap { _ => f(item) }
  }
  fAccum
}

fSerialized.onComplete{case resTry => println("All Done.")}

一般来说,避免 Await 操作 - 它们会阻塞(有点破坏异步点,消耗资源并且对于草率的设计,可能会死锁)


炫酷技巧 1:

您可以通过通常的嫌疑人flatmapFutures 链接在一起 - 它序列化异步操作。有什么不能做的吗? ;-)

def f1 = Future { // some background running logic here...}
def f2 = Future { // other background running logic here...}

val fSerialized: Future[Unit] = f1 flatMap(res1 => f2)  

fSerialized.onComplete{case resTry => println("Both Done: Success=" + resTry.isSuccess)}

以上块都没有 - 主线程在几十纳秒内直接运行。在所有情况下都使用 Future 来执行并行线程并跟踪异步状态/结果以及链接逻辑。

fSerialized 表示两个不同的异步操作链接在一起的组合。一旦评估了 val,它就会立即启动 f1(异步运行)。 f1 像任何 Future 一样运行 - 当它最终完成时,它称它为 onComplete 回调块。这是很酷的一点——flatMap 将它的参数安装为f1 onComplete 回调块——所以f2f1 完成后立即启动,没有阻塞、轮询或浪费资源使用。当 f2 完成时,fSerialized 完成 - 所以它运行 fSerialized.onComplete 回调块 - 打印“Both Done”。

不仅如此,您还可以使用简洁的非意大利面代码尽可能多地链接平面地图

 f1 flatmap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) ...

如果您要通过 Future.onComplete 执行此操作,则必须将连续操作嵌入为嵌套的 onComplete 层:

f1.onComplete{case res1Try => 
  f2
  f2.onComplete{case res2Try =>
    f3
    f3.onComplete{case res3Try =>
      f4
      f4.onComplete{ ...
      }
    }
  }
}

没有那么好。

测试证明:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._

def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  Console.flush
  blocking{Thread.sleep((item seconds).toMillis)}
  println("Done")
}

val fSerial = f(4) flatMap(res1 => f(16)) flatMap(res2 => f(2)) flatMap(res3 => f(8))

fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}

炫酷技巧 2:

这样的理解:

for {a <- aExpr; b <- bExpr; c <- cExpr; d <- dExpr} yield eExpr

不过是语法糖:

aExpr.flatMap{a => bExpr.flatMap{b => cExpr.flatMap{c => dExpr.map{d => eExpr} } } }

这是一个平面地图链,然后是最终地图。

也就是说

f1 flatmap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) map(res4 => "Did It!")

等同于

for {res1 <- f1; res2 <- f2; res3 <- f3; res4 <- f4} yield "Did It!"

Test to Prove(继之前的测试):

val fSerial = for {res1 <- f(4); res2 <- f(16); res3 <- f(2); res4 <- f(8)} yield "Did It!"
fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}

不那么酷的技巧 3:

不幸的是,您不能在同一个理解中混合使用迭代器和期货。编译错误:

val fSerial = {for {nextItem <- itemIterable; nextRes <- f(nextItem)} yield "Did It"}.last

嵌套 fors 带来了挑战。以下不序列化,而是并行运行异步块(嵌套推导不会将后续 Futures 与 flatMap/Map 链接,而是作为 Iterable.flatMap{item => f(item)} 链接 - 不一样!)

val fSerial = {for {nextItem <- itemIterable} yield
                 for {nextRes <- f(nextItem)} yield "Did It"}.last

同样使用 foldLeft/foldRight 加上 flatMap 并不能像您预期的那样工作 - 似乎是一个错误/限制;所有异步块都是并行处理的(所以Iterator.foldLeft/Right 不能与Future.flatMap 关联):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._

def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  Console.flush
  blocking{Thread.sleep((item seconds).toMillis)}
  println("Done")
}

val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)
val empty = Future[Unit]{()}
def serialize(f1: Future[Unit], f2: Future[Unit]) = f1 flatMap(res1 => f2)

//val fSerialized = itemIterable.iterator.foldLeft(empty){(fAccum, item) => serialize(fAccum, f(item))}
val fSerialized = itemIterable.iterator.foldRight(empty){(item, fAccum) => serialize(fAccum, f(item))}

fSerialized.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}

但这有效(涉及 var):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._

def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  Console.flush
  blocking{Thread.sleep((item seconds).toMillis)}
  println("Done")
}

val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)

var fSerial = Future{()}
for {nextItem <- itemIterable} fSerial = fSerial.flatMap(accumRes => f(nextItem)) 

【讨论】:

  • 使用 foldLeft/foldRight 加上 flatMap 确实按预期工作。例如,items.foldLeft(Future()){(x,y) =&gt; x.flatMap{_=&gt;f(y)}} 完全符合您的预期(为items 中的每个y 串行执行f(y)。它在您的情况下不起作用的原因是您使用了一个辅助方法serialize,其中右手未来 (f2) 按值传递,从而强制其早期评估。按名称传递 f2 应该修复它:def serialize(f1: Future[Unit], f2: =&gt; Future[Unit])。当你在它的时候,你不妨通过 f1名字也是。
  • 请注意,对于flatMap,如果f1 失败,f2 不会执行。使用onComplete 时,即使f1 失败,f2 也会执行。无论f1 是否失败,您也可以使用transformf2 运行。例如。 f1 transform(res1 =&gt; util.Success(f2)).
【解决方案2】:
def seqFutures[T, U](items: TraversableOnce[T])(yourfunction: T => Future[U]): Future[List[U]] = {
  items.foldLeft(Future.successful[List[U]](Nil)) {
    (f, item) => f.flatMap {
      x => yourfunction(item).map(_ :: x)
    }
  } map (_.reverse)
}

如果您因为资源限制而无法一次运行多个Future 而按顺序运行,则创建和使用只有一个线程的自定义ExecutionContext 可能会更容易。

【讨论】:

  • 不得不说这主要说明了 scala 的标准并发库是多么糟糕。
  • 为什么标准 scala API 中没有这个?
【解决方案3】:

另一种选择是使用 Akka Streams:

val doneFuture = Source
  .fromIterator(() => it)
  .mapAsync(parallelism = 1)(f)
  .runForeach{identity}

【讨论】:

  • 这是一个很好的建议,让我开始研究使用它的解决方案。谢谢
  • 你也可以选择 monix 流或 fs2
【解决方案4】:

只是扩展 @wingedsubmariner 的答案,因为最后的 .reverse 困扰着我(并添加了导入语句以确保完整性)

import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

def seqFutures[T, U](xs: TraversableOnce[T])(f: T => Future[U])
                    (implicit ec: ExecutionContext): Future[List[U]] = {
  val resBase = Future.successful(mutable.ListBuffer.empty[U])
  xs
    .foldLeft(resBase) { (futureRes, x) =>
      futureRes.flatMap {
        res => f(x).map(res += _)
      }
    }
    .map(_.toList)
}

注意: ListBuffer 具有恒定时间 +=.toList 操作

【讨论】:

    【解决方案5】:

    此代码向您展示了如何使用简单的 promise 来按顺序运行期货。

    代码包含两个排序器,一个一个一个地执行工作,另一个允许您指定同时运行多少个。

    异常并不能保持简单。

    import scala.concurrent.{Await, Future, Promise}
    import scala.util.Try
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration.Duration
    
    /**
      * Simple class to encapsulate work, the important element here is the future
      * you can ignore the rest
      */
    case class Work(id:String, workTime:Long = 100) {
      def doWork(): Future[String] = Future {
        println(s"Starting $id")
        Thread.sleep(workTime)
        println(s"End $id")
        s"$id ready"
      }
    }
    
    /**
      * SimpleSequencer is the one by one execution, the promise is the element
      * who allow to the sequencer to work, pay attention to it.
      *
      * Exceptions are ignore, this is not production code
      */
    object SimpleSequencer {
      private def sequence(works:Seq[Work], results:Seq[String], p:Promise[Seq[String]]) : Unit = {
        works match {
          case Nil => p.tryComplete(Try(results))
          case work::tail => work.doWork() map {
            result => sequence(tail, results :+ result, p)
          }
        }
      }
    
      def sequence(works:Seq[Work]) : Future[Seq[String]] = {
        val p = Promise[Seq[String]]()
        sequence(works, Seq.empty, p)
        p.future
      }
    }
    
    /**
      * MultiSequencer fire N works at the same time
      */
    object MultiSequencer {
      private def sequence(parallel:Int, works:Seq[Work], results:Seq[String], p:Promise[Seq[String]]) : Unit = {
        works match {
          case Nil => p.tryComplete(Try(results))
          case work =>
            val parallelWorks: Seq[Future[String]] = works.take(parallel).map(_.doWork())
            Future.sequence(parallelWorks) map {
              result => sequence(parallel, works.drop(parallel), results ++ result, p)
            }
        }
      }
    
      def sequence(parallel:Int, works:Seq[Work]) : Future[Seq[String]] = {
        val p = Promise[Seq[String]]()
        sequence(parallel, works, Seq.empty, p)
        p.future
      }
    
    }
    
    
    object Sequencer {
    
      def main(args: Array[String]): Unit = {
        val works = Seq.range(1, 10).map(id => Work(s"w$id"))
        val p = Promise[Unit]()
    
        val f = MultiSequencer.sequence(4, works) map {
          resultFromMulti =>
            println(s"MultiSequencer Results: $resultFromMulti")
            SimpleSequencer.sequence(works) map {
              resultsFromSimple =>
                println(s"MultiSequencer Results: $resultsFromSimple")
                p.complete(Try[Unit]())
            }
        }
    
        Await.ready(p.future, Duration.Inf)
      }
    }
    

    【讨论】:

      【解决方案6】:

      也许更优雅的解决方案是使用递归,如下详述。

      这可以用作返回 Future 的长操作的示例:

      def longOperation(strToReturn: String): Future[String] = Future {
        Thread.sleep(5000)
        strToReturn
      }
      

      下面是遍历待处理项的递归函数,依次处理:

      def processItems(strToReturn: Seq[String]): Unit = strToReturn match {
        case x :: xs => longOperation(x).onComplete {
          case Success(str) =>
            println("Got: " + str)
            processItems(xs)
          case Failure(_) =>
            println("Something went wrong")
            processItems(xs)
        }
        case Nil => println("Done")
      }
      

      这是通过让函数在 Future 完成或失败后递归调用自身以及要处理的剩余项目来完成的。

      要启动此活动,您可以调用“processItems”函数并处理一些要处理的项目,如下所示:

      processItems(Seq("item1", "item2", "item3"))
      

      【讨论】:

        【解决方案7】:

        您可以使用 Await.result :(代码未经测试)

        “等待:用于阻塞未来的单例对象(将其结果传输到当前线程)。”

        val result  = item map {it => Await.result(f(it), Duration.Inf) } 
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2019-11-04
          • 1970-01-01
          • 2014-03-21
          • 2018-06-13
          • 2020-07-30
          • 2016-07-11
          • 1970-01-01
          • 2018-08-04
          相关资源
          最近更新 更多