【发布时间】:2021-09-26 19:40:03
【问题描述】:
我现在被困在这里,希望有人可以帮助我:-)
以下场景:
我有两个传输相同数据类型的流。 第一个流提供初始数据(REST 调用)并在之后完成。 第二个流通过 websocket 获取它的数据并且没有完成。
const mockData: User[] = [
{ id: '1', status: 'active' },
{ id: '2', status: 'inactive' },
{ id: '3', status: 'active' }
];
const initial$: Observable<User[]> = of(mockData);
const sub$: BehaviorSubject<User[]> = new BehaviorSubject(mockData);
const dynamic$ = sub$.asObservable();
现在我想合并两个流并根据过滤条件创建两个新流,一个用于活动用户,一个用于停用用户。
const merged$ = merge(initial$, dynamic$).pipe(
distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b))
);
const grouped$ = merged$.pipe(
mergeMap((users: User[]) => from(users)),
tap(a => console.log('Before group by', a)),
groupBy(item => item.status),
tap(a => console.log('After group by', a.key)),
mergeMap(group => zip(of(group.key), group.pipe(toArray()))),
tap(a => console.log('after merge map', a))
);
const activeByGrouped$ = grouped$.pipe(
filter(([key]) => key === 'active'),
map(([key, value]) => value)
);
activeByGrouped$.subscribe(a => console.log('final subscribe', a));
如果两个流都完成,整个事情就可以工作,但如果一个是打开的,则不行。 附上一个stackblitz以便更好地理解。
>> stackblitz https://stackblitz.com/edit/typescript-ixc3nu
【问题讨论】:
标签: javascript merge rxjs behaviorsubject rxjs-observables