【问题标题】:What is the correct way to implement Producer Consumer in scala在scala中实现生产者消费者的正确方法是什么
【发布时间】:2014-10-10 06:40:38
【问题描述】:

我尝试在不使用队列的情况下在 scala 中实现生产者消费者程序。因为我觉得Actor已经实现了“邮件队列”什么的,再写代码就显得多余了。

我试图纯粹用Actor编写程序。 下面是一个多生产者多消费者程序。 制片人睡了一会儿,模拟做某事。消费者根本不睡觉。

但是,如果我不添加主管角色来监控消费者,以及使用“等待”(代码中的主管类)的 Promise 对象,我不知道如何关闭程序

有没有办法摆脱它们?

import akka.actor.Actor.Receive
import akka.actor._
import akka.routing._;
import akka.util._

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

class Producer(val pool:ActorRef)(val name:String) extends Actor {

  def receive = {
    case _ =>
      while (true) {
        val sleepTime = scala.util.Random.nextInt(1000)
        Thread.sleep(sleepTime)
        println("Producer %s send food" format name)
        pool ! name
      }
  }
}

class Consumer(supervisor : ActorRef)(val name:String) extends Actor {

  var counter = 0

  def receive = {
    case s => 
      counter += 1
      println("%s eat food produced by %s" format (name,s))

      if (counter >= 10) {
        println("%s is full" format name)

        context.stop(self)
        supervisor ! 1
      }
  }
}

class Supervisor(p:Promise[String]) extends Actor {

  var r = 3

  def receive = {
    case _ =>
      r -= 1
      if (0 == r) {
        println("All consumer stopped")
        context.stop(self)
        p success ("Good")
      }
  }

}

object Try3 {

  def work(): Unit = {
    val system = ActorSystem("sys1")
    val nProducer = 5;
    val nConsumer = 3;
    val p = Promise[String]
    val supervisor = system.actorOf(Props(new Supervisor(p)));
    val arrConsumer = for ( i <-  1 to nConsumer) yield system.actorOf( Props( new Consumer(supervisor)( "Consumer %d" format (i) ) ) )
    val poolConsumer = system.actorOf(Props.empty.withRouter( RoundRobinRouter(arrConsumer) ))
    val arrProducer = for ( i <-  1 to nProducer) yield system.actorOf( Props( new Producer(poolConsumer)( "Producer %d" format (i) ) ) )

    arrProducer foreach (_ ! "start")

    Await.result(p.future,Duration.Inf)
    println("great!")
    system.shutdown
  }

  def main(args:Array[String]): Unit = {
    work()
  }
}

接收函数Producer类有一个问题,它不会被关闭,因为它是没有破坏条件的。

我能想到的唯一方法是“向生产者本身发送消息”。 我想知道这是实现这种请求的正常方式吗?

这是修改后的代码:

class Producer(val pool:ActorRef)(val name:String) extends Actor {

  //  original implementation:
  //  def receive = {
  //    case _ =>
  //    while (true){
  //      val sleepTime = scala.util.Random.nextInt(1000)
  //      Thread.sleep(sleepTime)
  //      println("Producer %s send food" format name)
  //      pool ! name
  //    }
  //  }

  case object Loop;

  def receive = {
    case _ =>
      val sleepTime = scala.util.Random.nextInt(1000)
      Thread.sleep(sleepTime)
      println("Producer %s send food" format name)
      pool ! name
      self ! Loop   //send message to itself
  }
}

不管我的实现如何,在 scala 中使用 Actor 或 Future/Promise 实现生产者消费者程序的正确方法是什么?

【问题讨论】:

    标签: scala akka actor producer-consumer


    【解决方案1】:

    你不应该阻塞(在你的情况下是 Thread.sleep,while 循环)在一个演员内部。在actor内部阻塞会占用所有actor之间使用的线程池中的一个线程。即使是像你这样的少量 Producer 也会使 ActorSystem 中的所有 actor 被剥夺线程并使它们无法使用。

    改为使用Scheduler 在您的 Producer 中安排定期发送消息。

    override def preStart(): Unit = {
      import scala.concurrent.duration._
      import context.dispatcher
      context.system.scheduler.schedule(
        initialDelay = 0.seconds,
        interval = 1.second,
        receiver = pool,
        message = name
      )
    }
    

    【讨论】:

    • 谢谢@Martynas。你解决了我的“循环”问题。我仍在为 Producer-Consumer 的优雅实现寻找答案。
    【解决方案2】:

    你如何看待实现TerminatorActor :)

    object Terminator {
      case class WatchMe(ref: ActorRef)
    }
    class Terminator extends Actor { 
       var consumers: Map[ActorRef, ActorRef] = Map()
    
       def receive = { 
          case WatchMe(ref) => {
            consumers += ref -> ref
            context.watch(ref)
          }
          case Terminated(ref) => {
             context.unwatch(ref)
             consumers.get(ref).foreach { ref -> ref ! PoisonPill } 
             consumers -= ref
             //If all consumers are dead stop.self and delegate NoConsumers message higher in hierarchy
             if(consumers.size == 0) { 
               delegate()
               context.stop(self)
             }
          }
       }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2022-10-23
      • 2018-08-26
      • 2011-03-17
      • 1970-01-01
      • 1970-01-01
      • 2023-03-31
      • 2010-10-18
      相关资源
      最近更新 更多