【问题标题】:Rxjs refCount callback to cleanup once every subscribers have unsubscribed?一旦每个订阅者都取消订阅,Rxjs refCount 回调清理?
【发布时间】:2018-10-24 08:56:26
【问题描述】:

我有一个寿命不长的可观察对象(http 请求)。

我正在使用publishReplay(1)refCount(),这样当尝试同时访问它时,它会返回相同的值而不会再次触发 http 调用。

但如果所有订阅都取消订阅,我需要进行一些清理。

我不能使用finalize,因为:

  • 如果我在publishReplay 之前使用它,那么一旦 http 请求完成,它就会关闭
  • 如果我在refCount 之后使用它,它将在一个可观察到的取消订阅后立即运行(而不是在全部取消订阅时)

所以基本上我想要的是将回调传递给refCount 并在订阅数量达到0 时调用该回调。但它不是那样工作的。当所有订阅者都退订时,有什么方法可以“警告”?

我现在能想到的最简单的方法是创建一个自定义运算符,它几乎可以扩展 refCount 以添加回调。

有更好的想法吗?我很确定有更好的方法来做到这一点。

谢谢!

【问题讨论】:

  • 为什么不把它放在publishReplayrefCount之间呢?如果ConnectableObservable 实现liftfinalize 将返回ConnectableObservable - 允许使用refCount。这无法使用 TypeScript 表示,但 lift 就是这样做的。
  • 哦,真的很酷,我以为我这样做有误,但事实证明我没有!感谢@cartant 的解释,如果你有 100 万人重写它作为答案:)

标签: rxjs observable reactive-programming


【解决方案1】:

我已经尝试过了,它似乎可以正常工作。

它的使用方法如下:

import { of, timer } from 'rxjs'; 
import { map, publishReplay, take } from 'rxjs/operators';
import { refCountCb } from './refCountCb';

const source = timer(2000, 10000).pipe(
  map(x => `Hello ${x}!`),
  publishReplay(1),
  refCountCb(() => console.log('MAIN CLOSED'))
);

source.pipe(take(1)).subscribe(x => console.log(x));
source.pipe(take(1)).subscribe(x => console.log(x));

输出:

Hello 0!
Hello 0!
MAIN CLOSED

我基于refCountsource 构建了自定义refCountCb 运算符。它基本上只是添加一个回调,所以我不会在这里复制粘贴整个代码,但它可以在 stackblitz 上找到。

完整演示:https://stackblitz.com/edit/rxjs-h7dbfc?file=index.ts

如果您有任何其他想法,请分享,我很高兴发现不同的解决方案!

【讨论】:

    【解决方案2】:

    我不会撒谎,我很高兴发现我不是唯一一个在寻找类似东西的人。还有一个人。

    我最终做了这样的事情:

    import { Observable } from 'rxjs';
    
    export function tapTeardown(teardownLogic: () => void) {
      return <T>(source: Observable<T>): Observable<T> =>
        new Observable<T>((observer) => {
          const subscription = source.subscribe(observer);
    
          return () => {
            subscription.unsubscribe();
            teardownLogic();
          };
        });
    }
    

    你会像这样使用它:

        const augmented = connection.pipe(
          tapTeardown(() => /* SOME TEARDOWN LOGIC */),
          shareReplay({ bufferSize: 1, refCount: true }),
        );
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-11-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-03-26
      相关资源
      最近更新 更多