【问题标题】:actor model: one actor requires information from another actor演员模型:一个演员需要另一个演员的信息
【发布时间】:2023-08-24 23:19:01
【问题描述】:

关于actor模型我不明白的是这个。 假设你有两个演员。他们从各种来源收集和管理数据。这些来源通过他们的收件箱/邮箱/队列与参与者交互。例如,参与者 A 收集信号,而参与者 B 管理有关整个系统的信息。

在某些时候,演员 A 必须处理其数据。在处理过程中,它不能做太多其他事情。例如,无法处理新信号。作为数据处理的一部分,参与者 A 需要参与者 B 的信息才能继续。

在伪代码中:

functionOfActorA()
{
  internalQueue {
   ...doing stuff with our data
   info = actor_B.getInfo() -> what should happen here?
   ...doing stuf with our data and the obtained info
   }
}

getInfo()//function of actor B
{
  internalQueue {
    ...prepare requested info
    ... -> what should happen here?
  }
} 

如果两个参与者都应该在自己的队列或线程上独立操作,那么如何根据参与者的请求从一个参与者那里获取信息?

【问题讨论】:

    标签: language-agnostic actor


    【解决方案1】:

    Actor A 可以通过向 Actor B 发送请求来请求信息,但响应将在消息中返回,Actor A 将在未来某个时间收到该消息。有两种方法可以保存此信息:

    1. 参与者 A 可以在内部存储信息以等待该响应。

    2. Actor A 可以将信息附加到它发送给 Actor B 的消息中,然后 Actor B 返回该上下文(否则它会忽略该上下文)。

    当Actor A从Actor B那里得到信息后,就可以继续处理了。

    这是第一种实现方式的示例,使用 Thespian Python 演员库 (http://thespianpy.com):

    class ActorA(Actor):
    
        def __init__(self, *args, **kw):
            super(ActorA, self).__init__(args, kw)
            self.datalist = []
    
        def receiveMessage(self, msg, sender):
    
            if isinstance(msg, ActorAddress):
                self.actorB = msg
    
            elif isinstance(msg, WorkRequest):
                x = got_some_data(msg)
                self.datalist.append(x)
                self.send(self.actorB, NeedInfo())
    
            elif isinstance(msg, Info):
                x = self.datalist.pop()
                process_data(x, msg)
    
    class ActorB(Actor):
    
        def receiveMessage(self, msg, sender):
    
            if isinstance(msg, NeedInfo):
                i = get_info(msg)
                self.send(sender, i)
    

    上面显示了将工作在内部存储到 actor 以供稍后继续的基本功能。有几个注意事项:

    • 如果要存储多个数据项,ActorA 需要有某种方法来确定来自 ActorB 的信息适用于哪个项目。

    • ActorA 需要处理它还不知道 ActorB 地址的情况。

    • ActorA 可能应该使用某种超时来避免将工作永远保留在其内部列表中(如果 ActorB 从不响应)。

    • 如果 ActorA 退出或死亡,它应该在退出之前对仍在 datalist 上的项目执行适当的操作。

    下面是第二种实现方式对应的简单示例:

    class ActorA(Actor):
    
        def receiveMessage(self, msg, sender):
    
            if isinstance(msg, ActorAddress):
                self.actorB = msg
    
            elif isinstance(msg, WorkRequest):
                x = got_some_data(msg)
                self.send(self.actorB, NeedInfo(x))
    
            elif isinstance(msg, Info):
                x = msg.orig_request.data
                process_data(x, msg)
    
    class ActorB(Actor):
    
        def receiveMessage(self, msg, sender):
    
            if isinstance(msg, NeedInfo):
                i = getinfo(msg)
                i.orig_request = msg
                self.send(sender, i)
    

    在第二个示例中,创建 NeedInfo 的调用将数据存储在 NeedInfo 对象的 .data 字段中,并且 ActorB 传回的 Info 附加了原始的 NeedInfo 消息。这种风格的注意事项是:

    • 无需将原始 WorkRequest 消息独立关联到相应的 Info,因为它们是相互关联的。

    • ActorA 更多地依赖于 ActorB 的实现,以期望 ActorB 将原始消息附加到其响应中,以允许 ActorA 恢复此上下文。

    • ActorA 仍然需要处理它还不知道 ActorB 地址的情况。

    • 如果 ActorA 退出或死亡,它不会记录此未完成的工作以在清理之前采取任何操作(尽管死信处理程序可以执行此角色)。

    上述示例相当简单,其他 Actor 模型库实现会有一些变化,但这些通用技术应该广泛适用,无论库/语言如何。这两种实现风格都遵循以下通用 Actor 指南:

    1. 收到消息后,做可能的工作

    2. 根据需要更新内部状态以处理下一条消息

    3. 从消息处理程序中退出

    虽然可以编写一个执行阻塞操作的 Actor,但该操作会干扰 Actor 的响应能力和处理其他消息的能力(正如您正确指出的那样),因此尽可能使用消息驱动的延续而不是阻塞呼叫。

    【讨论】: