【问题标题】:RabbitMQ / AMQP: single queue, multiple consumers for same message?RabbitMQ / AMQP:单个队列,同一消息的多个消费者?
【发布时间】:2012-05-24 03:49:45
【问题描述】:

我刚开始使用 RabbitMQ 和 AMQP。

  • 我有一个消息队列
  • 我有多个消费者,我想用相同的消息做不同的事情。

大多数 RabbitMQ 文档似乎都集中在循环,即单个消费者使用单个消息,负载在每个消费者之间分散。这确实是我目睹的行为。

一个例子:生产者有一个队列,每 2 秒发送一次消息:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  var sendMessage = function(connection, queue_name, payload) {
    var encoded_payload = JSON.stringify(payload);  
    connection.publish(queue_name, encoded_payload);
  }

  setInterval( function() {    
    var test_message = 'TEST '+count
    sendMessage(connection, "my_queue_name", test_message)  
    count += 1;
  }, 2000) 


})

这是一个消费者:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
  connection.queue("my_queue_name", function(queue){
    queue.bind('#'); 
    queue.subscribe(function (message) {
      var encoded_payload = unescape(message.data)
      var payload = JSON.parse(encoded_payload)
      console.log('Recieved a message:')
      console.log(payload)
    })
  })
})

如果我启动消费者两次,我可以看到每个消费者都在循环使用交替消息。例如,我将在一个终端中看到消息 1、3、5,在另一个终端中看到消息 2、4、6

我的问题是:

  • 我可以让每个消费者收到相同的消息吗?即,两个消费者都收到消息 1、2、3、4、5、6?这在 AMQP/RabbitMQ 中叫什么?正常是怎么配置的?

  • 这通常会做吗?我是否应该让交换将消息路由到两个单独的队列中,而只有一个消费者?

【问题讨论】:

  • 我不是 RabbitMQ 专家。但是,您现在拥有的称为队列,但您想要的是主题,请参阅本教程:rabbitmq.com/tutorials/tutorial-five-python.html,有关队列与主题的更多信息:msdn.microsoft.com/en-us/library/windowsazure/hh367516.aspx
  • 我相信他实际上想要扇出,尽管主题也可以工作,并且稍后会给予更多控制。
  • 感谢@UrbanEsc。主题似乎通过让一条消息到达多个队列来解决问题,因此被每个队列消费者消费。对于我的特殊情况,这让我更倾向于多队列/单一消费者场景。
  • 对于 2018 年(甚至 2016 年及更早),答案是使用 Kafka、IMO 之类的东西。

标签: node.js messaging rabbitmq amqp node-amqp


【解决方案1】:

我可以让每个消费者收到相同的消息吗?即,两个消费者都收到消息 1、2、3、4、5、6?这在 AMQP/RabbitMQ 中叫什么?正常是怎么配置的?

不,如果消费者在同一个队列中,则不会。来自 RabbitMQ 的 AMQP Concepts 指南:

重要的是要了解,在 AMQP 0-9-1 中,消息在消费者之间是负载平衡的。

这似乎暗示 队列中的循环行为是给定的,并且不可配置。即,需要单独的队列才能让多个消费者处理相同的消息 ID。

这通常会做吗?我是否应该让 Exchange 将消息路由到两个单独的队列中,只有一个消费者?

不,不是,单个队列/多个消费者,每个消费者处理相同的消息 ID 是不可能的。让交换将消息路由到两个单独的队列中确实更好。

因为我不需要太复杂的路由,所以 扇出交换 可以很好地处理这个问题。我之前并没有过多关注 Exchange,因为 node-amqp 具有“默认交换”的概念,允许您直接将消息发布到连接,但是大多数 AMQP 消息都发布到特定的交换。

