【发布时间】:2020-05-31 22:27:29
【问题描述】:
我正在尝试执行按顺序返回 Future 的函数
所以,我有一个收藏
val in = Seq(1, 1, -1, -2, 3, -4, 5, 6, 7, -1, -2, -9, 1, 2, 2)
以及处理此集合中每个 int 的函数
def intToFuture(int: Int): Future[Int] = {
Future {
println(s"Running function $int")
Thread.sleep(1500)
int * 100
}
}
我需要实现按并行处理部分处理收集的逻辑。 获取前 n 个元素,将每个元素并行乘以 100,然后获取接下来的 n 个元素并执行相同操作......等等。
我所做的(在我阅读了这个网站上的一些帖子之后)是,我实现了两个功能
1)处理一批计算
def processBatch(ints: Seq[Int])(f: Int => Future[Int]): Future[Seq[Int]] = {
Future.sequence(ints.map(f))
}
2)和第二个,适用于迭代处理
def batchTraverse(in: Seq[Int], size: Int)(f: Int => Future[Int]): Future[Seq[Int]] = {
val grs = in.grouped(size).toList
def loop(l: Seq[Seq[Int]]): Future[Seq[Int]] = {
l match {
case Nil =>
Future.successful(l.flatten)//? flatten
case head :: tail =>
println("head="+head)
processBatch(head)(f).flatMap{
s => loop(tail).map{ t =>
s.appendedAll(t)
}
}
}
}
loop(grs)
}
然后开始
val fs: Future[Seq[Int]] = batchTraverse(in, 3)(intToFuture)
fs.onComplete{
f => println(f)
}
结果只进行了一次迭代,我哪里弄错了?
【问题讨论】:
标签: scala futuretask