【问题标题】:Process messages at specific time reliably在特定时间可靠地处理消息
【发布时间】:2014-12-20 05:51:14
【问题描述】:

假设我有一个聊天应用程序。

客户端向聊天室发送消息,导致向某个 Actor 发出一些命令。现在,我想立即处理他写的内容,并在此聊天中提供给其他用户,所以我处理了这个命令。同时我想告诉自己(一个演员)我需要将此消息存储在聊天历史数据库中,但不是现在。保存到数据库应该每 2 分钟发生一次。如果发生崩溃,无论如何我应该能够持久保存到数据库。

我假设工作流程是这样的:

  1. 用户发送消息
  2. 聊天室参与者收到带有此消息的命令
  3. 我们将这条消息广播给所有人,并将这条消息添加到某种队列中以将其持久保存到聊天历史数据库中
  4. 2 分钟超时后,某些持久命令运行。它按照到达的顺序收集所有尚未持久化的传入聊天消息
  5. 对所有消息运行事务,然后将它们从队列中删除。
  6. 如果在 3 之后某处发生崩溃并且消息没有持久化,那么我应该尝试再次持久化它们。如果它们被坚持,我不应该再尝试坚持他们。

如何在 Akka 中构建这样的东西?我应该使用哪些功能/哪些模式?

【问题讨论】:

标签: akka cap-theorem crdt


【解决方案1】:

您可能需要两个参与者:一个(协调者)将向客户端发送有关聊天命令的通知。另一个(节流器) - 将每 2 分钟将数据推送到数据库。您的队列将只是限制器的内部状态:

class Coordinator extends Actor {
   def receive = {
     case command: Record => 
          broadcast(command)
          throttler ! command
   }
}


class Throttler extends Actor {

  import system.dispatcher

  val queue = mutable.List[Record] //it may be a cache instead

  def schedule = system.scheduler.scheduleOnce(2 minutes, self, Tick) // http://doc.akka.io/docs/akka/snapshot/scala/scheduler.html


  def receive = {
       case Start => schedule
       case command: Record =>
           queue ++= command
       case Tick => 
          schedule
          try {
            //---open transaction here---
            for (r <- queue) push(r)
            //---close transaction here---
            queue.clear //will not be cleared in case of exception
          } catch {...}
  }
}

您也可以使用@abatyuk 所说的FSM-based implementation

如果您需要减少邮箱的负载 - 您可以尝试一些背压/负载平衡模式,例如 Akka Work Pulling

如果您想保护节点本身(在某些服务器节点发生故障时恢复队列状态) - 您可以使用 Akka Cluster 来复制(手动)队列的状态。在这种情况下,协调者应该是集群单例,并且应该自己发送滴答声给随机参与者(您可以为此使用总线)并作为主管维护他们的成功和失败。请注意,主管状态可能会丢失,因此您还应该通过节点复制它(并每 2 分钟在它们之间合并状态,因此最好将 SortedSet 用于队列,因为合并将类似于 sets.reduce(_ ++ _))。

像 Riak 这样的存储已经提供了一种解决clusterization problem 的简单方法,因此您可以将它们用作队列(协调器和节流器都将是“无状态”单例)。如果是 Riak,您可以将其配置为可用+分区(参见 CAP 定理),因为在这里合并数据不是问题 - 您的聊天记录是 CRDT(conflict-free) 数据类型。

另一种解决方案是使用 WriteBehind 模式(配置为每 2 分钟启动一次)作为节流器的缓存。

事件溯源也可以保护你的actor的状态,但是当你需要在恢复后重做所有动作时它更有用(你不需要这个 - 它会将所有东西重新提交到数据库)。您可以使用快照(这与直接使用缓存几乎相同),但如果您关心可用性,最好将它们保存到缓存(通过实现 SnapshotStore)而不是本地 FS。请注意,您可能还必须删除以前保存的快照以减少存储大小。并且你应该同步保存每条记录以避免丢失状态。

附:不要忘记向发件人确认消息(发给您的 javascript),否则即使将缓存作为队列,您也可能会丢失最后的消息(在邮箱中)。

P.S/2 数据库对于actor的状态持久性几乎总是一个糟糕的解决方案,因为它很慢并且可能变得不可用。我也不推荐像 MongoDB 这样的强一致性 NoSQL 解决方案 - 最终一致性是您的最佳选择。

【讨论】:

  • 有没有办法利用现有的 Akka 持久性(事件等)来保护队列状态?
  • 您可以使用 Akka Persistence (saveSnapshot),但问题与 DB 相同 - 它可能非常慢。而且您必须同步保存每条记录以避免丢失状态,并且可能您必须将其发送到数据库或缓存(本地文件可能对您不可靠),因此实际上没有太大区别。请注意,它不会解决潜在的可用性问题(您仍然需要集群)。
  • 我不能使用 deleteMessages 所以它不会重做所有操作吗?那我就不需要 saveSnapshot 了。
  • 或者按原样使用事件溯源,通过自定义的SnapshotStore实现数据库保存,每隔2分钟保存一次快照。
  • 问题还是一样 - 您必须在接收时准确地保存每条消息(而不是批量),因此您不能为此使用 DB:在第一种情况下,每条消息都将由 @987654328 保存@ (并且您应该仅在它(异步)提交数据时通知发件人),因此在那之后使用批量事务是没有意义的。在第二种情况下 - 您不能每 2 分钟保存一次快照,因为如果发生故障,您将丢失未保存的数据(您的客户端不能等待 2 分钟确认,否则您根本不需要队列持久性)。跨度>
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-03-02
相关资源
最近更新 更多