这是我的扇出交换,发送和接收:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {   

    var sendMessage = function(exchange, payload) {
      console.log('about to publish')
      var encoded_payload = JSON.stringify(payload);
      exchange.publish('', encoded_payload, {})
    }

    // Recieve messages
    connection.queue("my_queue_name", function(queue){
      console.log('Created queue')
      queue.bind(exchange, ''); 
      queue.subscribe(function (message) {
        console.log('subscribed to queue')
        var encoded_payload = unescape(message.data)
        var payload = JSON.parse(encoded_payload)
        console.log('Recieved a message:')
        console.log(payload)
      })
    })

    setInterval( function() {    
      var test_message = 'TEST '+count
      sendMessage(exchange, test_message)  
      count += 1;
    }, 2000) 
 })
})

【讨论】:

  • 扇出显然是您想要的。它在这里对你没有帮助,但我想我会提到队列中的循环行为是可配置的。 int prefetchCount = 1; channel.basicQos(prefetchCount); 这将允许每个消费者在完成前一个消息后立即收到一条消息。而不是接收交替消息。同样不能解决您的问题,但可能对人们了解有用。此处示例 http://www.rabbitmq.com/tutorials/tutorial-two-java.html 在 Fair Dispatch 下
  • 澄清一下:“默认交换”不是特定于 node-amqp 的。它是具有以下规则的一般 AMQP 概念:当任何消息发布到默认交换时,AMQP 代理将路由键(该消息发布的)视为队列名称。因此,您似乎可以直接发布到队列。但你不是。代理只需将每个队列绑定到默认交换,路由键等于队列名称。
  • 在rabbitmq中是否有任何替代Apache activemq jms主题的方法,其中不涉及队列,而是多播?
  • 如果同一个用户从多个设备登录,那么消息只会得到一个设备。请问如何解决它或任何想法?
  • 当然可以,但现在我正在寻找一种方法来在多个客户端之间保持 1 个长期运行的连接
【解决方案2】:

最后几个答案几乎是正确的 - 我有大量的应用程序会生成消息,这些消息需要最终发送给不同的消费者,所以这个过程非常简单。

如果您希望多个消费者接收同一条消息,请执行以下过程。

创建多个队列,一个用于接收消息的每个应用程序,在每个队列属性中,将路由标记与 amq.direct 交换“绑定”。更改您的发布应用程序以发送到 amq.direct 并使用路由标签(不是队列)。 AMQP 然后将消息复制到具有相同绑定的每个队列中。像魅力一样工作:)

示例:假设我生成了一个 JSON 字符串,我使用路由标记“new-sales-order”将其发布到“amq.direct”交换,我有一个用于打印订单的 order_printer 应用程序的队列,我的计费系统有一个队列,它将向客户发送订单副本并向客户开具发票,我有一个网络存档系统,我出于历史/合规性原因存档订单,我有一个客户网络界面,其中订单作为其他信息进行跟踪来了一个订单。

所以我的队列是:order_printer、order_billing、order_archive 和 order_tracking 都有绑定标签“new-sales-order”绑定,所有4个都会得到JSON数据。

这是在发布应用不知道或不关心接收应用的情况下发送数据的理想方式。

【讨论】:

  • 这应该是公认的答案。
  • 就是这样:)
【解决方案3】:

只需阅读rabbitmq tutorial。您发布消息以进行交换,而不是排队;然后将其路由到适当的队列。在您的情况下,您应该为每个消费者绑定单独的队列。这样,他们就可以完全独立地使用消息。

