【Posted】: 2011-08-10 07:44:05
【Question】:
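Problem: I need to write an application that processes several hundred files, each of which takes several hundred megabytes of memory and several seconds to process. I wrote it using Future[Report] objects created with Executors.newFixedThreadPool(), but ran into out-of-memory errors because the List[Future[Report]] returned by ExecutorService.invokeAll() was holding on to the intermediate memory used by each process. I worked around this by computing the Report values (each Report is only a few hundred lines) outside the call method (from the interface @ 987654330@) instead of doing the calculations inside it.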
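For reference, the executor-based setup described above might look roughly like the following sketch. `Report`, `FileTask`, and `ExecutorSketch` are hypothetical names, and the byte array merely stands in for the contents of a large file. The assumption is that the heavy intermediate data stays in local variables of a helper method, so each Future ends up retaining only the small Report.

```scala
import java.util.concurrent.{Callable, Executors}

// Hypothetical result type: small compared with the data read to build it.
case class Report(lines: List[String])

// Hypothetical task. The large buffer is local to buildReport(), so it is
// unreachable once call() has returned the small Report.
class FileTask(name: String) extends Callable[Report] {
  private def buildReport(): Report = {
    val buffer = new Array[Byte](1024 * 1024) // stands in for file contents
    Report(List(name + ": " + buffer.length + " bytes"))
  }
  def call(): Report = buildReport()
}

object ExecutorSketch {
  def run(names: List[String], threads: Int): List[Report] = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val tasks = new java.util.ArrayList[Callable[Report]]()
      names.foreach(n => tasks.add(new FileTask(n)))
      // invokeAll blocks until every task has completed
      val futures = pool.invokeAll(tasks)
      var reports = List.empty[Report]
      val it = futures.iterator()
      while (it.hasNext) reports = it.next().get() :: reports
      reports.reverse // restore submission order
    } finally {
      pool.shutdown()
    }
  }
}
```

Calling `ExecutorSketch.run(List("a", "b"), 2)` returns one Report per name, in submission order.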
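I would like to try solving this with Scala Actors instead. I created a class that takes a sequence of jobs (with parameterized types for the jobs, the results, and the processing function) and processes each job in one of a configurable number of Worker instances (subclasses of Actor). The code is below.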
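Questions:
I am not sure that my processing is correct.
I do not like using a CountDownLatch to delay returning the results from the dispatcher. I would prefer to write a more "functional" version of the dispatcher that does not modify the jobQueue list or the workers hash map, perhaps borrowing the tail-recursive loop structure from Clojure (I have used a @tailrec def loop method in other Scala code).
I am anxiously awaiting the release of "Actors in Scala" by Philipp Haller and Frank Sommers.
The code follows: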
package multi_worker

import scala.actors.Actor
import java.util.concurrent.CountDownLatch

object MultiWorker {
  private val megabyte = 1024 * 1024
  private val runtime = Runtime.getRuntime
}
class MultiWorker[A, B](jobs: List[A],
                        actorCount: Int)(process: (A) => B) {
  import MultiWorker._

  sealed abstract class Message
  // Dispatcher -> Worker: Run this job and report results
  case class Process(job: A) extends Message
  // Worker -> Dispatcher: Result of processing
  case class ReportResult(id: Int, result: B) extends Message
  // Worker -> Dispatcher: I need work -- send me a job
  case class SendJob(id: Int) extends Message
  // Worker -> Dispatcher: I have stopped as requested
  case class Stopped(id: Int) extends Message
  // Dispatcher -> Worker: Stop working -- all jobs done
  // (a case object, because the message carries no data)
  case object StopWorking extends Message
  /**
   * A simple logger that can be sent text messages that will be written to the
   * console. Used so that messages from the actors do not step on each other.
   */
  object Logger extends Actor {
    def act() {
      loop {
        react {
          case text: String => println(text)
          case StopWorking => exit()
        }
      }
    }
  }
  Logger.start()
  /**
   * A worker actor that will process jobs and return results to the
   * dispatcher.
   */
  class Worker(id: Int) extends Actor {
    def act() {
      // Ask the dispatcher for an initial job
      dispatcher ! SendJob(id)
      loop {
        react {
          case Process(job) =>
            val startTime = System.nanoTime
            dispatcher ! ReportResult(id, process(job))
            val endTime = System.nanoTime
            val totalMemory = (runtime.totalMemory / megabyte)
            val usedMemory = totalMemory - (runtime.freeMemory / megabyte)
            val message = "Finished job " + job + " in " +
              ((endTime - startTime) / 1000000000.0) +
              " seconds using " + usedMemory +
              "MB out of total " + totalMemory + "MB"
            Logger ! message
            dispatcher ! SendJob(id)
          case StopWorking =>
            dispatcher ! Stopped(id)
            exit()
        }
      }
    }
  }
  val latch = new CountDownLatch(1)
  var res = List.empty[B]

  /**
   * The job dispatcher that sends jobs to the workers until the job queue
   * (jobs: List[A]) is empty. It then tells the workers to stop working and
   * releases the latch so that the List[B] results can be returned to the
   * caller.
   */
  val dispatcher: Actor = new Actor {
    def act() {
      var jobQueue = jobs
      var workers = (0 until actorCount).map(id => (id, new Worker(id))).toMap
      workers.values.foreach(_.start())
      loop {
        react {
          case ReportResult(id, result) =>
            res = result :: res
          case SendJob(id) =>
            if (!jobQueue.isEmpty) {
              workers(id) ! Process(jobQueue.head)
              jobQueue = jobQueue.tail
            } else {
              // No jobs left: tell this worker to stop
              workers(id) ! StopWorking
            }
          case Stopped(id) =>
            workers = workers - id
            // A worker reports its last result before it stops, so once
            // every worker has stopped, all results have been collected
            if (workers.isEmpty) {
              Logger ! StopWorking
              latch.countDown()
              exit()
            }
        }
      }
    }
  }
  dispatcher.start()
  /**
   * Get the results of the processing -- wait for the dispatcher to finish
   * before returning.
   */
  def results: List[B] = {
    latch.await()
    res
  }
}
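The more "functional" dispatcher mentioned in the questions could be sketched along the following lines. This is only a single-threaded simulation under stated assumptions, not an actor implementation: `FunctionalDispatch`, `State`, and `run` are hypothetical names, the message types mirror those in the listing above, and the workers are simulated by appending their replies to an immutable inbox. What it illustrates is the shape of a `@tailrec` loop that threads the job queue, the worker set, and the results through as immutable state instead of mutating vars.

```scala
import scala.annotation.tailrec

// Hypothetical, single-threaded model of the dispatcher's state machine.
class FunctionalDispatch[A, B](process: A => B) {
  sealed trait Msg
  case class SendJob(id: Int) extends Msg
  case class ReportResult(id: Int, result: B) extends Msg
  case class Stopped(id: Int) extends Msg

  // Immutable dispatcher state: pending jobs, live workers, results so far.
  case class State(jobs: List[A], workers: Set[Int], results: List[B])

  def run(jobs: List[A], workerCount: Int): List[B] = {
    @tailrec
    def loop(state: State, inbox: List[Msg]): List[B] = inbox match {
      case Nil =>
        // Nothing left to handle: every worker has stopped
        state.results.reverse
      case SendJob(id) :: rest =>
        state.jobs match {
          case job :: remaining =>
            // Simulated worker: process the job, report, then ask for more
            val replies = List[Msg](ReportResult(id, process(job)), SendJob(id))
            loop(state.copy(jobs = remaining), rest ::: replies)
          case Nil =>
            // No jobs left: the simulated worker stops
            loop(state, rest ::: List[Msg](Stopped(id)))
        }
      case ReportResult(_, result) :: rest =>
        loop(state.copy(results = result :: state.results), rest)
      case Stopped(id) :: rest =>
        loop(state.copy(workers = state.workers - id), rest)
    }

    val ids = (0 until workerCount).toList
    // Each worker starts by asking for a job, as in the listing above
    loop(State(jobs, ids.toSet, Nil), ids.map(id => SendJob(id)))
  }
}
```

For example, `new FunctionalDispatch[Int, Int](_ * 10).run(List(1, 2, 3), 2)` yields `List(10, 20, 30)`. Each step produces a new `State` rather than reassigning `jobQueue` or `workers`, which is the property the question asks for. Whether the same loop can replace the CountDownLatch in a real actor setup is left open here.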
【Discussion】: