【问题标题】:rxjs how do I observe a collection of observables that are not completedrxjs 如何观察未完成的可观察对象集合
【发布时间】:2020-11-11 05:45:53
【问题描述】:

我有一堆 observables (typescript Observable<boolean>),每个都发出多次然后完成,我想在它们完成时将它们从集合中删除,但当它们仍然处于活动状态时,我需要知道它们的最新值.如果集合没有改变,我会有这样的东西:

function all(inputs: Observable<boolean>[]) {
  return combineLatest([...inputs]).pipe(map((values) => values.every(v=>v)));
}

但我需要能够注册新的 observables 并在它们完成时从输入列表中删除它们......

class AssentService {
  all(): Observable<boolean> { /* ??? */ }

  register(input: Observable<boolean>) { /* ??? */ }
}

all 只会被调用一次(可能在任何寄存器调用之前)但register 可以被调用多次并且需要添加该输入直到它完成(最终我需要的不仅仅是all 但我我很确定我一拿到这个就可以弄清楚其余的)。

【问题讨论】:

    标签: angular rxjs


    【解决方案1】:

    这是一个自定义操作符,它使用 map 实现 combineLatest,并且每次发出任何流时都会发出当前正在运行的流的数组。

    这是一个高阶运算符,因此您可以像使用 mergeAll()concatAll() 一样使用它,它需要一个可观察的流。

    combineLatestAll(){
      return stream$ => defer(() => {
        const register = new Map<number, any>();
        return stream$.pipe(
          mergeMap((inner$: Observable<any>, index: number) => inner$.pipe(
            finalize(() => register.delete(index)),
            map(payload => {
              register.set(index, payload);
              return Array.from(register.values());
            })
          ))
        )
      })
    }
    

    您可以随心所欲地使用它,但这是一种简单(如果有限)的方法。

    class AssentService {
      private _registered$ = new Subject<Observable<boolean>>();
    
      all(): Observable<boolean[]> {
        return this._registered$.pipe(
          combineLatestAll()
        )
      }
    
      register(input: Observable<boolean>) {
        this._registered$.next(input);
      }
    }
    

    为什么会受到限制?这种方法多播register,但您只能在all() 之后的呼叫中使用combineLatestAll()。因此,如果消费者首先调用all(),然后再注册,这将按照我假设您期望的方式工作。

    您可以改为缓存最新结果并在所有订阅者之间共享。在这种情况下,init() 是在您的服务启动时运行的任何代码块。 (在您第一次致电allregister 之前)

    class AssentService {
      private _registered$ = new Subject<Observable<boolean>>();
      private _cashed$ = _registered$.pipe(
        combineLatestAll(),
        shareReplay(1)
      );
    
      /*** init(){ ***/
        this._cashed$.subscibe();
      /*** } ***/    
    
      all(): Observable<boolean[]> {
        return this.cashed$;
      }
    
      register(input: Observable<boolean>) {
        this._registered$.next(input);
      }
    }
    

    【讨论】:

    • finalize 是我一直在寻找但不知道如何找到的东西。谢谢。
    【解决方案2】:

    您可以使用mergeAll() 将高阶 Observable 合并到链中:

    subject
      .pipe(
        mergeAll(),
      )
      .subscribe({
        next: console.log,
        complete: () => console.log('completed')
      });
    
    ...
    
    subject.next(timer(1000));
    subject.next(timer(1000));
    subject.complete();
    

    只有在所有嵌套的 Observables 完成并且源 subject 完成时,才会调用完整的处理程序。您不需要删除已完成的 Observable,因为它们会自动取消订阅。

    现场演示:https://stackblitz.com/edit/rxjs-ldjj83?file=index.ts

    【讨论】:

    • 完成后我需要删除它们的原因是因为我只需要关注“生活”主题。用可能与当今世界相关的术语来说,我传递给register 的每个主题都是选民,并且可以在我召集选举时投票,但如果他们当时死了,我不在乎他们的投票。跨度>
    猜你喜欢
    • 2016-06-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-18
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多