对于像这样的分布式问题,我使用了 zmq,它运行得非常好。我会给你一个我遇到的类似问题,并尝试通过进程解决(但失败了。)然后转向 zmq。
使用 bcrypt 或昂贵的散列算法是明智的,但它会阻塞节点进程大约 0.5 秒。我们不得不把它卸载到不同的服务器上,作为一个快速修复,我基本上使用了你所做的。运行一个子进程并向它发送消息并将其发送到
回应。我们发现的唯一问题是,无论出于何种原因,我们的子进程都会在它完全没有工作时固定整个核心。(我仍然没有弄清楚为什么会发生这种情况,我们运行了一个跟踪,似乎 epoll 在 stdout 上失败了/stdin 流。它也只会发生在我们的 Linux 机器上,并且在 OSX 上可以正常工作。)
编辑:
内核的固定在https://github.com/joyent/libuv/commit/12210fe 中已修复并与https://github.com/joyent/node/issues/5504 有关,所以如果您遇到问题并且您使用的是centos + kernel v2.6.32:更新节点,或更新您的内核!
不管 child_process.fork() 有什么问题,这是我一直使用的一个漂亮的模式
客户:
var child_process = require('child_process');
function FileParser() {
this.__callbackById = [];
this.__callbackIdIncrement = 0;
this.__process = child_process.fork('./child');
this.__process.on('message', this.handleMessage.bind(this));
}
FileParser.prototype.handleMessage = function handleMessage(message) {
var error = message.error;
var result = message.result;
var callbackId = message.callbackId;
var callback = this.__callbackById[callbackId];
if (! callback) {
return;
}
callback(error, result);
delete this.__callbackById[callbackId];
};
FileParser.prototype.parse = function parse(data, callback) {
this.__callbackIdIncrement = (this.__callbackIdIncrement + 1) % 10000000;
this.__callbackById[this.__callbackIdIncrement] = callback;
this.__process.send({
data: data, // optionally you could pass in the path of the file, and open it in the child process.
callbackId: this.__callbackIdIncrement
});
};
module.exports = FileParser;
子进程:
process.on('message', function(message) {
var callbackId = message.callbackId;
var data = message.data;
function respond(error, response) {
process.send({
callbackId: callbackId,
error: error,
result: response
});
}
// parse data..
respond(undefined, "computed data");
});
我们还需要一个模式来同步不同的进程,当每个进程完成它的任务时,它会响应我们,我们会为每个完成的进程增加一个计数,然后当我们调用 Semaphore 的回调已经达到了我们想要的数量。
function Semaphore(wait, callback) {
this.callback = callback;
this.wait = wait;
this.counted = 0;
}
Semaphore.prototype.signal = function signal() {
this.counted++;
if (this.counted >= this.wait) {
this.callback();
}
}
module.exports = Semaphore;
这是一个将上述所有模式联系在一起的用例:
var FileParser = require('./FileParser');
var Semaphore = require('./Semaphore');
var arrFileParsers = [];
for(var i = 0; i < require('os').cpus().length; i++){
var fileParser = new FileParser();
arrFileParsers.push(fileParser);
}
function getFiles() {
return ["file", "file"];
}
var arrResults = [];
function onAllFilesParsed() {
console.log('all results completed', JSON.stringify(arrResults));
}
var lock = new Semaphore(arrFileParsers.length, onAllFilesParsed);
arrFileParsers.forEach(function(fileParser) {
var arrFiles = getFiles(); // you need to decide how to split the files into 1k chunks
fileParser.parse(arrFiles, function (error, result) {
arrResults.push(result);
lock.signal();
});
});
最终我使用了http://zguide.zeromq.org/page:all#The-Load-Balancing-Pattern,其中客户端使用的是 nodejs zmq 客户端,worker/broker 是用 C 编写的。这使我们能够跨多台机器扩展它,而不仅仅是具有子进程的本地机器。