【问题标题】:Optimizing RabbitMQ consumers to consume in batch优化 RabbitMQ 消费者批量消费
【发布时间】:2018-12-16 08:55:47
【问题描述】:

我有一个应用程序,在每次消息消费时,我都需要查询 MySQL 数据库以获取一些信息,并根据该进程消费消息。我想对此进行优化,以防止数据库上的多个查询增加负载。

我正在考虑一种方法,我至少等待 x 条消息y 秒。这样我可以批量消费一些消息,即使在某些时候我收到的消息较少,它们也会被消费。

示例:假设 x = 100y = 10 秒

这意味着我等待至少 100 条消息或 10 秒,以先到者为准。这样,我可以一次查询数据库中的 100 条消息。此外,如果我收到的消息少于 100 条,剩余的消息将在最多 10 秒的窗口内处理。

我正在使用带有 amqplib 的 NodeJS 进行消费。我有以下基于 RabbitMQ 示例的代码:

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'hello';

    ch.assertQueue(q, {durable: false});
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
    ch.consume(q, function(msg) {
      console.log(" [x] Received %s", msg.content.toString());
    }, {noAck: true});
  });
});

我正在考虑拥有一个全局对象,并在每个 consume 回调中添加它,并在该对象到达被处理的 x 个消息 时检查该对象的计数。不过,不确定如何为此添加 y 秒 的时间上限,并确保如果我在时间窗口内收到少于 x 条消息,这些消息会得到处理

【问题讨论】:

    标签: node.js rabbitmq amqp node-amqp node-amqplib


    【解决方案1】:

    以下代码将在每条接收到的消息之后调用一个函数,该函数将接收到的消息聚合到一个数组中。当它在没有消息的情况下被调用(带有参数null)或者当它看到消息计数已达到x时,它会将聚合的消息发送到数据库函数。否则,它只是将消息添加到数组中(在if 语句的第二部分)。

    参数null 由一个在y 秒后触发的计时器传递给聚合函数。此计时器在消息队列刚刚初始化时首先设置,并在聚合器将消息发送到数据库时重置。

    var messageStore = [];
    var timer;
    
    sendToDatabase = function(messages) {...}
    
    aggregate = function(msg) {
        if (msg == null || messageStore.push(msg) == x) {
            clearTimeout(timer);
            timer = setTimeout(aggregate, 1000*y, null);
            sendToDatabase(messageStore);
            messageStore = [];
        }
    }
    
    amqp.connect('amqp://localhost', function(err, conn) {
      conn.createChannel(function(err, ch) {
        var q = 'hello';
    
        ch.assertQueue(q, {durable: false});
        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
        timer = setTimeout(aggregate, 1000*y, null);
        ch.consume(q, function(msg) {
          console.log(" [x] Received %s", msg.content.toString());
          aggregate(msg);
        }, {noAck: true});
      });
    });
    

    注意:我无法对此进行测试,因为我手头没有消息传递系统。

    【讨论】:

    • 这似乎不是一个好方法,因为您将它保存在全局内存中。并且直到每条消息都保存到 DB 并且您确认该消息之前,不会再消耗任何消息来阻塞 IO。这使得整个流程非常缓慢。