【发布时间】:2017-03-14 22:08:26
【问题描述】:
我需要处理 100 万行记录,对其进行转换,并将它们保存到多个文件中(按小时分档;每小时 1 个文件——我正在考虑通过“过滤器”将它们拆分) .
出于某种原因,我需要严格按顺序处理这些行。意思是,如果第 #450000 行需要更长的时间来处理和保存(这是棘手的部分,因为 fs 与回调异步),处理不会跳转到 #450001...它将等到 450000 完成。代码中的随机休眠就是为了模拟那个场景。
以前(使用简单的 Promise,没有 RxJs),我会创建 N 个 promise,每行一个,将它们保存在一个数组中,并通过 reduce op 进行链接,如下所述:https://github.com/kriskowal/q
但我不想创建 100 万个 Promise 实例。所以,我研究了 ReactiveX,希望它会像“推卸责任”;意思是它不会等待,只要有事件弹出就进行处理,并且处理使用的资源(认为处理块基本上是幕后的promise)会尽快释放。
我试图用这段代码来验证:
import Rx from 'rxjs-es6/Rx';
import Q from 'q';
let subject = new Rx.Subject();
let processEventJsons = function(observable) {
observable.concatMap(eventJson => {
let deferred = Q.defer();
setTimeout(() => {
eventJson.procDatetime = new Date().toISOString();
deferred.resolve(eventJson);
}, Math.random() * 5000);
return Rx.Observable.fromPromise(deferred.promise)
})
.subscribe({
next: enrichedEventJson => {
console.log(JSON.stringify(enrichedEventJson));
},
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done'),
});
}
processEventJsons(
subject.filter(dataJson => dataJson.type === "interview").map(dataJson => {
return {event: "intv", datetime: dataJson.datetime}
})
)
processEventJsons(
subject.filter(dataJson => dataJson.type === "checkin").map(dataJson => {
return {event: "chki", datetime: dataJson.datetime}
})
)
for (let i = 0; i < 1000000; i++) {
if (Math.random() < 0.5) {
subject.next({id: i, type: "interview", datetime: new Date().toISOString()});
} else {
subject.next({id: i, type: "checkin", datetime: new Date().toISOString()});
}
}
subject.complete();
但是...我一直得到:
致命错误:CALL_AND_RETRY_LAST 分配失败 - JavaScript 堆内存不足。
console.log(JSON.stringify(enrichedEventJson));在“for-loop”(代码末尾)完成之前不会打印。
这让我觉得切换到 RxJS 并没有真正改善这种情况;它仍然在幕后排队承诺。
还是我错误地使用了 API?你能帮我指出什么问题吗?
更新更新:
假标志。发现问题不在于使用 RxJS,而在于 for 循环(太紧了)。所以我把它改成:
for (let i = 0; i < 1000000; i++) {
if (Math.random() < 0.5) {
setTimeout(() => {
subject.next({id: i, type: "interview", datetime: new Date().toISOString()});
});
} else {
setTimeout(() => {
subject.next({id: i, type: "checkin", datetime: new Date().toISOString()});
});
}
}
【问题讨论】:
标签: javascript node.js promise rxjs reactive-programming