【问题标题】:Pass large array to node child process将大数组传递给节点子进程
【发布时间】:2017-10-18 13:44:24
【问题描述】:

我想要在大型阵列上执行复杂的 CPU 密集型工作。理想情况下,我想将此传递给子进程。

var spawn = require('child_process').spawn;

// dataAsNumbers is a large 2D array
var child = spawn(process.execPath, ['/child_process_scripts/getStatistics', dataAsNumbers]);

child.stdout.on('data', function(data){
  console.log('from child: ', data.toString());
});

但是当我这样做时,节点会给出错误:

生成 E2BIG

我遇到了this article

因此,将数据管道传输到子进程似乎是要走的路。我的代码现在是:

var spawn = require('child_process').spawn;

console.log('creating child........................');

var options = { stdio: [null, null, null, 'pipe'] };
var args = [ '/getStatistics' ];
var child = spawn(process.execPath, args, options);

var pipe = child.stdio[3];

pipe.write(Buffer('awesome'));

child.stdout.on('data', function(data){
  console.log('from child: ', data.toString());
});

然后在 getStatistics.js 中:

console.log('im inside child');

process.stdin.on('data', function(data) {
  console.log('data is ', data);
  process.exit(0);
});

但是没有达到process.stdin.on 中的回调。如何在我的子脚本中接收流?

编辑

我不得不放弃缓冲方法。现在我将数组作为消息发送:

var cp = require('child_process');
var child = cp.fork('/getStatistics.js');

child.send({ 
  dataAsNumbers: dataAsNumbers
});

但这只有在 dataAsNumbers 的长度低于 20,000 左右时才有效,否则会超时。

【问题讨论】:

  • node 不是此类工作的正确工具。我宁愿推荐你使用多线程语言。
  • 项目已完成 90%,我现在不会从节点更改。有很多文章解释了 node 的高 CPU 使用率
  • 通常最好先着手解决核心问题的项目。在多线程语言中,您不需要复制数据,因为线程共享内存。在这种情况下复制数据会减慢一切。除此之外,当您将工作委托给 libuv 时,节点速度很快。如果您打算使用节点的 v8 部分进行繁重的处理,那么它不会很快。另外,如果出于任何原因这是实际服务器的一部分,您的事件循环将阻塞并且 I/O 将导致您的所有请求超时。
  • 我很感激,但有一些方法可以解决这个问题,例如neilk.net/blog/2013/04/30/…
  • 你为什么不把它分块发送@Mark?

标签: javascript node.js mongodb


【解决方案1】:

对于长流程任务,您可以使用 gearman 之类的东西您可以对工人执行繁重的工作流程,这样您可以设置需要多少工人,例如,我以这种方式进行一些文件处理,如果我需要扩展你创建更多的worker实例,我也有不同的workers来完成不同的任务,处理zip文件,生成缩略图等,这样做的好处是workers可以用任何语言node.js,Java,python编写并且可以集成轻松完成您的项目

// worker-unzip.js
const debug = require('debug')('worker:unzip');
const {series, apply} = require('async');
const gearman = require('gearmanode');
const {mkdirpSync} = require('fs-extra');
const extract = require('extract-zip');

module.exports.unzip = unzip;
module.exports.worker = worker;

function unzip(inputPath, outputDirPath, done) {
  debug('unzipping', inputPath, 'to', outputDirPath);
  mkdirpSync(outputDirPath);
  extract(inputPath, {dir: outputDirPath}, done);
}


/**
 *
 * @param {Job} job
 */
function workerUnzip(job) {
  const {inputPath, outputDirPath} = JSON.parse(job.payload);
  series([
    apply(unzip, inputPath, outputDirPath),
    (done) => job.workComplete(outputDirPath)
  ], (err) => {
    if (err) {
      console.error(err);
      job.reportError();
    }
  });
}

function worker(config) {
  const worker = gearman.worker(config);
  if (config.id) {
    worker.setWorkerId(config.id);
  }

  worker.addFunction('unzip', workerUnzip, {timeout: 10, toStringEncoding: 'ascii'});
  worker.on('error', (err) => console.error(err));

  return worker;
}

一个简单的 index.js

const unzip = require('./worker-unzip').worker;

unzip(config); // pass host and port of the Gearman server

我通常用 PM2 运行工人

与您的代码集成非常容易。像

//initialize
const gearman = require('gearmanode');

gearman.Client.logger.transports.console.level = 'error';
const client = gearman.client(configGearman); // same host and port

只是将工作添加到传递函数名称的队列中

const taskpayload = {inputPath: '/tmp/sample-file.zip', outputDirPath: '/tmp/unzip/sample-file/'}
const job client.submitJob('unzip', JSON.stringify(taskpayload));
job.on('complete', jobCompleteCallback);
job.on('error', jobErrorCallback);

