【问题标题】:Dead Lettered Message Not Being Consumed in RabbitMQ and Node Using AMQP.Node在 RabbitMQ 和使用 AMQP.Node 的节点中未使用死信消息
【发布时间】:2025-12-05 13:10:02
【问题描述】:

我想在我的一名工人经过一定时间后收到一条消息。在发现所谓的死信交换后,我决定使用 Node 和 RabbitMQ。

消息似乎已发送到 DeadExchange 中的队列,但在 WorkExchange 中的 WorkQueue 中经过一段时间后,消费者从未收到消息。要么 bindQueue 关闭,要么死信不起作用?

我现在尝试了很多不同的值。有人可以指出我缺少什么吗?

var amqp = require('amqplib');
var url = 'amqp://dev.rabbitmq.com';

amqp.connect(url).then(function(conn) {
    //Subscribe to the WorkQueue in WorkExchange to which the "delayed" messages get dead-letter'ed (is that a verb?) to.
    return conn.createChannel().then(function(ch) {
        return ch.assertExchange('WorkExchange', 'direct').then(function() {
            return ch.assertQueue('WorkQueue', {
                autoDelete: false,
                durable: true
            })
        }).then(function() {
            return ch.bindQueue('WorkQueue', 'WorkExchange', '');
        }).then(function() {
            console.log('Waiting for consume.');

            return ch.consume('WorkQueue', function(msg) {
                console.log('Received message.');
                console.log(msg.content.toString());
                ch.ack(msg);
            });
        });
    })
}).then(function() {
    //Now send a test message to DeadExchange to a random (unique) queue.
    return amqp.connect(url).then(function(conn) {
        return conn.createChannel();
    }).then(function(ch) {
        return ch.assertExchange('DeadExchange', 'direct').then(function() {
            return ch.assertQueue('', {
                arguments: {
                    'x-dead-letter-exchange': 'WorkExchange',
                    'x-message-ttl': 2000,
                    'x-expires': 10000
                }
            })
        }).then(function(ok) {
            console.log('Sending delayed message');

            return ch.sendToQueue(ok.queue, new Buffer(':)'));
        });
    })
}).then(null, function(error) {
    console.log('error\'ed')
    console.log(error);
    console.log(error.stack);
});

