这是一个自定义操作符,它使用 map 实现 combineLatest,并且每次发出任何流时都会发出当前正在运行的流的数组。
这是一个高阶运算符,因此您可以像使用 mergeAll() 或 concatAll() 一样使用它,它需要一个可观察的流。
combineLatestAll(){
return stream$ => defer(() => {
const register = new Map<number, any>();
return stream$.pipe(
mergeMap((inner$: Observable<any>, index: number) => inner$.pipe(
finalize(() => register.delete(index)),
map(payload => {
register.set(index, payload);
return Array.from(register.values());
})
))
)
})
}
您可以随心所欲地使用它,但这是一种简单(如果有限)的方法。
class AssentService {
private _registered$ = new Subject<Observable<boolean>>();
all(): Observable<boolean[]> {
return this._registered$.pipe(
combineLatestAll()
)
}
register(input: Observable<boolean>) {
this._registered$.next(input);
}
}
为什么会受到限制?这种方法多播register,但您只能在all() 之后的呼叫中使用combineLatestAll()。因此,如果消费者首先调用all(),然后再注册,这将按照我假设您期望的方式工作。
您可以改为缓存最新结果并在所有订阅者之间共享。在这种情况下,init() 是在您的服务启动时运行的任何代码块。 (在您第一次致电all 或register 之前)
class AssentService {
private _registered$ = new Subject<Observable<boolean>>();
private _cashed$ = _registered$.pipe(
combineLatestAll(),
shareReplay(1)
);
/*** init(){ ***/
this._cashed$.subscibe();
/*** } ***/
all(): Observable<boolean[]> {
return this.cashed$;
}
register(input: Observable<boolean>) {
this._registered$.next(input);
}
}