【问题标题】:Does an observable keep emitting values after observer has unsubscribed?观察者退订后,可观察对象是否继续发出值?
【发布时间】:2020-04-08 05:47:42
【问题描述】:

让我们考虑以下示例代码:

gude() {
  const digestor$ = new Observable(subscriber => {
    for (let i = 0; i < 4711; i++) {
      setTimeout(() => {
        const hash = createHashWithNLeadingZeroes(i);
        subscriber.next(hash);
      }, i);
    }
  });

  const subscription = digestor$.subscribe(
    _ => {
      if (subscription) {
        subscription.unsubscribe();
      }
    }
  );
}

在函数 gude() 中创建了一个新的 observable,它发出散列值,其中前 n 个前导值设置为零。观察者订阅该可观察对象并立即取消订阅。让我们假设函数 createHashWithNLeadingZeroes() 需要相当长的时间来生成响应。

恕我直言,这里正在发生以下情况:

(1) 创建一个新的 Observable,描述 Observable 行为的函数内部存储在属性 _subscribe (https://github.com/ReactiveX/rxjs/blob/master/src/internal/Observable.ts: 37-41)。

(2) 当调用subscribe() 时,首先将Observer 包裹在一个Subscriber 对象中,然后应用Subscriber到保存 Observable 逻辑的 _subscribe 函数。 _subscribe() 快速返回,因为只设置了 4711 个超时并返回一个订阅对象 (https://github.com/ReactiveX/rxjs/blob/master/src/internal/Observable.ts: 206-250)。

订阅者基本上拦截对 next()error()complete() 的调用,并且仅在以下情况下转发给实际观察者内部属性 isStopped 未设置 (https://github.com/ReactiveX/rxjs/blob/master/src/internal/Subscriber.ts: 90-128)。

(3) 一旦设置了变量subscription,就会调用unsubscribe()。其中,这将导致将 isStopped 设置为 true,以便订阅者不再将哈希转发给观察者 (https://github.com/ReactiveX/rxjs/blob/master/src/internal/Subscriber.ts: 130-136)。

根据该逻辑,Observable 仍将继续执行其工作,直到所有 4711 哈希都被无目的计算,因为观察者的方法变成了 noops。最终,这种行为可能会影响应用程序的性能,具体取决于订阅量和 Observable 的工作负载。我觉得有点难以相信所描述的内容是正确的。我在这里遗漏了哪一部分?

【问题讨论】:

    标签: angular rxjs observable


    【解决方案1】:

    您正在创建一个"hot" Observable,因此即使没有订阅者,它也会发出。

    使用new Observable() 创建 Observables 时,您可以选择返回所谓的拆卸(或处置)函数,该函数应在需要时清理任何资源。

    所以在你的情况下,你会想要停止计时器。

    new Observable(subscriber => {
      ...
      const handler = setTimeout(() => {...});
    
      return () => clearTimeout(handler);
    });
    

    或者,如果您有多个计时器,您可以在所有计时器上调用 clearTimeout

    【讨论】:

      【解决方案2】:

      我在这里遗漏了哪一部分?

      我认为您缺少的部分是 observable 有责任尊重合同:如果要求它停止发射,它应该停止发射。因此,您的 observable 应该执行以下操作:

      const digestor$ = new Observable(subscriber => {
        let keepGoing = true;
        for (let i = 0; i < 4711 && keepGoing; i++) {
          setTimeout(() => {
            if (keepGoing) {
              const hash = createHashWithNLeadingZeroes(i);
              subscriber.next(hash);
            }
          }, i);
      
          return () => keepGoing = false; // this function is called when the subscriber unsubscribes
        }
      });
      

      依靠现有的工厂函数和操作符来实现所需的行为通常是一个更好的主意。例如,您可以通过使用range()timer()map() 来执行与上述 observable 等效的操作。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2011-06-17
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2012-07-09
        相关资源
        最近更新 更多