【发布时间】:2019-08-09 16:41:40
【问题描述】:
我正在使用一个不支持 rxjs 的库,只是很好的回调。我正在尝试将其包装在一个可观察对象中,但当有人取消订阅时,我需要内部流自行处理。
subscribeToQuotes() {
return new Observable(observer => {
const stream = someLibrary.getStream();
stream.onNewData(data => {
observer.next(data);
});
stream.onComplete(() => observer.complete());
const request = stream.begin();
//I need to call request.abort() when subscriber unsubscribes
})
}
我想这样使用它:
const onInstrumentChanged = new Subject();
onInstrumentChanged.subscribe(instrument => {
this.subscribeToQuotes(instrument)
.pipe(takeUntil(onInstrumentChanged))
.subscribe(quotes => {
...
})
})
subscribeToQuotes() 可以被多次调用,takeUntil() 放弃之前的订阅,但是内部流继续触发。我需要在内部致电request.abort()。
当主题 onInstrumentChanged 触发时,我如何调用 request.abort()?
可以不将主题传递给函数吗?
【问题讨论】:
标签: javascript rxjs