如果您不介意非常本地化的var,您可以将异步处理(每个f(item))序列化如下(flatMap 进行序列化):
val fSerialized = {
var fAccum = Future{()}
for(item <- it) {
println(s"Processing ${item}")
fAccum = fAccum flatMap { _ => f(item) }
}
fAccum
}
fSerialized.onComplete{case resTry => println("All Done.")}
一般来说,避免 Await 操作 - 它们会阻塞(有点破坏异步点,消耗资源并且对于草率的设计,可能会死锁)
炫酷技巧 1:
您可以通过通常的嫌疑人flatmap 将Futures 链接在一起 - 它序列化异步操作。有什么不能做的吗? ;-)
def f1 = Future { // some background running logic here...}
def f2 = Future { // other background running logic here...}
val fSerialized: Future[Unit] = f1 flatMap(res1 => f2)
fSerialized.onComplete{case resTry => println("Both Done: Success=" + resTry.isSuccess)}
以上块都没有 - 主线程在几十纳秒内直接运行。在所有情况下都使用 Future 来执行并行线程并跟踪异步状态/结果以及链接逻辑。
fSerialized 表示两个不同的异步操作链接在一起的组合。一旦评估了 val,它就会立即启动 f1(异步运行)。 f1 像任何 Future 一样运行 - 当它最终完成时,它称它为 onComplete 回调块。这是很酷的一点——flatMap 将它的参数安装为f1 onComplete 回调块——所以f2 在f1 完成后立即启动,没有阻塞、轮询或浪费资源使用。当 f2 完成时,fSerialized 完成 - 所以它运行 fSerialized.onComplete 回调块 - 打印“Both Done”。
不仅如此,您还可以使用简洁的非意大利面代码尽可能多地链接平面地图
f1 flatmap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) ...
如果您要通过 Future.onComplete 执行此操作,则必须将连续操作嵌入为嵌套的 onComplete 层:
f1.onComplete{case res1Try =>
f2
f2.onComplete{case res2Try =>
f3
f3.onComplete{case res3Try =>
f4
f4.onComplete{ ...
}
}
}
}
没有那么好。
测试证明:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._
def f(item: Int): Future[Unit] = Future{
print("Waiting " + item + " seconds ...")
Console.flush
blocking{Thread.sleep((item seconds).toMillis)}
println("Done")
}
val fSerial = f(4) flatMap(res1 => f(16)) flatMap(res2 => f(2)) flatMap(res3 => f(8))
fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
炫酷技巧 2:
这样的理解:
for {a <- aExpr; b <- bExpr; c <- cExpr; d <- dExpr} yield eExpr
不过是语法糖:
aExpr.flatMap{a => bExpr.flatMap{b => cExpr.flatMap{c => dExpr.map{d => eExpr} } } }
这是一个平面地图链,然后是最终地图。
也就是说
f1 flatmap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) map(res4 => "Did It!")
等同于
for {res1 <- f1; res2 <- f2; res3 <- f3; res4 <- f4} yield "Did It!"
Test to Prove(继之前的测试):
val fSerial = for {res1 <- f(4); res2 <- f(16); res3 <- f(2); res4 <- f(8)} yield "Did It!"
fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
不那么酷的技巧 3:
不幸的是,您不能在同一个理解中混合使用迭代器和期货。编译错误:
val fSerial = {for {nextItem <- itemIterable; nextRes <- f(nextItem)} yield "Did It"}.last
嵌套 fors 带来了挑战。以下不序列化,而是并行运行异步块(嵌套推导不会将后续 Futures 与 flatMap/Map 链接,而是作为 Iterable.flatMap{item => f(item)} 链接 - 不一样!)
val fSerial = {for {nextItem <- itemIterable} yield
for {nextRes <- f(nextItem)} yield "Did It"}.last
同样使用 foldLeft/foldRight 加上 flatMap 并不能像您预期的那样工作 - 似乎是一个错误/限制;所有异步块都是并行处理的(所以Iterator.foldLeft/Right 不能与Future.flatMap 关联):
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._
def f(item: Int): Future[Unit] = Future{
print("Waiting " + item + " seconds ...")
Console.flush
blocking{Thread.sleep((item seconds).toMillis)}
println("Done")
}
val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)
val empty = Future[Unit]{()}
def serialize(f1: Future[Unit], f2: Future[Unit]) = f1 flatMap(res1 => f2)
//val fSerialized = itemIterable.iterator.foldLeft(empty){(fAccum, item) => serialize(fAccum, f(item))}
val fSerialized = itemIterable.iterator.foldRight(empty){(item, fAccum) => serialize(fAccum, f(item))}
fSerialized.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
但这有效(涉及 var):
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._
def f(item: Int): Future[Unit] = Future{
print("Waiting " + item + " seconds ...")
Console.flush
blocking{Thread.sleep((item seconds).toMillis)}
println("Done")
}
val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)
var fSerial = Future{()}
for {nextItem <- itemIterable} fSerial = fSerial.flatMap(accumRes => f(nextItem))