【问题标题】:Get or create child actor by ID通过 ID 获取或创建子actor
【发布时间】:2015-11-20 15:22:23
【问题描述】:

我的系统中有两个演员。谈话者和谈话。对话由两个谈话者组成(到目前为止)。当说话者想要加入对话时,我应该检查对话是否存在(另一个说话者创建了它),如果不存在,则创建它。我的 Talker 演员的方法中有这段代码:

  def getOrCreateConversation(conversationId: UUID): ActorRef = {

    // @TODO try to get conversation actor by conversationId
    context.actorSelection("user/conversation/" + conversationId.toString)

    // @TODO if it not exists... create it
    context.actorOf(Conversation.props(conversationId), conversationId.toString)
  }

如您所见,当我使用actorOf 创建我的对话演员时,我将conversationId 作为第二个参数传递。我这样做是为了方便搜索这个演员...这是正确的方法吗?

谢谢

已编辑

感谢@Arne,我终于做到了:

class ConversationRouter extends Actor with ActorLogging {
  def receive = {
    case ConversationEnv(conversationId, msg) =>
      val conversation = findConversation(conversationId) match {
        case None    => createNewConversation(conversationId)
        case Some(x) => x
      }
      conversation forward msg
  }

  def findConversation(conversationId: UUID): Option[ActorRef] = context.child(conversationId.toString)

  def createNewConversation(conversationId: UUID): ActorRef = {
    context.actorOf(Conversation.props(conversationId), conversationId.toString)
  }
}

还有测试:

class ConversationRouterSpec extends ChatUnitTestCase("ConversationRouterSpec") {

  trait ConversationRouterSpecHelper {
    val conversationId = UUID.randomUUID()

    var newConversationCreated = false

    def conversationRouterWithConversation(existingConversation: Option[ActorRef]) = {
      val conversationRouterRef = TestActorRef(new ConversationRouter {
        override def findConversation(conversationId: UUID) = existingConversation

        override def createNewConversation(conversationId: UUID) = {
          newConversationCreated = true
          TestProbe().ref
        }
      })
      conversationRouterRef
    }
  }

  "ConversationRouter" should {
    "create a new conversation when a talker join it" in new ConversationRouterSpecHelper {
      val nonExistingConversationOption = None
      val conversationRouterRef = conversationRouterWithConversation(nonExistingConversationOption)

      conversationRouterRef ! ConversationEnv(conversationId, Join(conversationId))

      newConversationCreated should be(right = true)
    }

    "not create a new conversation if it already exists" in new ConversationRouterSpecHelper {
      val existingConversation = Option(TestProbe().ref)
      val conversationRouterRef = conversationRouterWithConversation(existingConversation)

      conversationRouterRef ! ConversationEnv(conversationId, Join(conversationId))

      newConversationCreated should be(right = false)
    }
  }
}

【问题讨论】:

    标签: scala akka


    【解决方案1】:

    无法同步确定参与者的存在。所以你有几个选择。前两个在本质上更具概念性,用于说明执行异步查找,但我提供更多关于参与者异步性质的参考。第三种可能是正确的做事方式:

    1.使函数返回Future[ActorRef]

    def getOrCreateConversation(conversationId: UUID): Unit {
       context.actorSelection(s"user/conversation/$conversationId")
         .resolveOne()
         .recover { case _:Exception =>
            context.actorOf(Conversation.props(conversationId),conversationId.toString)
          }
    }
    

    2。将其设为Unit 并将ActorRef 发送回您当前的演员

    与上面几乎相同,但现在我们将future 传回当前actor,以便可以在调用actor 的receive 循环的上下文中处理已解析的actor:

    def getOrCreateConversation(conversationId: UUID): Unit {
       context.actorSelection(s"user/conversation/$conversationId")
         .resolveOne()
         .recover { case _:Exception =>
            context.actorOf(Conversation.props(conversationId),conversationId.toString)
          }.pipeTo(self)
    }
    

    3。创建一个路由器actor,您将Id'ed 消息发送到它,它会创建/解析子节点并转发消息

    我说这可能是正确的方法,因为您的目标似乎是在特定命名路径上进行廉价查找。您给出的示例假设该函数始终从路径 /user/conversation 处的参与者内部调用,否则 context.actorOf 将不会在 /user/conversation/{id}/ 处创建子级。

    也就是说,您手上有一个路由器模式,并且您创建的子节点在其子集合中的路由器已经知道。此模式假设您在任何对话消息周围都有一个信封,如下所示:

    case class ConversationEnv(id: UUID, msg: Any)
    

    现在所有对话消息都被发送到路由器而不是直接发送到对话子节点。路由器现在可以在其子集合中查找子节点:

    def receive = {
      case ConversationEnv(id,msg) =>
        val conversation = context.child(id.toString) match {
          case None => context.actorOf(Conversation.props(id),id.toString)
          case Some(x) => x
        }
        conversation forward msg
    }
    

    额外的好处是你的路由器也是对话的监督者,所以如果对话的孩子死了,它可以处理它。不将孩子ActorRef 暴露给外界还有一个好处,就是你可以让它在空闲时死掉,并在下一次收到消息时重新创建它,等等。

    【讨论】:

    • 谢谢阿恩!阅读您的答案,我学到的知识比搜索该主题的一天要多!
    • 还有一个问题阿恩。现在我没有做集群,所以查找很容易,一切都在同一台机器上。当我这样做时,它会影响路由器执行的查找过程吗?还是在位置透明的情况下,一切都会神奇地正常工作?
    • 简短的回答是肯定的,但可能不是你想要的。一方面,默认情况下,路由器的子节点都将与路由器本身位于同一节点上(这可以更改),另一方面,您需要一种方法来找到带有路由器的节点(现在位于{address}:/user/conversation.在集群中,最好通过在集群分片 (doc.akka.io/docs/akka/snapshot/scala/cluster-sharding.html) 之上实现路由器来为您服务
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-07-31
    • 2015-02-12
    • 2011-07-28
    相关资源
    最近更新 更多