【问题标题】:Sequencing in RxJS results in "out of memory" errorRxJS 中的排序导致“内存不足”错误
【发布时间】:2017-03-14 22:08:26
【问题描述】:

我需要处理 100 万行记录,对其进行转换,并将它们保存到多个文件中(按小时分档;每小时 1 个文件——我正在考虑通过“过滤器”将它们拆分) .

出于某种原因,我需要严格按顺序处理这些行。意思是,如果第 #450000 行需要更长的时间来处理和保存(这是棘手的部分,因为 fs 与回调异步),处理不会跳转到 #450001...它将等到 450000 完成。代码中的随机休眠就是为了模拟那个场景。

以前(使用简单的 Promise,没有 RxJs),我会创建 N 个 promise,每行一个,将它们保存在一个数组中,并通过 reduce op 进行链接,如下所述:https://github.com/kriskowal/q

但我不想创建 100 万个 Promise 实例。所以,我研究了 ReactiveX,希望它会像“推卸责任”;意思是它不会等待,只要有事件弹出就进行处理,并且处理使用的资源(认为处理块基本上是幕后的promise)会尽快释放。

我试图用这段代码来验证:

import Rx from 'rxjs-es6/Rx';
import Q from 'q';    

let subject = new Rx.Subject();
let processEventJsons = function(observable) {
  observable.concatMap(eventJson => {
    let deferred = Q.defer();    

    setTimeout(() => {
      eventJson.procDatetime = new Date().toISOString();
      deferred.resolve(eventJson);
    }, Math.random() * 5000);    

    return Rx.Observable.fromPromise(deferred.promise)
  })
  .subscribe({
    next: enrichedEventJson => {
      console.log(JSON.stringify(enrichedEventJson));
    },
    error: err => console.error('something wrong occurred: ' + err),
    complete: () => console.log('done'),
  });
}    

processEventJsons(
  subject.filter(dataJson => dataJson.type === "interview").map(dataJson => {
    return {event: "intv", datetime: dataJson.datetime}
  })
)    

processEventJsons(
  subject.filter(dataJson => dataJson.type === "checkin").map(dataJson => {
    return {event: "chki", datetime: dataJson.datetime}
  })
)    

for (let i = 0; i < 1000000; i++) {
  if (Math.random() < 0.5) {
    subject.next({id: i, type: "interview", datetime: new Date().toISOString()});
  } else {
    subject.next({id: i, type: "checkin", datetime: new Date().toISOString()});
  }
}
subject.complete();

但是...我一直得到:

致命错误:CALL_AND_RETRY_LAST 分配失败 - JavaScript 堆内存不足。

console.log(JSON.stringify(enrichedEventJson));在“for-loop”(代码末尾)完成之前不会打印。

这让我觉得切换到 RxJS 并没有真正改善这种情况;它仍然在幕后排队承诺。

还是我错误地使用了 API?你能帮我指出什么问题吗?

更新更新:

假标志。发现问题不在于使用 RxJS,而在于 for 循环(太紧了)。所以我把它改成:

for (let i = 0; i < 1000000; i++) {
  if (Math.random() < 0.5) {
    setTimeout(() => {
      subject.next({id: i, type: "interview", datetime: new Date().toISOString()});
    });
  } else {
    setTimeout(() => {
      subject.next({id: i, type: "checkin", datetime: new Date().toISOString()});
    });
  }
}

【问题讨论】:

    标签: javascript node.js promise rxjs reactive-programming


    【解决方案1】:

    我会创建 N 个 promise,每行一个,将它们保存在一个数组中,并通过 reduce op 进行链接

    这是一种简单但需要大量内存的方法。它使用了一百万个需要同时存在的 Promise。相反,您可以使用递归方法在常量内存中按顺序处理行:

    function getInput(i) {
      return {id: i, type: Math.random() < 0.5 ? "interview" : "checkin", datetime: new Date().toISOString()};
    }
    function process(eventJson) {
      let deferred = Q.defer();    
      setTimeout(() => {
        eventJson.procDatetime = new Date().toISOString();
        deferred.resolve(eventJson);
      }, Math.random() * 5000);    
      return deferred.promise;
    }
    function filteredProcess({type, datetime}) {
      if (type === "interview")
        return process({event: "intv", datetime});
      if (type === "checkin")
        return process({event: "chki", datetime});
    }
    function log(enrichedEventJson) {
      console.log(JSON.stringify(enrichedEventJson));
    }
    
    function loop(i) {
      if (i < 1000000)
        return getInput(i)
        .then(filteredProcess)
        .then(log)
        .then(() => loop(i+1));
      else
        return Q("done")
    }
    
    loop().then(console.log, err => console.error('something wrong occurred: ' + err));
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2011-05-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-03-26
      • 1970-01-01
      • 2014-01-25
      • 2016-09-27
      相关资源
      最近更新 更多