【问题标题】:To be sure about concurrency, same group of works in multiple queues (FIFO)为了确保并发性,同一组在多个队列中工作(FIFO)
【发布时间】:2018-11-04 23:46:46
【问题描述】:

我有一个关于多消费者并发的问题。 我想将来自 Web 请求的作品发送到rabbitmq 到分布式队列。 我只想确定多个队列(FIFO)中的工作顺序。 因为这个请求来自不同的用户,所以必须订购用户请求/作品。

我发现这个特性在 Azure ServiceBus 和 ActiveMQ 消息分组上有不同的名字。

有没有办法在漂亮的 RabbitMQ 中做到这一点?

我想保证客户的请求必须相互订购。 每个客户可能有多个请求,但必须按顺序处理针对该客户的请求。 我希望通过在不同节点上使用多个消费者来快速处理传入的请求。 例如,不同的客户 1 到 1000 发送超过 100 万个请求。 如果我只将这个巨大的请求放在一个队列中,则需要花费大量时间。所以我想在 n (5) 节点之间共享这个进程负载。对于客户 X 的请求必须按照相同的顺序进行处理

【问题讨论】:

  • 你的意思是有序的,还是说每个用户的工作项必须送到同一个消费者,即使队列有多个消费者?还是两者兼而有之?
  • 所以每个客户可能有多个请求,但是必须按顺序处理该客户的这些请求,但可能会出现乱序(甚至与其他客户的请求交错?你在谈论 ActiveMQ 调用“消息分组”?(如果是这样,请编辑您的问题以明确说明:cmets 可以消失。)

标签: rabbitmq amqp fifo


【解决方案1】:

(假设 OP 正在询问诸如 ActiveMQs“消息分组”之类的事情:)

这目前没有内置到 RabbitMQ AFAIK(根据 this answer,截至 2013 年还没有),我现在不知道(尽管我最近没有跟上)。

但是,RabbitMQ 的交换和队列模型非常灵活 - 交换和队列可以很容易地动态创建(这可以在其他消息传递系统中完成,但是,例如,如果您阅读 ActiveMQ 文档或 Red Hat AMQ 文档,您将所有用户指南中的示例都在系统启动时加载的配置文件中使用预先声明的队列 - 除了类似 RPC 的请求/响应通信)。

另外,在 RabbitMQ 中,消费者(即消息消费线程)很容易从多个队列中消费。

因此,您可以在 RabbitMQ 之上构建一个系统,在其中获得所需的分组语义。

一种方法是创建动态队列:第一次看到客户订单或新的客户订单组时,将为该组的所有消息创建一个具有唯一名称的队列 - 将传达该队列名称(通过另一个队列)到一个消费者,该消费者的唯一目的是在负责处理客户订单组的其他消费者之间进行负载平衡。即,负载均衡器会从其队列中拉出一条消息,说“队列名称为 XYZ 的新组”,它会在订单组消费者池中找到一个消费者,该消费者可以承受这个负载并向它传递一条消息,说“开始监听到 XYZ”。

另一种方法是使用发布/订阅和主题路由 - 每个客户订单组将获得一个独特的主题 - 并按上述方式进行。

【讨论】:

    【解决方案2】:

    在使用基于事件的系统时,尤其是在使用多个生产者和/或消费者时,重要的是要接受这样一个事实,即通常不存在事件的保证顺序。为了获得一个健壮的系统,将系统设计为使消息处理程序是幂等的也是明智的;他们应该容忍两次(或更多)收到相同的消息。

    有很多事情可能(实际上应该被允许)干扰秩序;

    • 生产者传递消息的速度可能略有不同
    • 一个生产者可能会错过一个确认(由于错过了一个包)并将重新发送消息
    • 一个消费者可能会获取并处理一条消息,但 ack 在返回的途中丢失了,因此消息被传递了两次(给另一个消费者)。
    • 您的处理程序所依赖的其他一些服务可能已关闭,因此您必须拒绝该消息。

    话虽如此,NServicebus 之类的服务总线系统使用一种模式来强制使用订单消息。有一些要求:

    • 需要一个允许有条件更新的集中式存储(如 sql-server 或文档存储);例如,您希望能够存储最后处理的消息的序列号(或您在该过程中走了多远),但如果已经存储的序列/进度是正确/预期的一。对于大多数数据库来说,即使是为数百万客户存储用户 ID 和进度也应该是一个非常简单的操作。
    • 确保队列配置了dead-letter-queue/exchange 以进行重试,然后再次将原始队列设置为该队列的死信队列。
    • 您在重试/死信队列上设置了一个 TTL(例如 30 秒)。这样,出现在死信队列中的消息将在超时后自动推送回您的原始队列。
    • 在处理您的消息时,请检查您的存储/数据库是否处于处理消息的正确状态(即,所需的先前步骤已经完成)。
      • 如果您可以处理它,您可以更新存储(有条件!)。
      • 如果 not - 你对消息进行了确认,以便将其扔到死信队列中。基本上你是在说“不,我无法处理这条消息,队列中可能还有一些其他消息应该首先处理”。

    这种方式的快乐路径是以正确的顺序处理大量消息。 但是如果发生了一些事情并且你收到了一条带外消息,你会将它扔到重试队列(死信队列)中,Rabbit 会确保它会回到队列中以便在稍后重试.但只是在延迟之后。

    这样做的好处在于,您能够处理大多数可能会干扰消息处理的情况(消息乱序、相关服务关闭、处理程序在处理消息的过程中被关闭)完全相同的方式;通过拒绝消息并让您的基础架构(Rabbit)处理它在一段时间后重试。

    【讨论】:

      【解决方案3】:

      RabbitMQ 一致性哈希交换类型

      我们正在使用 RabbitMQ,并且我们找到了一个插件。它使用Consistent Hashing算法来分发消息,以使密钥保持一致。

      有关一致性哈希的更多信息;

      https://en.wikipedia.org/wiki/Consistent_hashing

      https://www.youtube.com/watch?v=viaNG1zyx1g

      你可以从 rabbitmq 网页找到这个插件

      插件:rabbitmq_consistent_hash_exchange

      https://www.rabbitmq.com/plugins.html

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-04-22
        • 2013-11-15
        • 2010-10-04
        • 2020-10-08
        • 1970-01-01
        相关资源
        最近更新 更多