【问题标题】:Rxjs - clean up observable after subscriber unsubscribesRxjs - 在订阅者取消订阅后清理 observable
【发布时间】:2019-08-09 16:41:40
【问题描述】:

我正在使用一个不支持 rxjs 的库,只是很好的回调。我正在尝试将其包装在一个可观察对象中,但当有人取消订阅时,我需要内部流自行处理。

subscribeToQuotes() {
    return new Observable(observer => {

        const stream = someLibrary.getStream();

        stream.onNewData(data => {
            observer.next(data);
        });
        stream.onComplete(() => observer.complete());

        const request = stream.begin(); 

        //I need to call request.abort() when subscriber unsubscribes
    })
}

我想这样使用它:

    const onInstrumentChanged = new Subject();

    onInstrumentChanged.subscribe(instrument => {

        this.subscribeToQuotes(instrument)
            .pipe(takeUntil(onInstrumentChanged))
            .subscribe(quotes => {
                ...
            })


    })

subscribeToQuotes() 可以被多次调用,takeUntil() 放弃之前的订阅,但是内部流继续触发。我需要在内部致电request.abort()

当主题 onInstrumentChanged 触发时,我如何调用 request.abort()

可以不将主题传递给函数吗?

【问题讨论】:

    标签: javascript rxjs


    【解决方案1】:

    好的,找到答案了。 observer.add() 允许您在订阅者退订时添加拆卸功能。所以在我的情况下这是有效的:

        return new Observable(observer => {
    
            const stream = someLibrary.getStream();        
            const request = stream.begin(); 
            observer.add(() => request.abort());
        })
    
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-07-07
      • 1970-01-01
      • 2016-12-20
      • 2017-12-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-05-25
      相关资源
      最近更新 更多