【问题标题】:RxJS subscribe never finishesRxJS 订阅永远不会结束
【发布时间】:2017-11-05 02:47:30
【问题描述】:

我对 rxjs 还是很陌生。我在下面调用一个函数,读取完整的流并打印读取的控制台语句,但我从未看到“订阅完成”,我不知道为什么。完成这个流需要什么?有什么明显的问题吗?

const readline$ = RxNode.fromReadLineStream(rl)
    .filter((element, index, observable) => {
        if (index >= range.start && index < range.stop) {
            console.log(`kept line is ${JSON.stringify(element)}`);
            return true;
        } else {
            console.log(`not keeping line ${JSON.stringify(element)}`);
            return false;
        }
    })
    .concatMap(line => Rx.Observable.fromPromise(myFunction(line)))
    .do(response => console.log(JSON.stringify(response)));

readline$.subscribe(i => { console.log(`Subscribe object: ${util.inspect(i)}`); },
                  err => { console.error(`Subscribe error: ${util.inspect(err)}`); },
                 done => { console.log("Subscribe done."); // NEVER CALLED
                           anotherFunc();                  // NEVER CALLED
                     }
);

【问题讨论】:

  • 我不确定这个特定的 observable,但不是每个 observable 都会完成。
  • 你传递给RxNode.fromReadLineStream的节点流是什么?流本身会结束吗?
  • 是的,它是一个有限长度文件的 RxNode.fromReadLineStream。
  • 最终使用 readline.on 关闭事件来实现几乎相同的效果。

标签: javascript filter rxjs


【解决方案1】:

从源代码中可以看出,只有在源流发出close 事件时才会发送完成通知。 https://github.com/Reactive-Extensions/rx-node/blob/master/index.js#L100-L102

因此,如果您需要调用正确的完整处理程序,则需要自己关闭流,请参阅How to close a readable stream (before end)?
换句话说,Observable 在读取整个文件后不会自动完成。

【讨论】:

    猜你喜欢
    • 2021-12-22
    • 2018-03-19
    • 1970-01-01
    • 2021-05-17
    • 2013-04-09
    • 2010-12-27
    • 1970-01-01
    • 2021-08-05
    • 2011-01-20
    相关资源
    最近更新 更多