【问题标题】:How do I achieve availability when processing events in-order?按顺序处理事件时如何实现可用性?
【发布时间】:2015-03-05 02:31:07
【问题描述】:

目标

我想实现事件的弹性处理,并且仍然按照它们到达的顺序处理它们。

说明

监听任务监听许多客户端的事件,将它们包装在消息中并将它们放入队列中。事件处理程序任务从队列中读取并处理事件。单个客户端的事件必须按顺序处理;不同客户端的事件可以乱序处理。一个简单的解决方案是:

queue[client_id % N]创建N个队列,监听队列事件。每个队列都有一个从中读取的事件处理程序。

这可行,但我还必须考虑事件处理程序失败的情况。我可以想到两种方法来处理:

  1. 在队列的读者之间进行领导者选举 - 每个队列进行一次选举
  2. 使用 RabbitMQ 的确认功能(或其他 MQ 系统中的等效功能)- 每个队列都有多个事件处理程序。事件处理程序将消息出列并在完成处理后发回确认。

问题规模

大约有 5K 事件/秒,事件很小 50-200 字节。队列中的消息可能是由 1000 个事件组成的集群,以减少 MQ 开销。这意味着支持选项 2 并每秒处理数十条消息的 MQ 系统将可以工作。

问题

我可以执行选项 2 还是队列中的争用过多?它甚至支持使用确认吗?有没有更好的方法来实现这样的设计?我可以将事件集群发送到事件处理程序,并在发送下一个块之前等待 ACK,但我认为使用 MQ 可能不太需要重新发明轮子。

【问题讨论】:

  • 我认为您需要一个特定于任务的单一队列。不知道你们的MQ系统是否支持这种东西。每个客户端使用一个队列的另一种选择,这取决于您的 MQ 系统、队列的成本,因此它可能也可以工作。如果我是你,我会写一个自动测试来测量。
  • 你检查过 ZeroMQ 和 nanomsg 吗?它们也是很好的 MQ 系统,但工作方式与 RabbitMQ 不同。
  • @inf3rno,我没有使用 MQ 系统,但我正在考虑使用它。我会选择一个具有我需要的功能的系统。

标签: parallel-processing message-queue mq


【解决方案1】:

另一种方法是在 Kafka 中使用partitioned topic。 Kafka 可以通过标识符对主题进行分区,并在分区内保持顺序。所以在这种情况下,最好按client_id 进行分区。然后,Kafka 通过重新划分主题来管理客户端之间的负载。

另见:How to achieve distributed processing and high availability simultaneously in Kafka?

【讨论】:

    【解决方案2】:

    我以前使用过 RabbitMQ,但无法确认它是否支持您在成功处理第一条消息时才发送下一条消息的方案,我将把这部分留给对该工具有更好了解的其他人。

    对于任何通用服务/消息总线的实现:

    每个事件源的队列可以通过以下技术帮助解决问题:

    • 拥有一个处理程序池(每个队列,或者如果事件相同,通常可以从所有队列处理),当一个处理程序失败时,我们仍然有许多其他处理程序,我们可以实例化尽可能多或尽可能少的处理程序来处理跟上需求。
    • 在处理程序处理事件时,应锁定每个队列,以保持处理的事件顺序。
    • 锁定的队列应该有一个超时时间来自动解锁并恢复最后一个事件(被锁定),以防处理程序选择一个事件并在处理它时失败。
    • 处理事件的处理程序在完成后解锁队列并删除它刚刚完成处理的事件。

    该技术部分是 peek-lock,但会锁定整个队列,而不仅仅是 1 个事件。

    使用这种方法您不会获得更多的吞吐量,可能会获得更多的稳定性和故障恢复能力,这主要是因为队列的锁定。更好的方法是考虑如何消除对消息顺序的依赖,并允许处理程序以任何顺序选择这些消息,同时仍然实现所需的业务逻辑(也许correlation 可能会有所帮助)。

    【讨论】:

      【解决方案3】:

      首先,我坚信顺序处理请求的最有效方式是由单个处理程序处理它们。因此,解决方案应该提供“亲和力”,因此每个客户端都应该绑定到处理程序(如您在“简单”解决方案中所述)。我们只需要创建一个高效且与处理程序崩溃兼容的调度例程。

      如何实现亲和力?如果必须关闭处理程序,如何重新分配关联?

      作为一项关键措施,我建议限制处理程序队列的长度,并在一个大的“传入”队列中保持异常负载:

      clients -> listener -> (big incoming queue) -> load balancer -> handler queues -> handlers 
                                                          \________<-__system events_<-____/
      

      一旦我们对处理程序队列的长度有一些保证(例如:“队列足够短,可以在 0.1 秒内处理”),我们就可以访问各种“负载平衡”方法,在一般情况下可能不可用。在不损失性能的情况下,应将队列大小的限制调整得尽可能低。

      “负载平衡器”例程应该能够:

      1. 维护将处理程序分配给客户端的双向映射(或普通数组);
      2. 根据映射将消息从传入队列分发到处理程序队列;
      3. 在处理程序空闲时接收通知(然后解除客户端与该处理程序的绑定)
      4. 接收有关处理程序死亡的通知(然后从此处理程序重新分配客户端);
      5. 回收死处理程序的队列(如果不允许数据丢失)

      在这样的架构中,逻辑将是高度可定制的。例如,我们将有多种选择如何实现“队列溢出”(处理程序队列已达到其限制的情况):

      • 停止整个处理
      • 杀死处理程序并重新分配其队列
      • 了解(以某种方式,例如通过时间限制或通过共享内存中的计数器)处理程序队列不包含来自此客户端的任何请求,然后重新分配客户端;
      • 以某种方式推迟请求;
      • 禁止客户端;等

      我的提议不是高科技解决方案,它更像是自己动手编程。但我想这样的设计将具有足够的可扩展性和效率,足以实现高可用性。

      【讨论】:

        猜你喜欢
        • 2023-03-14
        • 1970-01-01
        • 1970-01-01
        • 2019-03-15
        • 2018-11-05
        • 1970-01-01
        • 1970-01-01
        • 2010-12-16
        • 1970-01-01
        相关资源
        最近更新 更多