【发布时间】:2019-08-24 06:38:38
【问题描述】:
这种批处理集合的子集以进行并行处理的模式可以吗?有没有我想念的更好的方法来做到这一点?
当给定需要从返回 scala Future 的服务中获取的实体 ID 集合时,我们不是一次发出所有请求,而是对它们进行批处理,因为该服务一次只能处理一定数量的请求。在某种程度上,它是一种原始的节流机制,可以避免数据存储不堪重负。它看起来像代码味道。
object FutureHelper{
def batchSerially[A, B, M[a] <: TraversableOnce[a]](l: M[A])(dbFetch: A => Future[B])(
implicit ctx: ExecutionContext, buildFrom: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] =
l.foldLeft(Future.successful(buildFrom(l))){
case (accF, curr) => for {
acc <- accF
b <- dbFetch(curr)
} yield acc += b
}.map(s => s.result())
}
object FutureBatching extends App {
implicit val e: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
val entityIds = List(1,2,3,4,5,6)
val batchSize = 2
val listOfFetchedResults =
FutureHelper.batchSerially(entityIds.grouped(batchSize)) {groupedByBatchSize =>
Future.sequence{
groupedByBatchSize.map( i => Future.successful(i))
}
}.map(_.flatten.toList)
}
【问题讨论】:
标签: multithreading scala future