【问题标题】:Nodejs Kafka consumer with pm2 cluster带有 pm2 集群的 Nodejs Kafka 消费者
【发布时间】:2018-04-02 05:11:55
【问题描述】:

我已经实现了 Kafka 消费者应用程序,我只是想知道如果我在 pm2 集群模式下运行这个应用程序,所有内核会使用相同的消息还是不同的消息?有什么方法可以验证吗?在集群模式下运行这个应用程序是否理想?我之所以在集群模式下运行它,是因为我们的 kafka 会产生大量消息。

目前,如果我在 pm2 集群模式下运行它,我们所有的内核都达到了 100% 的 CPU 使用率。会不会是这样的?

仅供参考:我正在使用https://www.npmjs.com/package/no-kafka

【问题讨论】:

  • 请检查配置选项并按可用分区数分割您的消费者。如果您的消费者正在阅读超过 1 个特定主题的分区,您将收到重复的消息,

标签: node.js apache-kafka cluster-computing kafka-consumer-api pm2


【解决方案1】:

所有核心会使用相同的消息还是不同的消息?有什么方法可以验证吗?

这取决于您的主题配置 + 消费者配置。举个例子吧。

  • 假设我们有一个包含 3 个分区的主题。
  • 现在我们启动 1 个消费者进程,消费者组为“some_consumer_group”。有关消费者群体的详细信息,请查看此处https://www.npmjs.com/package/no-kafka#groupconsumer-new-unified-consumer-api
  • 现在您的一个消费者正在监听 3 个分区。
  • 由于 kafka 维护每个主题的偏移量,每个消费者组的每个分区,您的消费者将收到来自 3 个不同分区的 3 条消息。因此没有重复消息。
  • 现在让我们再添加一个消费者进程。
  • 现在消费者组“some_consumer_group”的消费者 1 正在监听分区 0 和 1,而消费者组“some_consumer_group”的消费者 2 正在监听分区 2。(也可能反过来)。
  • 最后,如果我们向组中添加一个消费者,现在我们让每个消费者监听 1 个分区
  • 如果是这样设置,您将不会遇到重复的消息。

目前,如果我在 pm2 集群模式下运行它,我们所有的内核都达到了 100% 的 CPU 使用率。会不会是这样的?

我不太熟悉 no-kafka 以及消息的处理方式。

但是检查一下,库是否在获取下一批消息之前等待提交发生。

如果没有,您的进程可能会为消息创建过多的处理程序。

【讨论】:

    【解决方案2】:

    基于 PM2 的集群仅适用于网络服务器,因为集群进程共享传入的网络端口并分发请求。

    在您的情况下,数据源是消息订阅,必须手动将其分发到集群的工作进程。

    所以,为了安全起见,主进程应该与数据源交互,并将消息平均分配给工作进程,这样在外部,它看起来是一个单一的消费者,但仍然可以在所有 CPU 上处理消息核心。

    以下示例演示了这样的设置,不依赖于基于 PM2 的集群:

    const cluster = require('cluster');
    const _ = require('lodash');
    const os = require('os');
    
    // dispatch index
    let dispatchIndex = 0;
    
    /**
     * Dispatches data to workers in a cyclic fashion
     * @param {*} data - data to process
     */
    function dispatch(data) {
    
        // ensure master
        if (!cluster.isMaster) {
            throw new Error('Only master can dispatch');
        }
    
        // get worker ids, sorted
        const workersIds = _.sortBy(_.keys(cluster.workers), _.identity);
    
        // ensure at least one worker is available
        if (workersIds.length < 1) {
            throw new Error('No worker process alive');
        }
    
        // select next worker
        dispatchIndex = dispatchIndex >= workersIds.length ? 0 : dispatchIndex;
        const worker = cluster.workers[workersIds[dispatchIndex]];
        dispatchIndex++;
    
        // send data to worker
        worker.send(data);
    }
    
    
    // Main Script
    if (cluster.isMaster) {
    
        // Setup master process
        console.info(`Master ${process.pid} started.`);
    
        // fork worker processes to match available CPUs
        const numCpu = os.cpus().length;
        for (let i = 0; i < numCpu; i++) {
            cluster.fork();
        }
    
        // *** Get/Subscribe data from external source and dispatch to workers ***
        setInterval(() => dispatch({ a: 'value' }), 1000);
    
    } else if (cluster.isWorker) {
    
        // Setup worker process
        console.info(`Worker ${process.pid} started.`);
    
        // *** handle dispatched data ***
        process.on('message', (data) => {
            console.info(`Data processed by ${process.pid}`);
        });
    }
    

    阅读cluster module documentation也很好。

    【讨论】:

      猜你喜欢
      • 2023-03-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-02-10
      • 1970-01-01
      • 2019-02-25
      • 2019-03-24
      相关资源
      最近更新 更多