我正在使用 amqp.node (https://github.com/squaremo/amqp.node),它是 npm 中的 amqplib。虽然 node-amqp (https://github.com/postwait/node-amqp) 似乎更受欢迎,但它并没有实现完整的协议,并且在重新连接方面存在相当多的突出问题。

dev.rabbitmq.com 正在运行 RabbitMQ 3.1.3。

【问题讨论】:

  • 你能提供更多关于你想要达到的目标的细节吗?目前还不完全清楚你想用死信交换来做什么......
  • 目前我的工作人员每 2 秒轮询一次数据库以检查事件是否结束(然后做一些事情,比如更新几个集合并通知另一个队列通过 SSE 向客户端发送消息)开始了。现在,我想发送 1 条死信消息,最终在工作队列中(经过特定时间后)我的工作人员收听。这将消除数据库的负载并让我更容易扩展。常规的 RabbitMQ 消息可以正常工作,但死信交换却不行。有什么想法吗?
  • 备案:工作队列中的消费者永远不会收到消息。

标签: node.js rabbitmq dead-letter


【解决方案1】:

这是一个工作代码。当消息在 DeadExchange 中花费超过 ttl 时,它会被推送到 WorkExchange。成功的关键是定义正确的路由键。您希望发送 post ttl 的交换队列应与路由键绑定(注意:不是默认值),并且“x-dead-letter-routing-key”属性值应与该路由键匹配。

var amqp = require('amqplib');
var url = 'amqp://localhost';

amqp.connect(url).then(function(conn) {
    //Subscribe to the WorkQueue in WorkExchange to which the "delayed" messages get dead-letter'ed (is that a verb?) to.
    return conn.createChannel().then(function(ch) {
        return ch.assertExchange('WorkExchange', 'direct').then(function() {
            return ch.assertQueue('WorkQueue', {
                autoDelete: false,
                durable: true
            })
        }).then(function() {
            return ch.bindQueue('WorkQueue', 'WorkExchange', 'rk1');
        }).then(function() {
            console.log('Waiting for consume.');

            return ch.consume('WorkQueue', function(msg) {
                console.log('Received message.');
                console.log(msg.content.toString());
                ch.ack(msg);
            });
        });
    })
}).then(function() {
    //Now send a test message to DeadExchange to DEQ queue.
    return amqp.connect(url).then(function(conn) {
        return conn.createChannel();
    }).then(function(ch) {
        return ch.assertExchange('DeadExchange', 'direct').then(function() {
            return ch.assertQueue('DEQ', {
                arguments: {
                    'x-dead-letter-exchange': 'WorkExchange',
                    'x-dead-letter-routing-key': 'rk1',
                    'x-message-ttl': 15000,
                    'x-expires': 100000
                }
            })
        }).then(function() {
            return ch.bindQueue('DEQ', 'DeadExchange', '');
        }).then(function() {
            console.log('Sending delayed message');

            return ch.publish('DeadExchange', '', new Buffer("Over the Hills and Far Away!"));
        });
    })
}).then(null, function(error) {
    console.log('error\'ed')
    console.log(error);
    console.log(error.stack);
});

【讨论】:

    【解决方案2】:

    这是一个使用 AMQP Connection Manager for Node 的示例。我注意到似乎没有任何示例与我们在代码中所做的相匹配,因此我创建了一个包含一个简单示例的 repo,并通过重新发布回主交易所的方式进行了重试计数:https://github.com/PritchardAlexander/node-amqp-dead-letter-queue

    这是一个简单的例子:

    const amqp = require('amqp-connection-manager');
    const username = encodeURIComponent('queue');
    const password = encodeURIComponent('pass');
    const port = '5672';
    const host = 'localhost';
    const connectionString = `amqp://${username}:${password}@${host}:${port}`;
    
    // Ask the connection manager for a ChannelWrapper.  Specify a setup function to
    // run every time we reconnect to the broker.
    connection = amqp.connect([connectionString]);
    
    // A channel is your ongoing connection to RabbitMQ.
    // All commands go through your channel.
    connection.createChannel({
      json: true,
      setup: function (channel) {
        channel.prefetch(100);
    
        // Setup EXCHANGES - which are hubs you PUBLISH to that dispatch MESSAGES to QUEUES
        return Promise.all([
          channel.assertExchange('Test_MainExchange', 'topic', {
            durable: false,
            autoDelete: true,
            noAck: false
          }),
          channel.assertExchange('Test_DeadLetterExchange', 'topic', {
            durable: false,
            autoDelete: true,
            maxLength: 1000,
            noAck: true // This means dead letter messages will not need an explicit acknowledgement or rejection
          })
        ])
        // Setup QUEUES - which are delegated MESSAGES by EXCHANGES.
        // The MESSAGES then need to be CONSUMED.
        .then(() => {
          return Promise.all([
            channel.assertQueue(
              'Test_MainQueue',
              options = {
                durable: true,
                autoDelete: true,
                exclusive: false,
                messageTtl: 1000*60*60*1,
                deadLetterExchange: 'Test_DeadLetterExchange'
              }
            ),
            channel.assertQueue('Test_DeadLetterQueue',
              options = {
                durable: false,
                autoDelete: true,
                exclusive: false
              }
            )
          ]);
        })
        // This glues the QUEUES and EXCHANGES together
        // The last parameter is a routing key. A hash/pound just means: give me all messages in the exchange.
        .then(() => {
          return Promise.all([
            channel.bindQueue('Test_MainQueue', 'Test_MainExchange', '#'),
            channel.bindQueue('Test_DeadLetterQueue', 'Test_DeadLetterExchange', '#')
          ]);
        })
        // Setup our CONSUMERS
        // They pick MESSAGES off of QUEUES and do something with them (either ack or nack them)
        .then(() => {
          return Promise.all([
            channel.consume('Test_MainQueue', (msg) => {
              const stringifiedContent = msg.content ? msg.content.toString() : '{}';
              console.log('Test_MainQueue::CONSUME ' + stringifiedContent);
    
              const messageData = JSON.parse(stringifiedContent);
              if (messageData.value === 0) {
                console.log('Test_MainQueue::REJECT ' + stringifiedContent);
                // the 'false' param at the very end means, don't retry! dead letter this instead!
                return channel.nack(msg, true, false);
              }
              return channel.ack(msg);
            })
          ]),
          channel.consume('Test_DeadLetterQueue', (msg) => {
            const stringifiedContent = msg.content ? msg.content.toString() : '{}';
            console.log('');
            console.log('Test_DeadLetterQueue::CONSUME ' + stringifiedContent);
            console.log('');
          });
        })
        .then(() => {
          setInterval(function () {
            const messageData = {
              text: 'Dead letter if 0',
              value: Math.floor(Math.random()*5)
            };
            const stringifiedMessage = JSON.stringify(messageData);
    
            // Publish message to exchange
            if (channel.publish('Test_MainExchange', '', new Buffer(stringifiedMessage))) {
              console.log(`Sent ${stringifiedMessage}`);
            } else {
              console.log(`Failed to send ${stringifiedMessage}`);
            };
          }, 300);
        });
      }
    });
    

    【讨论】:

      【解决方案3】:

      AMQP.Node 中的 Channel#assertQueue 中有一个错误,该错误刚刚得到修复,请参阅 https://github.com/squaremo/amqp.node/commit/3749c66b448875d2df374e6a89946c0bdd0cb918。该修复程序已在 GitHub 上,但尚未在 npm 中。

      【讨论】: