【问题标题】:How to consume just one message from rabbit mq on nodejs如何在nodejs中仅使用来自rabbitmq的一条消息
【发布时间】:2015-02-26 16:06:46
【问题描述】:

我正在使用 amqp.node 库将 rabbitmq 集成到我的系统中。

但在消费者中,我想一次只处理一条消息,然后确认该消息,然后使用队列中的下一条消息。

目前的代码是:

// Consumer
open.then(function(conn) {
  var ok = conn.createChannel();
  ok = ok.then(function(ch) {
    ch.assertQueue(q);
    ch.consume(q, function(msg) {
      if (msg !== null) {
        othermodule.processMessage(msg, function(error, response){
          console.log(msg.content.toString());
          ch.ack(msg);
        });
      }
    });
  });
  return ok;
}).then(null, console.warn);

ch.consume 将一次性处理通道中的所有消息,并且此处调用它的模块的功能将不会在同一时间线执行。

我想在消费队列中的下一条消息之前等待 othermodule 函数完成。

【问题讨论】:

    标签: node.js rabbitmq


    【解决方案1】:

    目前(2018 年),我认为 RabbitMQ 团队可以选择这样做:

    https://www.rabbitmq.com/tutorials/tutorial-two-javascript.html

    ch.prefetch(1);
    

    为了解决这个问题,我们可以使用带有值的预取方法 1. 这告诉 RabbitMQ 不要向一个 工人一次。或者,换句话说,不要向 一个工人,直到它处理并确认了前一个工人。 相反,它会将其分派给下一个尚未停止的工作人员 忙。

    【讨论】:

      【解决方案2】:

      【讨论】:

        【解决方案3】:

        在此处跟进示例:

        https://www.npmjs.com/package/amqplib

        // Consumer
        function consumer(conn) {
          var ok = conn.createChannel(on_open);
          function on_open(err, ch) {
            if (err != null) bail(err);
            ch.assertQueue(q);
            
             // IMPORTANT
            ch.prefetch(1);
        
            ch.consume(q, function(msg) {
              if (msg !== null) {
                console.log(msg.content.toString());
                ch.ack(msg);
              }
            });
          }
        }
        

        参考:http://www.squaremobius.net/amqp.node/channel_api.html#channel_prefetch

        【讨论】:

          【解决方案4】:

          创建模型时,您需要在其上设置 QOS。下面是我们在 C# 中的做法:

              var _model = rabbitConnection.CreateModel();
              // Configure the Quality of service for the model. Below is how what each setting means.
              // BasicQos(0="Dont send me a new message untill I’ve finshed",  _fetchSize = "Send me N messages at a time", false ="Apply to this Model only")
              _model.BasicQos(0, _fetchSize, false);
              var consumerTag = _model.BasicConsume(rabbitQueue.QueueName, false, _consumerName, queueingConsumer);
          

          【讨论】:

          • 这甚至不在 nodejs 中。这是如何被接受的答案?
          【解决方案5】:

          您必须设置 QoS = 1。

          ch = ...
          ch.qos(1);
          ch.consume(q, msg => { ... });
          

          (javascript)

          【讨论】:

            猜你喜欢
            • 2020-05-15
            • 1970-01-01
            • 2023-03-14
            • 1970-01-01
            • 1970-01-01
            • 2020-02-18
            • 2021-07-28
            • 2012-10-08
            • 2013-07-13
            相关资源
            最近更新 更多