【问题标题】:Best way to execute parallel processing in Node.js在 Node.js 中执行并行处理的最佳方式
【发布时间】:2013-11-15 15:46:33
【问题描述】:

我正在尝试编写一个小型节点应用程序,它将搜索并解析文件系统上的大量文件。 为了加快搜索速度,我们尝试使用某种 map reduce。该计划将是以下简化方案:

  • Web 请求带有搜索查询
  • 启动了 3 个进程,每个进程分配了 1000 个(不同的)文件
  • 一旦进程完成,它会将其结果“返回”回主线程
  • 一旦所有进程完成,主线程将继续返回组合结果作为 JSON 结果

我的问题是: 这在 Node 中可行吗? 推荐的做法是什么?

我一直在摆弄,但不要再使用Process 的示例:

发起者:

function Worker() { return child_process.fork("myProcess.js); }
for(var i = 0; i < require('os').cpus().length; i++){
        var process = new Worker();
        process.send(workItems.slice(i * itemsPerProcess, (i+1) * itemsPerProcess));
}

myProcess.js

process.on('message', function(msg) {
    var valuesToReturn = [];
    // Do file reading here
    //How would I return valuesToReturn?
    process.exit(0);
}

一些旁注:

  • 我知道进程数应该取决于服务器上 CPU 的数量
  • 我还知道文件系统中的速度限制。在我们将其移至数据库或 Lucene 实例之前,请将其视为概念证明 :-)

【问题讨论】:

  • 假设优化概念验证是正确使用时间,那么我不会这样做。如果有必要,我会建立一个索引并观察文件系统的变化。这样你会得到很多更好的性能。如果这只是学习工作进程的练习,那就继续吧。

标签: node.js


【解决方案1】:

应该是可行的。举个简单的例子:

// parent.js
var child_process = require('child_process');

var numchild  = require('os').cpus().length;
var done      = 0;

for (var i = 0; i < numchild; i++){
  var child = child_process.fork('./child');
  child.send((i + 1) * 1000);
  child.on('message', function(message) {
    console.log('[parent] received message from child:', message);
    done++;
    if (done === numchild) {
      console.log('[parent] received all results');
      ...
    }
  });
}

// child.js
process.on('message', function(message) {
  console.log('[child] received message from server:', message);
  setTimeout(function() {
    process.send({
      child   : process.pid,
      result  : message + 1
    });
    process.disconnect();
  }, (0.5 + Math.random()) * 5000);
});

因此,父进程生成 X 个子进程并向它们传递消息。它还安装了一个事件处理程序来侦听从孩子发回的任何消息(例如结果)。

子进程等待来自父进程的消息,然后开始处理(在这种情况下,它只是启动一个带有随机超时的计时器来模拟正在完成的一些工作)。完成后,它将结果发送回父进程并使用process.disconnect() 断开与父进程的连接(基本上停止子进程)。

父进程跟踪启动的子进程的数量,以及返回结果的子进程的数量。当这些数字相等时,父进程会收到来自子进程的所有结果,因此它可以合并所有结果并返回 JSON 结果。

【讨论】:

  • 当子进程执行process.send时,它自己的事件处理程序是否有机会捕获on('message')以及父进程的事件处理程序?
  • @BartVangeneugden no,process.send() 用于孩子与父母的交流,child.send() 用于相反的方式(从父母到孩子)。另请参阅the official documentation
【解决方案2】:

对于像这样的分布式问题,我使用了 zmq,它运行得非常好。我会给你一个我遇到的类似问题,并尝试通过进程解决(但失败了。)然后转向 zmq。

使用 bcrypt 或昂贵的散列算法是明智的,但它会阻塞节点进程大约 0.5 秒。我们不得不把它卸载到不同的服务器上,作为一个快速修复,我基本上使用了你所做的。运行一个子进程并向它发送消息并将其发送到 回应。我们发现的唯一问题是,无论出于何种原因,我们的子进程都会在它完全没有工作时固定整个核心。(我仍然没有弄清楚为什么会发生这种情况,我们运行了一个跟踪,似乎 epoll 在 stdout 上失败了/stdin 流。它也只会发生在我们的 Linux 机器上,并且在 OSX 上可以正常工作。)

编辑:

内核的固定在https://github.com/joyent/libuv/commit/12210fe 中已修复并与https://github.com/joyent/node/issues/5504 有关,所以如果您遇到问题并且您使用的是centos + kernel v2.6.32:更新节点,或更新您的内核!

不管 child_process.fork() 有什么问题,这是我一直使用的一个漂亮的模式

客户:

var child_process = require('child_process');

function FileParser() {

    this.__callbackById = [];
    this.__callbackIdIncrement = 0;
    this.__process = child_process.fork('./child');
    this.__process.on('message', this.handleMessage.bind(this));

}

FileParser.prototype.handleMessage = function handleMessage(message) {

    var error = message.error;
    var result = message.result;
    var callbackId = message.callbackId;
    var callback = this.__callbackById[callbackId];

    if (! callback) {
        return;
    }
    callback(error, result);
    delete this.__callbackById[callbackId];

};

FileParser.prototype.parse = function parse(data, callback) {

    this.__callbackIdIncrement = (this.__callbackIdIncrement + 1) % 10000000;
    this.__callbackById[this.__callbackIdIncrement] = callback;
    this.__process.send({
        data: data, // optionally you could pass in the path of the file, and open it in the child process.
        callbackId: this.__callbackIdIncrement
    });

};

module.exports = FileParser;

子进程:

process.on('message', function(message) {

    var callbackId = message.callbackId;
    var data = message.data;
    function respond(error, response) {
        process.send({
            callbackId: callbackId,
            error: error,
            result: response
        });
    }
    // parse data..
    respond(undefined, "computed data");
});

我们还需要一个模式来同步不同的进程,当每个进程完成它的任务时,它会响应我们,我们会为每个完成的进程增加一个计数,然后当我们调用 Semaphore 的回调已经达到了我们想要的数量。

function Semaphore(wait, callback) {

    this.callback = callback;
    this.wait = wait;
    this.counted = 0;

}

Semaphore.prototype.signal = function signal() {
    this.counted++;
    if (this.counted >= this.wait) {
        this.callback();
    }
}

module.exports = Semaphore;

这是一个将上述所有模式联系在一起的用例:

var FileParser = require('./FileParser');
var Semaphore  = require('./Semaphore');

var arrFileParsers = [];
for(var i = 0; i < require('os').cpus().length; i++){
    var fileParser = new FileParser();
    arrFileParsers.push(fileParser);
}

function getFiles() {
    return ["file", "file"];
}

var arrResults = [];
function onAllFilesParsed() {
    console.log('all results completed', JSON.stringify(arrResults));
}

var lock = new Semaphore(arrFileParsers.length, onAllFilesParsed);
arrFileParsers.forEach(function(fileParser) {
    var arrFiles  = getFiles(); // you need to decide how to split the files into 1k chunks
    fileParser.parse(arrFiles, function (error, result) {
        arrResults.push(result);
        lock.signal();
    });
});

最终我使用了http://zguide.zeromq.org/page:all#The-Load-Balancing-Pattern,其中客户端使用的是 nodejs zmq 客户端,worker/broker 是用 C 编写的。这使我们能够跨多台机器扩展它,而不仅仅是具有子进程的本地机器。

【讨论】:

    猜你喜欢
    • 2021-12-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-12-15
    • 2010-09-19
    • 2016-02-29
    • 1970-01-01
    • 2014-10-01
    相关资源
    最近更新 更多