【讨论】:

    【解决方案4】:

    是的,每个消费者都可以收到相同的消息。看一下 http://www.rabbitmq.com/tutorials/tutorial-three-python.html http://www.rabbitmq.com/tutorials/tutorial-four-python.html http://www.rabbitmq.com/tutorials/tutorial-five-python.html

    用于路由消息的不同方式。我知道它们适用于 python 和 java,但是理解原理,决定你在做什么,然后找到如何在 JS 中做这件事很好。听起来您想做一个简单的扇出 (tutorial 3),它将消息发送到连接到交换的所有队列。

    你在做什么和你想做的区别基本上是你要设置和交换或键入扇出。扇出交换将所有消息发送到所有连接的队列。每个队列都有一个消费者,可以分别访问所有消息。

    是的,这很常见,这是 AMPQ 的功能之一。

    【讨论】:

    • 很好的答案,除了“这通常做吗?”我指的是“让每个消费者收到相同的消息”——这并不常见(同一队列上的消费者总是轮询)。可能是我不够清楚的错。
    • 其实我敢说这取决于你想用它做什么。您有两个基本选择 pub/sub 或工作队列。您最初的设置是一个工作队列,但您想要的是一个扇出 pub/sub。他们指出,这里的常见用法完全取决于您想要做什么。
    • 当然,但是在工作队列中,相同的消息(例如,相同的消息 ID)不会由不同的消费者处理——它隐含地循环。同样,这可能是我不够清楚的错。
    • 我们似乎在这里讨论了不同的目的。
    • 很抱歉给您带来了困惑。如果有某种方式可以让同一队列上的消费者处理相同的消息 ID 的工作队列,请指点我参考。否则我会继续相信我在别处读到的内容。
    【解决方案5】:

    发送模式是一对一的关系。如果你想“发送”给多个接收者,你应该使用 pub/sub 模式。详情请见http://www.rabbitmq.com/tutorials/tutorial-three-python.html

    【讨论】:

      【解决方案6】:

      RabbitMQ / AMQP:单个队列,多个消费者用于同一消息和页面刷新。

      rabbit.on('ready', function () {    });
          sockjs_chat.on('connection', function (conn) {
      
              conn.on('data', function (message) {
                  try {
                      var obj = JSON.parse(message.replace(/\r/g, '').replace(/\n/g, ''));
      
                      if (obj.header == "register") {
      
                          // Connect to RabbitMQ
                          try {
                              conn.exchange = rabbit.exchange(exchange, { type: 'topic',
                                  autoDelete: false,
                                  durable: false,
                                  exclusive: false,
                                  confirm: true
                              });
      
                              conn.q = rabbit.queue('my-queue-'+obj.agentID, {
                                  durable: false,
                                  autoDelete: false,
                                  exclusive: false
                              }, function () {
                                  conn.channel = 'my-queue-'+obj.agentID;
                                  conn.q.bind(conn.exchange, conn.channel);
      
                                  conn.q.subscribe(function (message) {
                                      console.log("[MSG] ---> " + JSON.stringify(message));
                                      conn.write(JSON.stringify(message) + "\n");
                                  }).addCallback(function(ok) {
                                      ctag[conn.channel] = ok.consumerTag; });
                              });
                          } catch (err) {
                              console.log("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack);
                          }
      
                      } else if (obj.header == "typing") {
      
                          var reply = {
                              type: 'chatMsg',
                              msg: utils.escp(obj.msga),
                              visitorNick: obj.channel,
                              customField1: '',
                              time: utils.getDateTime(),
                              channel: obj.channel
                          };
      
                          conn.exchange.publish('my-queue-'+obj.agentID, reply);
                      }
      
                  } catch (err) {
                      console.log("ERROR ----> " + err.stack);
                  }
              });
      
              // When the visitor closes or reloads a page we need to unbind from RabbitMQ?
              conn.on('close', function () {
                  try {
      
                      // Close the socket
                      conn.close();
      
                      // Close RabbitMQ           
                     conn.q.unsubscribe(ctag[conn.channel]);
      
                  } catch (er) {
                      console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack);
                  }
              });
          });
      

      【讨论】:

        【解决方案7】:

        我评估你的情况是:

        • 我有一个消息队列(您接收消息的来源,我们将其命名为 q111)

        • 我有多个消费者,我想用相同的消息做不同的事情。

        您的问题是,当此队列接收到 3 条消息时,消息 1 被消费者 A 消费,其他消费者 B 和 C 消费消息 2 和 3。当您需要设置 rabbitmq 传递所有这三个消息(1,2,3)的相同副本同时发送给所有三个连接的消费者(A,B,C)。

        虽然可以进行许多配置来实现这一点,但一种简单的方法是使用以下两步概念:

        • 使用动态 rabbitmq-shovel 从所需队列 (q111) 中提取消息并发布到扇出交换器(专门为此目的创建和专用的交换器)。
        • 现在重新配置您的消费者 A、B 和 C(他们正在侦听队列 (q111)),以使用每个消费者的专有匿名队列直接从该扇出交换中侦听。

        注意:使用此概念时不要直接从源队列 (q111) 消费,因为已经消费的消息不会被铲到您的 Fanout 交换中。

        如果您认为这不能满足您的确切要求...请随时发表您的建议 :-)

        【讨论】:

          【解决方案8】:

          我认为您应该检查使用 fan-out 交换器发送消息。这样,您将为不同的消费者接收相同的消息,在表下 RabbitMQ 正在为每个新的消费者/订阅者创建不同的队列。

          这是在 javascript 中查看教程示例的链接 https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html

          【讨论】:

            【解决方案9】:

            要获得您想要的行为,只需让每个消费者从自己的队列中消费即可。您必须使用非直接交换类型(主题、标头、扇出)才能一次将消息发送到所有队列。

            【讨论】:

              【解决方案10】:

              如果您碰巧像我一样使用amqplib 库,他们有一个handy examplePublish/Subscribe RabbitMQ tutorial 实现@ 可能会很方便。

              【讨论】:

                【解决方案11】:

                在这个场景中有一个有趣的选项,我在这里的答案中没有找到。

                您可以在一个消费者中使用“重新排队”功能对消息进行 Nack,以便在另一个消费者中处理它们。 一般来说,这不是正确的方法,但也许对某人来说已经足够了。

                https://www.rabbitmq.com/nack.html

                当心循环(当所有消费者都 nack+requeue 消息时)!

                【讨论】:

                • 我强烈建议不要这样做,因为它不会以任何方式扩展。消费者没有订单,你不能保证消费者 B 不会重新排队,在消费者 A 处理和重新排队之前收到消息,上面提到的循环是一个问题。正如您所说的“这通常不是正确的方法”,我想不出比其他答案更好的场景。
                【解决方案12】:

                扇出显然是您想要的。 fanout

                阅读rabbitMQ教程: https://www.rabbitmq.com/tutorials/tutorial-three-javascript.html

                这是我的例子:

                Publisher.js:

                amqp.connect('amqp://<user>:<pass>@<host>:<port>', async (error0, connection) => {
                    if (error0) {
                      throw error0;
                    }
                    console.log('RabbitMQ connected')
                    try {
                      // Create exchange for queues
                      channel = await connection.createChannel()
                      await channel.assertExchange(process.env.EXCHANGE_NAME, 'fanout', { durable: false });
                      await channel.publish(process.env.EXCHANGE_NAME, '', Buffer.from('msg'))
                    } catch(error) {
                      console.error(error)
                    }
                })
                

                订阅者.js:

                amqp.connect('amqp://<user>:<pass>@<host>:<port>', async (error0, connection) => {
                    if (error0) {
                      throw error0;
                    }
                    console.log('RabbitMQ connected')
                    try {
                      // Create/Bind a consumer queue for an exchange broker
                      channel = await connection.createChannel()
                      await channel.assertExchange(process.env.EXCHANGE_NAME, 'fanout', { durable: false });
                      const queue = await channel.assertQueue('', {exclusive: true})
                      channel.bindQueue(queue.queue, process.env.EXCHANGE_NAME, '')
                
                      console.log(" [*] Waiting for messages in %s. To exit press CTRL+C");
                      channel.consume('', consumeMessage, {noAck: true});
                    } catch(error) {
                      console.error(error)
                    }
                });
                

                这是我在互联网上找到的一个例子。也许也可以提供帮助。 https://www.codota.com/code/javascript/functions/amqplib/Channel/assertExchange

                【讨论】:

                  最近更新 更多