【问题标题】:Parallel data fetches that are batched批处理的并行数据提取
【发布时间】:2019-08-24 06:38:38
【问题描述】:

这种批处理集合的子集以进行并行处理的模式可以吗?有没有我想念的更好的方法来做到这一点?

当给定需要从返回 scala Future 的服务中获取的实体 ID 集合时,我们不是一次发出所有请求,而是对它们进行批处理,因为该服务一次只能处理一定数量的请求。在某种程度上,它是一种原始的节流机制,可以避免数据存储不堪重负。它看起来像代码味道。


object FutureHelper{
  def batchSerially[A, B, M[a] <: TraversableOnce[a]](l: M[A])(dbFetch: A => Future[B])(
    implicit ctx: ExecutionContext, buildFrom: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] =
    l.foldLeft(Future.successful(buildFrom(l))){
      case (accF, curr) => for {
        acc <- accF
        b <- dbFetch(curr)
      } yield acc += b
    }.map(s => s.result())
}

object FutureBatching extends App {
  implicit val e: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global

  val entityIds = List(1,2,3,4,5,6)
  val batchSize = 2

  val listOfFetchedResults =
    FutureHelper.batchSerially(entityIds.grouped(batchSize)) {groupedByBatchSize =>
      Future.sequence{
        groupedByBatchSize.map( i => Future.successful(i))
      }
    }.map(_.flatten.toList)
}

【问题讨论】:

    标签: multithreading scala future


    【解决方案1】:

    我相信默认情况下scala.Future 将在 Future 创建后立即开始执行,因此dbFetch() 的调用将立即启动连接。由于foldLeft 将所有暂停的A =&gt; Future[B] 转换为实际的Future 对象,我不相信批处理会以您想要的方式发生。

    是的,我相信代码可以正常工作(参见 cmets)。

    另一种方法是让池定义并行度,但这并不总是有效,具体取决于您的执行环境。

    我在使用并行集合进行批处理方面取得了一些成功。例如,如果您创建一个集合,其中元素的数量代表并发活动的数量,您可以使用.par。例如,

    // partition xs into numBatches Set elements, and invoke processBatch on each Set in parallel
    def batch[A,B](xs: Iterable[A], numBatches: Int)
      (processBatch: Set[A] => Set[B]): ParSeq[B] = split(xs,numBatches).par.flatMap(processBatch)
    
    // Split the input iterable into numBatches sub-sets.
    // For example split(Seq(1,2,3,4,5,6), 3) =  Seq(Set(1, 4), Set(2, 5), Set(3, 6))
    def split[A](xs: Iterable[A], numBatches: Int): Seq[Set[A]] = {
        val buffers: Vector[VectorBuilder[A]] = Vector.fill(numBatches)(new VectorBuilder[A]())
        val elems = xs.toIndexedSeq
        for (i <- 0 until elems.length) {
          buffers(i % numBatches) += elems(i)
        }
        buffers.map(_.result.toSet)
    }
    
    

    【讨论】:

    • 感谢您的回答!我认为您的解决方案是一个不错的选择,但是,在我的原始代码中调用 dbFetch 是在 for 理解中发生的,所以这不会阻止 Futures 立即执行吗?如果 Futures 是在 for 理解之外创建的,它们将并行发生。这篇文章给出了一个很好的例子stackoverflow.com/questions/19045936/…
    猜你喜欢
    • 2021-03-05
    • 2021-11-08
    • 2014-12-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-01-03
    • 2011-06-04
    相关资源
    最近更新 更多