【讨论】:

    【解决方案2】:

    我也能够重现您遇到的延迟,但可能没有您那么糟糕。我用了以下

    // main.js
    const fork = require('child_process').fork
    
    const child = fork('./getStats.js')
    
    const dataAsNumbers = Array(100000).fill(0).map(() =>
      Array(100).fill(0).map(() => Math.round(Math.random() * 100)))
    
    child.send({
      dataAsNumbers: dataAsNumbers,
    })
    

    还有

    // getStats.js
    process.on('message', function (data) {
      console.log('data is ', data)
      process.exit(0)
    })
    

    node main.js 2.72s user 0.45s system 103% cpu 3.045 total

    我正在生成由 100 个数字组成的 100k 个元素来模拟您的数据,请确保您在 process 上使用 message 事件。但也许你的孩子更复杂,可能是失败的原因,也取决于你在查询中设置的超时时间。


    如果你想得到更好的结果,你可以做的就是将你的数据分成多个片段,这些片段将被发送到子进程并重新构建以形成初始数组。


    还有一种可能是使用第三方库或协议,即使工作量更大。您可以查看messenger.js 甚至类似 AMQP 队列的东西,它可以让您在两个进程之间通过一个池进行通信,并保证子进程已确认消息。它有一些节点实现,例如amqp.node,但仍需要一些设置和配置工作。

    【讨论】:

    【解决方案3】:

    有了如此大量的数据,我会考虑使用shared memory,而不是将数据复制到子进程中(当您使用管道或传递消息时会发生这种情况)。这将节省内存,减少父进程的 CPU 时间,并且不太可能遇到一些限制。

    shm-typed-array 是一个非常简单的模块,似乎适合您的应用程序。示例:

    parent.js

    "use strict";
    
    const shm = require('shm-typed-array');
    const fork = require('child_process').fork;
    
    // Create shared memory
    const SIZE = 20000000;
    const data = shm.create(SIZE, 'Float64Array');
    
    // Fill with dummy data
    Array.prototype.fill.call(data, 1);
    
    // Spawn child, set up communication, and give shared memory
    const child = fork("child.js");
    child.on('message', sum => {
        console.log(`Got answer: ${sum}`);
    
        // Demo only; ideally you'd re-use the same child
        child.kill();
    });
    child.send(data.key);
    

    child.js

    "use strict";
    
    const shm = require('shm-typed-array');
    
    process.on('message', key => {
        // Get access to shared memory
        const data = shm.get(key, 'Float64Array');
    
        // Perform processing
        const sum = Array.prototype.reduce.call(data, (a, b) => a + b, 0);
    
        // Return processed data
        process.send(sum);
    });
    

    请注意,我们只是通过 IPC 从父进程向子进程发送一个小“密钥”,而不是整个数据。因此,我们节省了大量的内存和时间。

    当然,您可以将'Float64Array'(例如double)更改为您的应用程序需要的任何typed array。请注意,这个库特别只处理一维类型数组;但这应该只是一个小障碍。

    【讨论】:

      【解决方案4】:

      使用像https://github.com/ptarjan/node-cache这样的内存缓存,并让父进程使用某个键存储数组内容,子进程将通过该键检索内容。

      【讨论】:

        【解决方案5】:

        您可以考虑使用操作系统管道you'll find a gist here 作为节点子应用程序的输入。

        我知道这不是您所要求的,但您可以使用cluster 模块(包含在节点中)。通过这种方式,您可以获得与机器内核一样多的实例,以加快处理速度。此外,如果您不需要在开始处理之前获得所有数据,请考虑使用流。如果要处理的数据太大,我会将其存储在一个文件中,以便在处理过程中出现任何错误时您可以重新处理。 这是一个聚类的例子。

        var cluster = require('cluster');
        var numCPUs = 4;
        
        if (cluster.isMaster) {
            for (var i = 0; i < numCPUs; i++) {
                var worker = cluster.fork();
                console.log('id', worker.id)
            }
        } else {
            doSomeWork()
        }
        
        function doSomeWork(){
            for (var i=1; i<10; i++){
                console.log(i)
            }
        }
        

        更多信息在工作人员之间发送消息question 8534462

        【讨论】:

          【解决方案6】:

          为什么要创建子流程?跨子进程发送数据在 CPU 和实时方面的成本可能比在同一进程中进行处理所节省的成本更高。

          相反,我建议您考虑在与 nodejs 主进程在同一内存中运行的工作线程中进行超高效编码。

          您可以使用 NAN 编写可以发布到工作线程的 C++ 代码,然后让该工作线程在完成后将结果和事件发布回您的 nodejs 事件循环。

          这样做的好处是您不需要额外的时间将数据发送到不同的进程,但缺点是您将为线程操作编写一些 C++ 代码,但 NAN 扩展应该占用为您处理大部分艰巨的任务。

          【讨论】:

          • 我猜他想用多核
          • 如果将其推送到工作线程,您将使用多个内核
          猜你喜欢
          • 1970-01-01
          • 2020-09-30
          • 2021-02-09
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2021-11-15
          • 2017-08-13
          • 2018-01-15
          相关资源
          最近更新 更多