【问题标题】:can I limit consumption of kafka-node consumer?我可以限制 kafka-node 消费者的消费吗?
【发布时间】:2026-02-08 14:10:02
【问题描述】:

好像我的kafka节点消费者:

var kafka = require('kafka-node');
var consumer = new Consumer(client, [], {
     ...
    });

获取的消息太多,在某些情况下我无法处理。 有没有办法限制它(例如每秒接受不超过 1000 条消息,可能使用 pause api?)

  • 我使用的是 kafka-node,与 Java 版本相比,它的 api 似乎有限

【问题讨论】:

    标签: node.js apache-kafka


    【解决方案1】:

    在 Kafka 中,轮询和处理应该以协调/同步的方式发生。即,在每次轮询之后,您应该首先处理所有接收到的数据,然后再进行下一次轮询。此模式会自动将消息数量限制为您的客户端可以处理的最大吞吐量。

    类似这样的东西(伪代码):

    while(isRunning) {
      messages = poll(...)
      for(m : messages) {
        process(m);
      }
    }
    

    (这就是为什么没有参数“fetch.max.messages”的原因——你不需要它。)

    【讨论】:

    • 你知道node-kafka模块中有类似的api吗?
    • 我个人不使用node.js。此外,有多种客户端实现可用——我假设“正确的代码”取决于您使用的库:cwiki.apache.org/confluence/display/KAFKA/…
    【解决方案2】:

    我遇到过类似的情况,我正在消费来自 Kafka 的消息,并且不得不限制消费,因为我的消费者服务依赖于具有自身约束的第三方 API。

    我使用async/queue 和一个名为asyncTimedCargoasync/cargo 包装器进行批处理。 cargo 从 kafka-consumer 获取所有消息,并在达到大小限制batch_config.batch_size 或超时batch_config.batch_timeout 时将其发送到队列。 async/queue 提供 saturatedunsaturated 回调,如果您的队列任务工作人员很忙,您可以使用它们来停止消费。这将阻止货物装满,并且您的应用程序不会耗尽内存。消耗将在不饱和时恢复。

    //cargo-service.js
    module.exports = function(key){
        return new asyncTimedCargo(function(tasks, callback) {
            var length = tasks.length;
            var postBody = [];
            for(var i=0;i<length;i++){
                var message ={};
                var task = JSON.parse(tasks[i].value);
                message = task;
                postBody.push(message);
            }
            var postJson = {
                "json": {"request":postBody}
            };
            sms_queue.push(postJson);
            callback();
        }, batch_config.batch_size, batch_config.batch_timeout)
    };
    
    //kafka-consumer.js
    cargo = cargo-service()
    consumer.on('message', function (message) {
        if(message && message.value && utils.isValidJsonString(message.value)) {
            var msgObject = JSON.parse(message.value);        
            cargo.push(message);
        }
        else {
            logger.error('Invalid JSON Message');
        }
    });
    
    // sms-queue.js
    var sms_queue = queue(
    retryable({
        times: queue_config.num_retries,
        errorFilter: function (err) {
            logger.info("inside retry");
            console.log(err);
            if (err) {
                return true;
            }
            else {
                return false;
            }
        }
    }, function (task, callback) {
    // your worker task for queue
      callback()
    }), queue_config.queue_worker_threads);
    
    sms_queue.saturated = function() {
        consumer.pause();
        logger.warn('Queue saturated Consumption paused: ' + sms_queue.running());
    };
    sms_queue.unsaturated = function() {
        consumer.resume();
        logger.info('Queue unsaturated Consumption resumed: ' + sms_queue.running());
    };
    

    【讨论】:

      【解决方案3】:

      来自自述文件中的FAQ

      1. 创建一个async.queue,其中包含消息处理器和一个并发(消息处理器本身被setImmediate 函数包装,因此它不会冻结事件循环)
      2. queue.drain 设置为resume() 消费者
      3. 消费者的消息事件处理程序pause()消费者并将消息推送到队列。

      【讨论】:

      • 这很有趣,但我找不到任何实现的示例。是否有任何示例代码。我不知道如何用这个初始化消费者。
      【解决方案4】:

      据我所知,API 没有任何限制。但是两个消费者(Consumer 和 HighLevelConsumer)都有一个“pause()”功能。因此,如果您收到太多消息,您可以停止消费。也许这已经提供了你需要的东西。

      请记住正在发生的事情。您向代理发送一个获取请求并返回一批消息。您可以配置要获取的消息的最小和最大大小(根据文档而不是消息的数量):

      {
          ....
          // This is the minimum number of bytes of messages that must be available to give a response, default 1 byte 
          fetchMinBytes: 1,
      
          // The maximum bytes to include in the message set for this partition. This helps bound the size of the response. 
           fetchMaxBytes: 1024 * 1024,
       }
      

      【讨论】:

        【解决方案5】:

        我遇到了同样的问题,最初 fetchMaxBytes 的值是

        fetchMaxBytes: 1024 * 1024 * 10 // 10MB
        

        我只是把它改成

        fetchMaxBytes: 1024
        

        改了之后很顺利。

        【讨论】: