【问题标题】:Handling an infinite number of messages (akka) [closed]处理无限数量的消息(akka)[关闭]
【发布时间】:2013-12-14 16:55:25
【问题描述】:

我必须编写一个代码,使演员 A 产生一个演员 B 消耗的无限数字流。演员 A 输出序列:x、f(x)、g(f(x)) 等,其中如果 x 为 0,则 f(x) = 10,否则为 3x,其中 g(x) 为 x/2。即:

输出:x =0, f(x)=10, g(f(x)=5 (3 个消息) 那么接下来的 3 个消息应该是 f(g(f(x)) , g(f(g( f(x))) , f(g(f(g(f(x)))) 和它们的值...其中内部函数每次变为 x 以计算相邻结果的结果

Actor B 一次处理数字 3,它应该在同一行打印每个三元组以及 3 个数字的平均值。

值 (0) 从 main 方法传递给 ActorA。

我的尝试:

import akka.actor._

class ActorA(processB:ActorRef) extends Actor with ActorLogging{

  def f(x : Int) = if(x == 0) 10 else 3 * x
  def g(x : Int) = x / 2

  def receive = {
   case 0 =>
      val x = 0
      //processB !  (x)
     // processB ! (f(x))
    // processB ! (g(f(x)))
      println( (f(x)))
      println((g(f(x))))
    /*case Stop =>
        Console.println("Stop")
       processB ! Stop
       context stop self    */
   }
}

class ActorB extends Actor with ActorLogging{

 def receive = {
        case ActorA =>


        case Stop =>
          Console.println("Stop")
          exit()    
 }
}

case object ActorA
case object ActorB
case object Stop

object messages {
  def main(args: Array[String]) :Unit = {
    val system = ActorSystem("actors")
    val processB = system.actorOf(Props[ActorB])  
    val actorA = system.actorOf(Props(new ActorA(processB)))   
    actorA ! 0
  }
}

如何产生无限数量的消息,我可以一次处理 3 条消息吗?谢谢

【问题讨论】:

    标签: scala message akka actor messages


    【解决方案1】:

    要获得无限序列,您可以使用Stream

    Derek Wyatt 有一篇很好的博客文章介绍了它们以及如何生成斐波那契数:

    http://www.derekwyatt.org/2011/07/29/understanding-scala-streams-through-fibonacci/

    您可以对您的序列使用相同的基本原则,如果我理解正确的话,就是在流中的前一个值上交替应用 f 和 g 函数。

    你可以这样写:

    lazy val stream: Stream[Int] = x #:: stream.zipWithIndex.map { 
       case (p,i) => if (i%2 == 0) f(p) else g(p) 
    }
    

    然后您可以使用 grouped 将流分成 3 个块, 在这里,我已经这样做了,然后为了方便起见,将生成的 Stream[Int](每个大小为 3)转换为一个元组:

    val chunks: Iterator[(Int,Int,Int)] = stream.grouped(3).map { s => 
       (s.head, s.tail.head, s.tail.tail.head) 
    }
    

    然后,您可以随意使用它,如果您愿意,可以将元组发送给其他参与者。

    另一方面,您可以按如下方式匹配该元组:

    case (a:Int, b:Int, c:Int) => ...
    

    【讨论】:

      【解决方案2】:

      演员的收件箱会被填满,并且没有固有的背压告诉制片人等待。见:How to use Akka BoundedMailBox to throttle a producer

      一般来说,您可能希望B 明确地向A 发送请求数据的消息。

      【讨论】:

        猜你喜欢
        • 2016-09-08
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2012-03-25
        • 1970-01-01
        • 2019-11-30
        • 2021-01-11
        • 1970-01-01
        相关资源
        最近更新 更多