【发布时间】:2013-04-22 11:14:09
【问题描述】:
关于如何在akka中使用事件总线有什么好的教程/解释吗? 我已经阅读了 Akka 文档,但我发现很难理解如何使用事件总线
【问题讨论】:
-
没有建设性?我还能在哪里找到此类问题的答案?
关于如何在akka中使用事件总线有什么好的教程/解释吗? 我已经阅读了 Akka 文档,但我发现很难理解如何使用事件总线
【问题讨论】:
每个ActorSystem 都有一个EventBus。这个EventBus被称为Event Stream,可以通过调用system.eventStream获得。
ActorSystem 将事件流用于许多事情,包括 logging、发送 Dead Letters 和 Cluster Events。
您还可以将事件流用于您自己的发布/订阅要求。例如,事件流在测试期间可能很有用。将Test Kit 的testActor 订阅到某些事件的事件流(例如记录事件),您可以expect 他们。当发生某些事情时您不会向另一个参与者发送消息但您仍然需要在测试中期待该事件时,这可能特别有用。
请注意,事件流仅适用于一个ActorSystem。如果您使用在流上发布的远程处理事件,则默认情况下不要跨到远程系统(尽管您可以自己添加该支持)。
如果您不想使用 Event Stream,理论上可以创建一个单独的 EventBus。
正在为 Akka 2.2 编写更好的事件总线文档,因此请在 this ticket 完成后再次查看。
【讨论】:
不确定是否有任何好的教程,但我可以为您提供一个使用事件流可能会有所帮助的可能用户案例的快速示例。不过,在高层次上,事件流是一种很好的机制,可以满足您的应用程序可能具有的发布/订阅类型要求。假设您有一个用例,您可以在系统中更新用户的余额。余额经常被访问,因此您决定缓存它以获得更好的性能。当余额更新时,您还想检查用户的余额是否超过阈值,如果是,请通过电子邮件发送给他们。您不希望将缓存删除或余额阈值检查直接绑定到主余额更新调用中,因为它们可能很重并且会减慢用户的响应速度。您可以像这样对特定的一组需求进行建模:
//Message and event classes
case class UpdateAccountBalance(userId:Long, amount:Long)
case class BalanceUpdated(userId:Long)
//Actor that performs account updates
class AccountManager extends Actor{
val dao = new AccountManagerDao
def receive = {
case UpdateAccountBalance(userId, amount) =>
val res = for(result <- dao.updateBalance(userId, amount)) yield{
context.system.eventStream.publish(BalanceUpdated(userId))
result
}
sender ! res
}
}
//Actor that manages a cache of account balance data
class AccountCacher extends Actor{
val cache = new AccountCache
override def preStart = {
context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated])
}
def receive = {
case BalanceUpdated(userId) =>
cache.remove(userId)
}
}
//Actor that checks balance after an update to warn of low balance
class LowBalanceChecker extends Actor{
val dao = new LowBalanceDao
override def preStart = {
context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated])
}
def receive = {
case BalanceUpdated(userId) =>
for{
balance <- dao.getBalance(userId)
theshold <- dao.getBalanceThreshold(userId)
if (balance < threshold)
}{
sendBalanceEmail(userId, balance)
}
}
}
在此示例中,AccountCacher 和 LowBalanceChecker 演员都订阅了 BalanceUpdated 事件的类类型 eventStream。如果此事件是发布到流的事件,则这两个参与者实例都将接收到它。然后,在AccountManager 中,当余额更新成功时,它会为用户引发BalanceUpdated 事件。发生这种情况时,该消息同时被传送到AccountCacher 和LowBalanceChecker 的邮箱,从而导致余额从缓存中删除,并检查帐户阈值,并可能发送一封电子邮件。
现在,您可以直接将 tell (!) 调用放入 AccountManager 以直接与这两个参与者进行通信,但有人可能会争辩说这可能过于紧密地耦合了平衡更新的这两个“副作用”,并且这些类型的细节不一定属于AccountManager。如果您的情况可能会导致一些额外的事情(检查、更新等)需要纯粹作为副作用发生(不是核心业务流程本身的一部分),那么事件流可能是一个好方法解耦正在引发的事件以及谁可能需要对该事件做出反应。
【讨论】:
ActorSystem 已经为它创建了一个事件总线;无需自己创建。当ActorSystem 创建总线时,我假设根监护人负责总线。我不确定您所说的问题 3 是什么意思;你能再解释一下吗?