【问题标题】:Akka actor - messaging neighboursAkka 演员 - 消息邻居
【发布时间】:2018-09-29 10:25:45
【问题描述】:

考虑我在同一个演员系统中有四个演员(1,2,3,4)。每个演员只能与尚未向其发送消息的邻居发送消息(即 1 只能发送给 2 和4. 2 和 4 也只能发送给 3,因为他们的邻居 1 已经发送了消息)。当一个演员从它的邻居那里收到消息时,它会打印它的名字并且系统停止。我能够部分实现。但这里的问题是同时两个演员从他们的邻居那里得到消息并停止。例如,如果我从 1 开始进程,1 向 4 和 2,2 向 3 和 4 向 3 发送消息,所以理论上应该打印 3,但我会打印 2 和 3。请建议可以做什么。以下是我的示例逻辑。

object Main extends App {

  //creating a actor system
  val actorSystem = ActorSystem("System")
  //creating four actor instances with id as 1,2,3,4
  for (i <- 1 to 4) {
    actorSystem.actorOf(CircularActor.props(4), "" + i)
  }
  //initiating message to actor 1
  actorSystem.actorSelection(s"/user/1") ! "hello from x"
}


class CircularActor(n: Int) extends Actor {

  //variable to keep a track whether the actor received two meesages(i.e.from both neighbours)
  var noOfMessagesReceived = 0

  //generic method to send message using actorPath
  def messageNeighbour(path:String){
    context.actorSelection(path) ! "hello from x"
  }

  override def receive: Receive = {

    case "hello from x" =>
      noOfMessagesReceived += 1
      if (noOfMessagesReceived == 2) {
        println(s"The actor that received both messages is ${self.path.name}")
        context.system.terminate()
      }
      else {
        //Figures out id of sender
        val pathValue = sender().path.name
        //Gets its own name
        val ownId = self.path.name.toInt
        //Finds out the previous neighbor
        val prev = if (ownId - 1 == 0) n else ownId - 1
        //Finds next neighbour
        val next = if (ownId == n) 1 else ownId + 1

        //If the message is from deadletter, then this is the initiator actor
        if (pathValue == "deadLetters") {
          messageNeighbour(s"/user/$prev")
          messageNeighbour(s"/user/$next")
        }
        //If the message is from its next neighbour,send it to previous
        else if (pathValue.toInt == next) {
          //introducing random delay
          Thread.sleep(1 + Random.nextInt(100))
          messageNeighbour(s"/user/$prev")
        }
        //If none of above,then send it to previous.
        else {
          Thread.sleep(1 + Random.nextInt(100))
          messageNeighbour(s"/user/$next")
        }
  }
}

object CircularActor {

  def props(n: Int): Props = Props(new CircularActor(n))
}

【问题讨论】:

    标签: scala akka actor


    【解决方案1】:

    为了实现所需的行为,我们需要一个参与者来跟踪接收到消息的参与者。

    型号:

    abstract class Direction
    
    object Left extends Direction
    
    object Right extends Direction
    
    object StartPoint extends Direction
    
    object Process
    

    簿记员演员

    class BookKeeperActor extends Actor {
    var visitedActorRefs: mutable.HashSet[String] = mutable.HashSet.empty[String]
    
    override def receive: Receive = {
      case Process =>
        if (visitedActorRefs.contains(sender().path.toString)) {
          context.stop(self)
          context.system.terminate()
          println(s"The actor that received both messages is ${sender().path.toString}")
        }
        else
          visitedActorRefs.add(sender().path.toString)
    }
    
    }
    

    圆形演员:

    class CircularActor(n: Int, bookKeeper: ActorRef) extends Actor {
    
    
    //generic method to send message using actorPath
    def messageNeighbour(path: String, direction: Direction) {
      context.actorSelection(path) ! ("hello from x", direction)
    }
    
    override def receive: Receive = {
    
    
      case ("hello from x", direction: Direction) =>
        bookKeeper ! Process
        //Figures out id of sender
        val pathValue = sender().path.name
        //Gets its own name
        val ownId = self.path.name.toInt
        //Finds out the previous neighbor
        val prev = if (ownId - 1 == 0) n else ownId - 1
        //Finds next neighbour
        val next = if (ownId == n) 1 else ownId + 1
    
        Thread.sleep(1 + Random.nextInt(100))
    
        direction match {
          case StartPoint =>
            messageNeighbour(s"/user/$prev", Left)
            messageNeighbour(s"/user/$next", Right)
          case Left => messageNeighbour(s"/user/$prev", Left)
          case Right => messageNeighbour(s"/user/$next", Right)
        }
    
    }
    
    }
    
    
    object CircularActor {
    
    def props(n: Int, bookeeper: ActorRef): Props = Props(new CircularActor(n, bookeeper))
    }
    

    主应用-

    object Main extends App {
    
    val actorSystem = ActorSystem("System")
    //creating four actor instances with id as 1,2,3,4
    val bookKeeperActor = actorSystem.actorOf(Props(new BookKeeperActor))
    for (i <- 1 to 4) {
      val ac = actorSystem.actorOf(CircularActor.props(4, bookKeeperActor), "" + i)
    }
    //initiating message to actor 1
    actorSystem.actorSelection(s"/user/1") ! ("hello from x", StartPoint)
    }
    

    【讨论】:

      【解决方案2】:

      问题似乎是您假设消息按发送顺序进行处理,但情况并非如此。消息传递是异步的,唯一的保证是消息按照它们到达的顺序对任何给定的actor进行处理。未定义不同参与者处理消息的顺序。

      因此在您的系统中,消息可以按此顺序处理

      <dead> -> 1
           1 -> 2,4
           4 -> 3
           3 -> 2
           2 -> 3
           2 -> <terminate>
      

      如您所见,演员2 在任何其他演员之前处理两条消息。

      不清楚可以做什么,因为不清楚你想要实现什么。但是用这样的循环构建参与者系统是危险的。一般来说,Actor 系统应该组织为请求树或 DAG,并将回复发送给请求的 Actor。

      【讨论】:

      • 没有具体的用例。我只想找出谁是第一个从两个邻居那里收到消息的人。这可以实现吗?
      • @SugaRaj 您的代码确实做到了这一点,但理论上它可能是第一个收到两条消息的任何参与者。我不确定为什么会打印两行,可能是调用 terminate() 时演员系统没有立即关闭。
      • 我仍在试图弄清楚为什么控制台中有两个打印语句。如果您发现任何变化,请告诉我
      猜你喜欢
      • 2014-12-02
      • 2013-11-29
      • 2012-10-09
      • 2015-03-28
      • 1970-01-01
      • 1970-01-01
      • 2011-11-17
      • 2021-01-11
      • 1970-01-01
      相关资源
      最近更新 更多