【问题标题】:Memory consumption of a parallel Scala Stream并行 Scala Stream 的内存消耗
【发布时间】:2012-04-06 23:29:56
【问题描述】:

我编写了一个 Scala (2.9.1-1) 应用程序,它需要处理来自数据库查询的数百万行。我正在使用我的previous questions 之一的答案中显示的技术将ResultSet 转换为Stream

class Record(...)

val resultSet = statement.executeQuery(...)

new Iterator[Record] {
  def hasNext = resultSet.next()
  def next = new Record(resultSet.getString(1), resultSet.getInt(2), ...)
}.toStream.foreach { record => ... }

而且效果很好。

由于 foreach 闭包的主体是 CPU 密集型的,并且作为函数式编程实用性的证明,如果我在 foreach 之前添加一个 .par,则闭包将并行运行而没有其他闭包努力,除了确保闭包的主体是线程安全的(它以函数式风格编写,除了打印到线程安全日志之外没有可变数据)。

但是,我担心内存消耗。 .par 是导致整个结果集加载到 RAM 中,还是并行操作仅加载与活动线程一样多的行?我已经为 JVM 分配了 4G(64 位,-Xmx4g),但将来我会在更多行上运行它,并担心我最终会出现内存不足。

有没有更好的模式来以函数方式进行这种并行处理?我一直在向我的同事展示这个应用程序,作为函数式编程和多核机器价值的一个例子。

【问题讨论】:

  • 只是好奇。您使用的是什么 DBMS,以及查询它的 Scala DB API 是什么?
  • 我正在使用 Microsoft (msdn.microsoft.com/en-us/sqlserver/aa937724) 的 JDBC 驱动程序访问在 Windows Server 2008 R2 上运行的 Microsoft SQL Server 2012 数据库。

标签: scala memory-management parallel-processing


【解决方案1】:

新的akka stream 库是您正在寻找的解决方案:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source, Sink}

def iterFromQuery() : Iterator[Record] = {
  val resultSet = statement.executeQuery(...)
  new Iterator[Record] {
    def hasNext = resultSet.next()
    def next = new Record(...)
  }
}

def cpuIntensiveFunction(record : Record) = {
...
}

implicit val actorSystem = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val execContext = actorSystem.dispatcher

val poolSize = 10 //number of Records in memory at once

val stream = 
  Source(iterFromQuery).runWith(Sink.foreachParallel(poolSize)(cpuIntensiveFunction))

stream onComplete {_ => actorSystem.shutdown()}

【讨论】:

    【解决方案2】:

    如果您查看scaladoc of Stream,您会注意到par 的定义类是Parallelizable 特征...并且,如果您查看source code of this trait,您会注意到它需要每个原始集合中的元素并将它们放入组合器中,因此,您会将每一行加载到ParSeq

      def par: ParRepr = {
        val cb = parCombiner
        for (x <- seq) cb += x
        cb.result
      }
    
      /** The default `par` implementation uses the combiner provided by this method
       *  to create a new parallel collection.
       *
       *  @return  a combiner for the parallel collection of type `ParRepr`
       */
      protected[this] def parCombiner: Combiner[A, ParRepr]
    

    一个可能的解决方案是显式地并行化您的计算,这要归功于演员。例如,您可以查看 akka 文档中的 this example,这可能对您的上下文有所帮助。

    【讨论】:

    • 我害怕那个。我想过启动一组线程,然后从(同步)结果集中提取每个行,但这听起来不是一个非常实用的解决方案。
    • 让参与者包装查询并使用您指示拉入块的 Resizer 生成路由器。
    猜你喜欢
    • 2016-08-10
    • 2014-01-01
    • 1970-01-01
    • 2010-10-12
    • 1970-01-01
    • 2010-10-27
    • 2011-12-13
    • 2011-10-03
    • 2012-11-24
    相关资源
    最近更新 更多