【问题标题】:Akka scala Event bus with different classifiers depending on the subscriberAkka scala 事件总线具有不同的分类器,具体取决于订阅者
【发布时间】:2025-11-22 22:55:01
【问题描述】:

我正在研究 Akka EventBus 以检查它是否可以解决我的设计问题之一,但我仍然不知道。 问题如下。

为了简化,我有:

case class Request(requesterId: String, operation: String, header:  RequestHeader)
case class Response(requesterId: String, operation: String, header: ResponseHeader)

我有几个具有不同功能的演员,我希望一些演员订阅Response,取决于requesterId,其他一些取决于operation。 有没有办法通过 EventBus 和分类器轻松实现这一目标?

谢谢, 乔尔

【问题讨论】:

    标签: scala akka event-bus


    【解决方案1】:

    当然,它叫LookupEventBus。您可以通过扩展它来实现自己的总线,并在 classify 方法中提取 requesterId,如下所示:

    class LookupBusImpl extends EventBus with LookupClassification {
      type Event = HasRequesterId // I made up a super type for you here
      type Classifier = String
      type Subscriber = ActorRef
    
      override def classify(event: HasRequesterId): String = event.requesterId
    

    然后您将订阅给定的requesterId,如下所示:

      lookupBus.subscribe(actorRef, "requester-100")
    

    然后这个 Actor 将只接收被分类为 requester-100 的消息。

    【讨论】:

    • 谢谢,但它只回答了我问题的第一部分:我希望另一个演员能够订阅不同的分类器,即operation 字段。基本上,请求者对结果感兴趣,因为......他是请求者,其他一些人是因为他们想了解具体操作。我应该创建多条总线并两次发布事件吗?
    • 在 Akka 中最多可以使用多少个分类器。例如,如果我有 10,000 个不同的分类器。我可以继续将“受保护的 def mapSize = 128”更改为 10000。还是?
    【解决方案2】:

    我同意 Konrad 的观点,即您应该实施新的 LookupClassification 总线来解决您的问题。我认为拥有这些总线的两个独立实例是最简单的,一个按 requesterId 分类,另一个按操作分类。这种方法的一些基本设置工作是:

    //Singleton to hold the instances of each stream type
    object ResponseEventStream{
      val RequestorIdStream = new RequestorIdResponseEventStream
      val OperationStream = new OperationResponseEventStream
    }
    
    //Common functionality for the two different types of streams
    trait ResponseEventStream extends ActorEventBus with LookupClassification{
      import ResponseEventStream._
      type Event = Response
      type Classifier = String  
      protected def mapSize = 128
      protected def publish(resp:Response, subscriber: ActorRef) = {
        if (subscriber.isTerminated) unsubscribe(subscriber)
        else subscriber ! resp
      }  
    }
    
    //Concrete impl that uses requesterId to classify
    class RequestorIdResponseEventStream extends ResponseEventStream{
      protected def classify(resp:Response) = resp.requesterId 
    }
    
    //Concrete impl that uses operation to classify
    class OperationResponseEventStream extends ResponseEventStream{
      protected def classify(resp:Response) = resp.operation 
    }
    
    //Trait to mix into classes that need to publish or subscribe to response events
    //Has helper methods to simplify interaction with the two distinct streams
    trait ResponseEventing{
      import ResponseEventStream._
    
      def publishResponse(resp:Response){
        RequestorIdStream.publish(resp)
        OperationStream.publish(resp)
      }
    
      def subscribeByRequestId(requestId:String, ref:ActorRef){
        RequestorIdStream.subscribe(ref, requestId)
      }
    
      def subscribeByOperartion(op:String, ref:ActorRef){
        OperationStream.subscribe(ref, op)
      }  
    }
    

    然后您只需要将 ResponseEventing 特征混合到需要发布 Response 事件或需要订阅它们的参与者中。发布的 Actor 将调用 publishResponse,需要订阅的 Actor 将调用 subscribeXXX,具体取决于他们感兴趣的分类(requesterId 或 operation)。

    【讨论】:

    • 非常感谢,我会试试这个解决方案!
    最近更新 更多