【问题标题】:Akka Event Bus Tutorial [closed]Akka 事件总线教程 [关闭]
【发布时间】:2013-04-22 11:14:09
【问题描述】:

关于如何在akka中使用事件总线有什么好的教程/解释吗? 我已经阅读了 Akka 文档,但我发现很难理解如何使用事件总线

【问题讨论】:

标签: scala akka event-bus


【解决方案1】:

每个ActorSystem 都有一个EventBus。这个EventBus被称为Event Stream,可以通过调用system.eventStream获得。

ActorSystem 将事件流用于许多事情,包括 logging、发送 Dead LettersCluster Events

您还可以将事件流用于您自己的发布/订阅要求。例如,事件流在测试期间可能很有用。将Test KittestActor 订阅到某些事件的事件流(例如记录事件),您可以expect 他们。当发生某些事情时您不会向另一个参与者发送消息但您仍然需要在测试中期待该事件时,这可能特别有用。

请注意,事件流仅适用于一个ActorSystem。如果您使用在流上发布的远程处理事件,则默认情况下不要跨到远程系统(尽管您可以自己添加该支持)。

如果您不想使用 Event Stream,理论上可以创建一个单独的 EventBus

正在为 Akka 2.2 编写更好的事件总线文档,因此请在 this ticket 完成后再次查看。

【讨论】:

    【解决方案2】:

    不确定是否有任何好的教程,但我可以为您提供一个使用事件流可能会有所帮助的可能用户案例的快速示例。不过,在高层次上,事件流是一种很好的机制,可以满足您的应用程序可能具有的发布/订阅类型要求。假设您有一个用例,您可以在系统中更新用户的余额。余额经常被访问,因此您决定缓存它以获得更好的性能。当余额更新时,您还想检查用户的余额是否超过阈值,如果是,请通过电子邮件发送给他们。您不希望将缓存删除或余额阈值检查直接绑定到主余额更新调用中,因为它们可能很重并且会减慢用户的响应速度。您可以像这样对特定的一组需求进行建模:

    //Message and event classes
    case class UpdateAccountBalance(userId:Long, amount:Long)
    case class BalanceUpdated(userId:Long)
    
    //Actor that performs account updates
    class AccountManager extends Actor{
      val dao = new AccountManagerDao
    
      def receive = {
        case UpdateAccountBalance(userId, amount) =>
          val res = for(result <- dao.updateBalance(userId, amount)) yield{
            context.system.eventStream.publish(BalanceUpdated(userId))
            result                
          }
    
          sender ! res
      }
    }
    
    //Actor that manages a cache of account balance data
    class AccountCacher extends Actor{
      val cache = new AccountCache
    
      override def preStart = {
        context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated])
      }
    
      def receive = {
        case BalanceUpdated(userId) =>
          cache.remove(userId)
      }
    }
    
    //Actor that checks balance after an update to warn of low balance
    class LowBalanceChecker extends Actor{
      val dao = new LowBalanceDao
    
      override def preStart = {
        context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated])
      }
    
      def receive = {
        case BalanceUpdated(userId) =>
          for{
            balance <- dao.getBalance(userId)
            theshold <- dao.getBalanceThreshold(userId)
            if (balance < threshold)
          }{
            sendBalanceEmail(userId, balance)
          }
      }
    }
    

    在此示例中,AccountCacherLowBalanceChecker 演员都订阅了 BalanceUpdated 事件的类类型 eventStream。如果此事件是发布到流的事件,则这两个参与者实例都将接收到它。然后,在AccountManager 中,当余额更新成功时,它会为用户引发BalanceUpdated 事件。发生这种情况时,该消息同时被传送到AccountCacherLowBalanceChecker 的邮箱,从而导致余额从缓存中删除,并检查帐户阈值,并可能发送一封电子邮件。

    现在,您可以直接将 tell (!) 调用放入 AccountManager 以直接与这两个参与者进行通信,但有人可能会争辩说这可能过于紧密地耦合了平衡更新的这两个“副作用”,并且这些类型的细节不一定属于AccountManager。如果您的情况可能会导致一些额外的事情(检查、更新等)需要纯粹作为副作用发生(不是核心业务流程本身的一部分),那么事件流可能是一个好方法解耦正在引发的事件以及谁可能需要对该事件做出反应。

    【讨论】:

    • 谢谢。只是几个问题:1)您是否只是通过订阅一个事件总线来创建事件总线?如何“摧毁”事件总线? 2)是否有负责事件总线的特定演员? 3)我注意到你没有声明分类器,是否有一个默认分类器被选中?
    • ActorSystem 已经为它创建了一个事件总线;无需自己创建。当ActorSystem 创建总线时,我假设根监护人负责总线。我不确定您所说的问题 3 是什么意思;你能再解释一下吗?
    • 我刚刚再次阅读了doc,似乎我误解了我自己的问题。我想说的是,上面示例中的参与者被订阅以接收特定消息(BalanceUpdated)。我将如何为演员订阅可以发送各种消息的主题
    • 当您说主题时,您是在谈论 JMS 或 AMQP 之类的东西吗?
    • 这个答案非常有用。这与我在阅读 Akka 文档后开始写的内容也有很大不同 - 所以尽管这个问题已经结束,但我认为这是一个很好的问题。
    猜你喜欢
    • 2011-10-26
    • 1970-01-01
    • 2010-09-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-10-21
    • 1970-01-01
    相关资源
    最近更新 更多