【问题标题】:"Resequencing" messages after processing them out-of-order在乱序处理消息后“重新排序”消息
【发布时间】:2017-01-05 20:32:06
【问题描述】:

我正在研究一个基本上是高度可用的分布式消息传递系统。系统通过 HTTP 或 TCP 从某个地方接收消息,对其执行各种转换,然后将其发送到一个或多个目的地(也使用 TCP/HTTP)。

系统要求发送到给定目的地的所有消息都是有序的,因为某些消息建立在先前消息的内容之上。这限制了我们按顺序处理消息,每条消息大约需要 750 毫秒。因此,例如,如果有人每 250 毫秒向我们发送一条消息,我们就不得不将这些消息排在后面。这最终会在高负载下的消息处理中引入无法容忍的延迟,因为每条消息可能必须等待数百条其他消息被处理才能轮到它。

为了解决这个问题,我希望能够在不破坏我们按顺序发送它们的要求的情况下并行化我们的消息处理。

我们可以轻松地横向扩展我们的处理。丢失的部分是一种确保即使消息被无序处理,它们也会被“重新排序”并按照接收顺序发送到目的地的方法。我正在努力寻找实现这一目标的最佳方法。

Apache Camel 有 a thing called a Resequencer 可以做到这一点,它包含一个漂亮的图表(我没有足够的代表直接嵌入)。这正是我想要的:接收乱序消息并将它们按顺序排列的东西。

但是,我不希望它是用 Java 编写的,我需要高可用性的解决方案(即抵抗典型的系统故障,如崩溃或系统重启),我认为 Apache Camel 不提供。

我们的应用程序是用 Node.js 编写的,使用 Redis 和 Postgresql 来实现数据持久性。我们将Kue 库用于我们的消息队列。尽管 Kue 提供了优先级队列,但功能集对于上述用例来说太有限了,所以我认为我们需要一种替代技术来与 Kue 协同工作来重新排序我们的消息。

我试图在网上研究这个主题,但我找不到我预期的那么多信息。这似乎是那种会有大量文章和实现的分布式架构模式,但我看不到那么多。搜索诸如“消息重新排序”、“乱序处理”、“并行消息处理”等内容的解决方案大多只是放松基于分区或主题等的“有序”要求。或者,他们谈论单台机器上的并行化。我需要一个解决方案:

  • 可以以任意顺序同时处理多条消息。
  • 将始终按照它们到达系统的顺序发送消息,无论它们是按什么顺序处理的。
  • 可从 Node.js 使用
  • 可以在 HA 环境中运行(即它的多个实例同时在同一个消息队列上运行而不会出现不一致。)

我们目前的计划是使用 Redis 来维护按到达时间排序的正在进行和准备发送的消息集,这对我来说很有意义,但我无法在网上找到描述。大致是这样工作的:

  1. 收到消息后,该消息将被放入进行中集。
  2. 消息处理完成后,该消息将被放入准备发送集。
  3. 只要在进行中和准备发送集的前面有相同的消息,就可以发送该消息并且它会按顺序排列。

我会编写一个小型 Node 库,通过使用原子 Redis 事务的优先级队列式 API 来实现此行为。但这只是我自己想出来的,所以我想知道:是否还有其他技术(最好使用我们已经使用的 Node/Redis 堆栈)来解决重新排序无序消息的问题?或者我可以将这个问题用作研究关键字的其他术语吗?感谢您的帮助!

【问题讨论】:

  • 重新排序随机分布在集群中的消息将是一项挑战。以这种方式排序消息的问题是异步网络模型假定消息无限延迟,因此它可能需要无限的资源来重新排序消息队列。您应该寻找一种对消息进行分区的方法,以便必须相互排序的消息进入一台机器。
  • 感谢您的评论。我们已经在对消息进行分区,但我们希望根据我们的合同在不可能进一步分区的单个大容量分区中提高我们的性能。尽管处理任意大的进行中队列可能不切实际,但实际上这应该受到我们集群的大小的限制,集群的大小不会超过 10-20 台机器(到目前为止)。我们有一个单独的队列系统来存储等待处理器接收的消息。

标签: node.js redis message-queue distributed kue


【解决方案1】:

这是一个常见问题,所以肯定有很多解决方案。这也是一个比较简单的问题,也是分布式系统领域一个很好的学习机会。我建议你自己写。

你会遇到一些问题,即

2:一次性交付
1:保证消息顺序
2:一次性发货

您找到了第 1 号,您正在通过在 redis 中重新排序它们来解决这个问题,这是一个不错的解决方案。但是,另一个没有解决。

看起来您的架构不适合容错,因此目前,如果服务器崩溃,您可以重新启动它并继续您的生活。这在按顺序处理所有请求时效果很好,因为这样您就可以根据上次成功完成的请求确切地知道崩溃的时间。

您需要的要么是找出您实际完成了哪些请求以及哪些请求失败了的策略,要么是在出现问题时向您的客户发送一封写得很好的道歉信。

如果 Redis 没有分片,那么它是强一致的。如果该单个节点崩溃,它将失败并可能丢失所有数据,但您不会遇到乱序数据或数据突然出现和退出存在的任何问题。因此,单个 Redis 节点可以保证,如果将消息插入到 to-process-set,然后再插入到 done-set,则没有节点将在 done-set 中看到该消息,而该消息也位于 to-流程集。

我会怎么做

使用 redis 似乎太模糊了,假设消息不是很大,如果进程崩溃,丢失它们是可以的,并且多次运行它们,甚至同时运行单个请求的多个副本不是问题。

我建议设置一个主管服务器,它接收传入的请求,将每个请求分派到随机选择的从属服务器,存储响应并在发送之前将它们重新排序。你说你预计处理需要 750 毫秒。如果从服务器在 2 秒内没有响应,则在 0-1 秒内将其再次随机分派到另一个节点。第一个响应的是我们要使用的那个。谨防重复响应。

如果重试请求也失败,则将最大等待时间加倍。在大约 5 次失败之后,每次等待的时间是前一次的两倍(或任何大于一的倍数),我们可能会遇到永久性错误,因此我们可能应该要求人工干预。该算法称为指数退避,可防止请求突然激增而导致整个集群瘫痪。不使用随机间隔,并且在 n 秒后重试可能会导致每 n 秒发生一次 DOS 攻击,直到集群死亡,如果它得到足够大的负载峰值。

失败的方式有很多,因此请确保该系统不是唯一存储数据的地方。但是,这可能会在 99+% 的时间里工作,它可能至少与您当前的系统一样好,并且您可以用几百行代码来实现它。只需确保您的主管使用异步请求,以便您可以处理重试和超时。 Javascript 本质上是单线程的,所以这比平常稍微复杂一些,但我相信你可以做到。

【讨论】:

  • 感谢您的深思熟虑的回答。我喜欢主管服务器的想法,但我担心它会成为扩展瓶颈。根据您的建议,我将尝试更多地调查此选项。如果“主管”可以是 HA 或具有自动故障转移,那可能是最好的选择。另外,我忘记在问题中提及这一点,但消息也会持久保存到 Postgresql 以进行长期存储。
  • 以后你可以使用 raft 或类似的实现来为你的主管获得一些容错能力,但这更难做到。
猜你喜欢
  • 1970-01-01
  • 2020-02-17
  • 1970-01-01
  • 1970-01-01
  • 2018-04-30
  • 2019-12-31
  • 2021-12-05
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多