【问题标题】:rxjs - stop observable with a subject subscriptionrxjs - 通过主题订阅停止可观察
【发布时间】:2019-05-11 22:24:43
【问题描述】:

我创建了一个 Observable,它可能生成无限量的数据(例如一个计时器)。这些数据是通过一个主题访问的,因此多个观察者会收到相同的值。

如何阻止 Observable 生成新值? (不修改 Observable 的实现)

// custom Observable, to visualize internal behavior
const timer$ = new rxjs.Observable(subscriber => {
  console.log("observable init");

  var counter = 0;
  const intervalId = setInterval(() => {
    ++counter;
    console.log("observable %s",counter);
    subscriber.next(counter);
  }, 1000);

  return () => {
    console.log("observable teardown");
    clearTimeout(intervalId);
  }
});

// subscribe through a subject
const subject$ = new rxjs.Subject();
timer$.subscribe(subject$);

const subscription = subject$.subscribe(value => console.log("observer %s", value));

// cancel subscription
setTimeout(() => {
  console.log("unsubscribe observer");
  subscription.unsubscribe();
  // TODO how to stop Observable generating new values?
}, 3000);

jsfiddle:https://jsfiddle.net/gy4tfd5w/

【问题讨论】:

    标签: rxjs observable subject unsubscribe


    【解决方案1】:

    幸运的是,在 RxJS 中有一种专用且优雅的方法来解决这个问题。

    你的要求是有

    多个观察者 [...] 接收相同的值

    这被称为 multicast observable,并且有某些 operators 用于从普通的“冷” observable 创建一个。

    例如,不用直接创建Subject 的实例,您可以将可观察的源通过管道传递给share 运算符,它将为您创建Subjectshare 的文档内容如下:

    返回一个多播(共享)原始 Observable 的新 Observable。只要至少有一个订阅者,这个 Observable 就会被订阅并发送数据。当所有订阅者都取消订阅后,它将取消订阅源 Observable。

    最后一句话展示了sharesource$.subscribe(subject) 之间的细微差别。使用share 会保留一个所谓的refCount,当没有订阅者时,它会自动从其源中取消订阅Subject

    应用到你的代码,它看起来像这样:

    const timer$ = 
        new rxjs.Observable(subscriber => {// your unchanged implementation})
        .pipe(
            share()
        );
    
    const subscription = timer$.subscribe(value => console.log("observer %s", value));
    

    这是您的示例代码的完整版本:

    https://jsfiddle.net/50q746ad/

    顺便说一句,share 并不是唯一执行多播的运营商。 There are great learning resources 更深入地探讨了该主题。

    【讨论】:

      【解决方案2】:

      所以经过一些惩罚性研究后,我为这个问题添加了自己的 npm 库。

      Improves previous answer by NOT having to add any extra convolution variables and ease of use.

      【讨论】:

        【解决方案3】:

        这是我阻止疯狂可观察中间流的解决方案。

        通过使用信号 Subject 和 rxjs 运算符:takeUntil

        示例

        const stopSignal$ = new Subject();
        
        infinitelyGenerating$
          .pipe(takeUntil(stopSignal$))
          .subscribe(val => {
            if (val === 'bad') {
              stopSignal$.next()
            }
          })
        

        【讨论】:

          猜你喜欢
          • 2018-06-23
          • 2017-04-13
          • 1970-01-01
          • 2018-04-29
          • 2018-07-28
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2022-10-17
          相关资源
          最近更新 更多