【Question Title】: Scala Actors instead of Java Futures
【Posted】: 2011-08-10 07:44:05
【Question】:

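Problem: I need to write an application that processes several hundred files, each of which takes several hundred megabytes and several seconds to process. I wrote it using Future[Report] objects created with Executors.newFixedThreadPool(), but got out-of-memory errors because the List[Future[Report]] returned by ExecutorService.invokeAll() held on to the intermediate memory used by each process. I fixed this by computing the Report values (each Report is only a few hundred lines) rather than doing the computation in the call method (from interface Callable).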
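Reading the workaround above as "keep the heavy intermediate data local to the task and return only the small Report", a minimal sketch with the Java executor API might look like this. Report, processFile, and CompactReports are hypothetical stand-ins (the 1 MB array simulates the per-file intermediate data); the point is that only the compact Report values stay reachable through the futures returned by invokeAll.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}
import scala.jdk.CollectionConverters._

// Hypothetical compact result: only this summary survives each task.
final case class Report(name: String, lines: Vector[String])

object CompactReports {
  // Hypothetical stand-in for the real per-file work. The large buffer is a
  // local value, so it becomes garbage as soon as the method returns.
  def processFile(name: String): Report = {
    val buffer = new Array[Byte](1024 * 1024) // simulated intermediate data
    java.util.Arrays.fill(buffer, 1.toByte)
    val checksum = buffer.foldLeft(0L)(_ + _)
    Report(name, Vector(s"$name: ${buffer.length} bytes, checksum $checksum"))
  }

  def run(files: List[String], threads: Int): List[Report] = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val tasks = files.map { f =>
        new Callable[Report] { def call(): Report = processFile(f) }
      }
      // invokeAll returns futures in task order; each one holds only a Report.
      pool.invokeAll(tasks.asJava).asScala.toList.map(_.get())
    } finally {
      pool.shutdown()
      pool.awaitTermination(1, TimeUnit.MINUTES)
    }
  }
}
```

For example, `CompactReports.run(List("a.txt", "b.txt"), 2)` returns two small Report values while each 1 MB buffer is already unreachable.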

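I'd like to try solving this with Scala Actors instead. I created a class that takes a sequence of jobs (with parameterized types for the job, the result, and the processing function) and processes each job in one of a configurable number of Worker instances (subclasses of Actor). The code is below.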

Questions

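  • I'm not sure that my processing is correct.

  • I don't like using a CountDownLatch to delay returning the results from the dispatcher.

  • I'd prefer to write a more "functional" version of the dispatcher that doesn't modify the jobQueue list or the workers hash map, perhaps borrowing Clojure's tail-recursive loop construct (I've used @tailrec def loop methods in other Scala code).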
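One way to get the "functional" dispatcher asked about in the last point is to thread the job queue, the set of live workers, and the accumulated results through a @tailrec loop as immutable values. The sketch below is a pure model, not actor code: it replays a hypothetical list of incoming messages and returns the commands the dispatcher would send, so the message and command names are assumptions that only mirror the question's protocol.

```scala
import scala.annotation.tailrec

object PureDispatcher {
  // Hypothetical worker -> dispatcher messages (mirroring the question).
  sealed trait Msg
  final case class SendJob(id: Int) extends Msg
  final case class ReportResult(id: Int, result: String) extends Msg
  final case class Stopped(id: Int) extends Msg

  // Hypothetical dispatcher -> worker commands, recorded instead of sent.
  sealed trait Cmd
  final case class Process(id: Int, job: String) extends Cmd
  final case class StopWorking(id: Int) extends Cmd

  // No vars: each step passes the updated state to the next call.
  @tailrec
  def loop(msgs: List[Msg], jobs: List[String], workers: Set[Int],
           acc: List[String], out: Vector[Cmd]): (Vector[Cmd], List[String]) =
    msgs match {
      case _ if workers.isEmpty => (out, acc.reverse) // every worker has stopped
      case Nil => (out, acc.reverse) // no more input to replay
      case SendJob(id) :: rest =>
        jobs match {
          case job :: remaining => loop(rest, remaining, workers, acc, out :+ Process(id, job))
          case Nil => loop(rest, Nil, workers, acc, out :+ StopWorking(id)) // queue drained
        }
      case ReportResult(_, r) :: rest => loop(rest, jobs, workers, r :: acc, out)
      case Stopped(id) :: rest => loop(rest, jobs, workers - id, acc, out)
    }
}
```

The Nil branch of SendJob is what makes this terminate: answering with StopWorking eventually empties the worker set. The dispatcher in the code below never sends StopWorking, so its workers map never becomes empty and the latch is never released.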

I'm eagerly awaiting the publication of "Actors in Scala" by Philipp Haller and Frank Sommers.

Here is the code:

package multi_worker

import scala.actors.Actor
import java.util.concurrent.CountDownLatch

object MultiWorker {
  private val megabyte = 1024 * 1024
  private val runtime = Runtime.getRuntime
}

class MultiWorker[A, B](jobs: List[A],
                        actorCount: Int)(process: (A) => B) {
  import MultiWorker._

  sealed abstract class Message

  // Dispatcher -> Worker: Run this job and report results
  case class Process(job: A) extends Message

  // Worker -> Dispatcher: Result of processing
  case class ReportResult(id: Int, result: B) extends Message

  // Worker -> Dispatcher: I need work -- send me a job
  case class SendJob(id: Int) extends Message

  // Worker -> Dispatcher: I have stopped as requested
  case class Stopped(id: Int) extends Message

  // Dispatcher -> Worker: Stop working -- all jobs done
  case class StopWorking extends Message

  /**
   * A simple logger that can be sent text messages that will be written to the
   * console. Used so that messages from the actors do not step on each other.
   */
  object Logger
  extends Actor {
    def act() {
      loop {
        react {
          case text: String => println(text)
          case StopWorking => exit()
        }
      }
    }
  }
  Logger.start()

  /**
   * A worker actor that will process jobs and return results to the
   * dispatcher.
   */
  class Worker(id: Int)
  extends Actor{
    def act() {
      // Ask the dispatcher for an initial job
      dispatcher ! SendJob(id)

      loop {
        react {
          case Process(job) =>
            val startTime = System.nanoTime
            dispatcher ! ReportResult(id, process(job))

            val endTime = System.nanoTime
            val totalMemory = (runtime.totalMemory / megabyte)
            val usedMemory = totalMemory - (runtime.freeMemory / megabyte)
            val message = "Finished job " + job + " in " +
              ((endTime - startTime) / 1000000000.0) +
              " seconds using " + usedMemory +
              "MB out of total " + totalMemory + "MB"
            Logger ! message

            dispatcher ! SendJob(id)

          case StopWorking =>
            dispatcher ! Stopped(id)
            exit()
        }
      }
    }
  }

  val latch = new CountDownLatch(1)
  var res = List.empty[B]

  /**
   * The job dispatcher that sends jobs to the worker until the job queue
   * (jobs: TraversableOnce[A]) is empty. It then tells the workers to
   * stop working and returns the List[B] results to the caller.
   */
  val dispatcher = new Actor {
    def act() {
      var jobQueue = jobs
      var workers = (0 until actorCount).map(id => (id, new Worker(id))).toMap
      workers.values.foreach(_.start())

      loop {
        react {
          case ReportResult(id, result) =>
            res = result :: res
            if (jobQueue.isEmpty && workers.isEmpty) {
              latch.countDown()
              exit()
            }

          case SendJob(id) =>
            if (!jobQueue.isEmpty) {
              workers(id) ! Process(jobQueue.head)
              jobQueue = jobQueue.tail
            }

          case Stopped(id) =>
            workers = workers - id
        }
      }
    }
  }
  dispatcher.start()

  /**
   * Get the results of the processing -- wait for the dispatcher to finish
   * before returning.
   */
  def results: List[B] = {
    latch.await()
    res
  }
}
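For comparison, the same pull-based distribution the dispatcher implements (a worker gets its next job only after finishing the previous one) can be sketched with plain threads and a shared queue. This is not the author's code: PullWorkers and run are hypothetical names, results are reordered by job index, and waiting with join() takes the place of the latch.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.jdk.CollectionConverters._

object PullWorkers {
  // Hypothetical helper: workerCount threads repeatedly poll the shared queue,
  // which plays the role of the SendJob -> Process handshake.
  def run[A, B](jobs: List[A], workerCount: Int)(process: A => B): List[B] = {
    val pending = new ConcurrentLinkedQueue[(A, Int)](jobs.zipWithIndex.asJava)
    val done = new ConcurrentLinkedQueue[(Int, B)]()
    val workers = (0 until workerCount).map { _ =>
      new Thread(() => {
        var next = pending.poll() // null once the queue is drained
        while (next != null) {
          val (job, index) = next
          done.add((index, process(job)))
          next = pending.poll()
        }
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join()) // all threads finished: every result is in `done`
    done.asScala.toList.sortBy(_._1).map(_._2)
  }
}
```

A call such as `PullWorkers.run((1 to 10).toList, 3)(x => x * x)` returns the squares in job order. An exception thrown by process would end that thread and drop its job, so real code would need to catch and report failures.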

【Question Discussion】:

    Tags: scala actor


    【Solution 1】:

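    This is the final version I came up with (thanks to Vasil Remeniuk). The println statements marked with // DEBUG comments show progress, and the main method serves as a unit test: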

    import scala.actors.Actor
    import scala.actors.Channel
    import scala.actors.Scheduler
    import scala.annotation.tailrec
    
    object MultiWorker {
      private val megabyte = 1024 * 1024
      private val runtime = Runtime.getRuntime
    
      def main(args: Array[String]) {
        val jobs = (0 until 5).map((value: Int) => value).toList
        val multiWorker = new MultiWorker[Int, Int](jobs, 2, { value =>
            Thread.sleep(100)
            println(value)
            value
          })
        println("multiWorker.results: " + multiWorker.results)
        Scheduler.shutdown
      }
    }
    
    class MultiWorker[A, B](jobs: List[A],
                            actorCount: Int,
                            process: (A) => B) {
      import MultiWorker._
    
      sealed abstract class Message
    
      // Dispatcher -> Worker: Run this job and report results
      case class Process(job: A) extends Message
    
      // Worker -> Dispatcher: Result of processing
      case class ReportResult(id: Int, result: B) extends Message
    
      // Worker -> Dispatcher: I need work -- send me a job
      case class SendJob(id: Int) extends Message
    
      // Worker -> Dispatcher: I have stopped as requested
      case class Stopped(id: Int) extends Message
    
      // Dispatcher -> Worker: Stop working -- all jobs done
      case class StopWorking() extends Message
    
      /**
       * A simple logger that can be sent text messages that will be written to the
       * console. Used so that messages from the actors do not step on each other.
       */
      object Logger
      extends Actor {
        def act() {
          loop {
            react {
              case text: String => println(text)
              case StopWorking() => exit() // match instances; StopWorking alone is the companion object
            }
          }
        }
      }
      Logger.start()
    
      /**
       * A worker actor that will process jobs and return results to the
       * dispatcher.
       */
      case class Worker(id: Int)
      extends Actor{
        def act() {
          // Ask the dispatcher for an initial job
          dispatcher ! SendJob(id)
    
          loop {
            react {
              case Process(job) =>
                println("Worker(" + id + "): " + Process(job)) // DEBUG
                val startTime = System.nanoTime
                dispatcher ! ReportResult(id, process(job))
    
                val endTime = System.nanoTime
                val totalMemory = (runtime.totalMemory / megabyte)
                val usedMemory = totalMemory - (runtime.freeMemory / megabyte)
                val message = "Finished job " + job + " in " +
                ((endTime - startTime) / 1000000000.0) +
                " seconds using " + usedMemory +
                "MB out of total " + totalMemory + "MB"
                Logger ! message
    
                dispatcher ! SendJob(id)
    
              case StopWorking() =>
                println("Worker(" + id + "): " + StopWorking()) // DEBUG
                dispatcher ! Stopped(id)
                exit()
            }
          }
        }
      }
    
      val resultsChannel = new Channel[List[B]]
      /**
       * The job dispatcher that sends jobs to the worker until the job queue
       * (jobs: List[A]) is empty. It then tells the workers to
       * stop working and returns the List[B] results to the caller.
       */
      val dispatcher = new Actor {
        def act() {
          @tailrec
          def loop(jobs: List[A],
                   workers: Map[Int, Worker],
                   acc: List[B]) {
            println("dispatcher: loop: jobs: " + jobs + ", workers: " + workers + ", acc: " + acc) // DEBUG
            if (!workers.isEmpty) { // Stop recursion when there are no more workers
              react {
                case ReportResult(id, result) =>
                  println("dispatcher: " + ReportResult(id, result)) // DEBUG
                  loop(jobs, workers, result :: acc)
    
                case SendJob(id) =>
                  println("dispatcher: " + SendJob(id)) // DEBUG
                  if (!jobs.isEmpty) {
                    println("dispatcher: " + "Sending: " + Process(jobs.head) + " to " + id) // DEBUG
                    workers(id) ! Process(jobs.head)
                    loop(jobs.tail, workers, acc)
                  } else {
                    println("dispatcher: " + "Sending: " + StopWorking() + " to " + id) // DEBUG
                    workers(id) ! StopWorking()
                    loop(Nil, workers, acc)
                  }
    
                case Stopped(id) =>
                  println("dispatcher: " + Stopped(id)) // DEBUG
                  loop(jobs, workers - id, acc)
              }
            } else {
              println("dispatcher: " + "jobs: " + jobs + ", workers: " + workers + ", acc: " + acc) // DEBUG
              resultsChannel ! acc
            }
          }
    
          loop(jobs, (0 until actorCount).map(id => (id, new Worker(id).start.asInstanceOf[Worker])).toMap, Nil)
          exit()
        }
      }
      dispatcher.start() // start after `dispatcher` is assigned: the workers it creates send to it
    
      /**
       * Get the results of the processing -- wait for the dispatcher to finish
       * before returning.
       */
      def results: List[B] = {
        resultsChannel.receive {
          case results => results
        }
      }
    }
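The resultsChannel above serves as a one-shot handoff from the dispatcher to the blocking results call, the job the question's CountDownLatch plus shared var res was doing. scala.actors was deprecated in Scala 2.10 in favor of Akka; as a minimal sketch, the standard library's Promise can play the same role. ResultHandoff, deliver, and results are hypothetical names, and the one-minute default timeout is an assumption (the original waits indefinitely).

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical one-shot handoff: the producer completes the promise exactly
// once, and results blocks until that happens, like resultsChannel.receive.
final class ResultHandoff[B] {
  private val promise = Promise[List[B]]()

  // Called by the dispatcher when the last worker has stopped.
  // Throws IllegalStateException if called a second time.
  def deliver(results: List[B]): Unit = promise.success(results)

  // Blocks the caller until the results arrive or the timeout expires.
  def results(timeout: FiniteDuration = 1.minute): List[B] =
    Await.result(promise.future, timeout)
}
```

Unlike the latch version, the value travels with the completion signal, so there is no separate mutable field that has to be safely published to the waiting thread.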
    

    【Discussion】:

      【Solution 2】:

      After a quick look, I'd propose the following updates:

      import scala.actors.Channel
      import scala.annotation.tailrec // needed for @tailrec on loop below

      val resultsChannel = new Channel[List[B]] // used instead of countdown latch to get the results
      
      val dispatcher = new Actor {
      
        def act = loop(jobs, (0 until actorCount).map(id =>
            (id, new Worker(id).start.asInstanceOf[Worker])).toMap,
          Nil)
      
        @tailrec
        def loop(jobQueue: List[A], // queue, workers and results are immutable lists, passed recursively through the loop
                 workers: Map[Int, Worker],
                 res: List[B]):Unit = react {
          case ReportResult(id, result) =>
            val results = result :: res
            if (results.size == jobs.size) { // when the processing is finished, sends results to the output channel        
              resultsChannel ! results
            }
            loop(jobQueue, workers, results)
      
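          case SendJob(id) =>
            if (!jobQueue.isEmpty) {
              workers(id) ! Process(jobQueue.head)
              loop(jobQueue.tail, workers, res)
            } else {
              workers(id) ! StopWorking // no jobs left: stop this worker but keep collecting results
              loop(jobQueue, workers, res)
            }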
      
          case Stopped(id) =>
            loop(jobQueue, workers - id, res)
        }
      
      }
      dispatcher.start()
      
      def results: List[B] = {
        resultsChannel.receive {
          case results => results // synchronously wait for the data in the channel
        }
      }
      

      【Discussion】:

      • Lovely code. I'd break it up a bit for readability (extract methods, etc.), but the concept is really nice.