【发布时间】:2017-11-05 02:47:30
【问题描述】:
我对 rxjs 还是很陌生。我在下面调用一个函数,读取完整的流并打印读取的控制台语句,但我从未看到“订阅完成”,我不知道为什么。完成这个流需要什么?有什么明显的问题吗?
const readline$ = RxNode.fromReadLineStream(rl)
.filter((element, index, observable) => {
if (index >= range.start && index < range.stop) {
console.log(`kept line is ${JSON.stringify(element)}`);
return true;
} else {
console.log(`not keeping line ${JSON.stringify(element)}`);
return false;
}
})
.concatMap(line => Rx.Observable.fromPromise(myFunction(line)))
.do(response => console.log(JSON.stringify(response)));
readline$.subscribe(i => { console.log(`Subscribe object: ${util.inspect(i)}`); },
err => { console.error(`Subscribe error: ${util.inspect(err)}`); },
done => { console.log("Subscribe done."); // NEVER CALLED
anotherFunc(); // NEVER CALLED
}
);
【问题讨论】:
-
我不确定这个特定的 observable,但不是每个 observable 都会完成。
-
你传递给
RxNode.fromReadLineStream的节点流是什么?流本身会结束吗? -
是的,它是一个有限长度文件的 RxNode.fromReadLineStream。
-
最终使用 readline.on 关闭事件来实现几乎相同的效果。
标签: javascript filter rxjs