【问题标题】:RxJs - can't create two new Observables from merged oneRxJs - 不能从合并的 Observable 中创建两个新的 Observable
【发布时间】:2021-09-26 19:40:03
【问题描述】:

我现在被困在这里,希望有人可以帮助我:-)

以下场景:

我有两个传输相同数据类型的流。 第一个流提供初始数据(REST 调用)并在之后完成。 第二个流通过 websocket 获取它的数据并且没有完成。

const mockData: User[] = [
  { id: '1', status: 'active' },
  { id: '2', status: 'inactive' },
  { id: '3', status: 'active' }
];

const initial$: Observable<User[]> = of(mockData);

const sub$: BehaviorSubject<User[]> = new BehaviorSubject(mockData);
const dynamic$ = sub$.asObservable();

现在我想合并两个流并根据过滤条件创建两个新流,一个用于活动用户,一个用于停用用户。

const merged$ = merge(initial$, dynamic$).pipe(
  distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b))
);

const grouped$ = merged$.pipe(
  mergeMap((users: User[]) => from(users)),
  tap(a => console.log('Before group by', a)),
  groupBy(item => item.status),
  tap(a => console.log('After group by', a.key)),
  mergeMap(group => zip(of(group.key), group.pipe(toArray()))),
  tap(a => console.log('after merge map', a))
);

const activeByGrouped$ = grouped$.pipe(
  filter(([key]) => key === 'active'),
  map(([key, value]) => value)
);

activeByGrouped$.subscribe(a => console.log('final subscribe', a));

如果两个流都完成,整个事情就可以工作,但如果一个是打开的,则不行。 附上一个stackblitz以便更好地理解。

>> stackblitz https://stackblitz.com/edit/typescript-ixc3nu

【问题讨论】:

    标签: javascript merge rxjs behaviorsubject rxjs-observables


    【解决方案1】:

    有时你只见树木不见森林 - 这比预期的要容易。

    interface User {
      id: string;
      status: 'active' | 'inactive';
    }
    
    const mockData: User[] = [
      { id: '1', status: 'active' },
      { id: '2', status: 'inactive' },
      { id: '3', status: 'active' }
    ];
    
    const initial$: Observable<User[]> = of(mockData);
    
    const sub$: BehaviorSubject<User[]> = new BehaviorSubject(mockData);
    const dynamic$ = sub$.asObservable();
    
    setTimeout(() => sub$.next([{ id: '1', status: 'active' }]), 2000);
    
    const merged$ = merge(initial$, dynamic$).pipe(
      distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b)),
      tap(console.log),
      shareReplay({ bufferSize: 1, refCount: true }),
    );
    
    const activeGrouped$ = merged$.pipe(
      map((users: User[]) => users.filter((user: User) => user.status === 'active'))
    );
    
    const inactiveGrouped$ = merged$.pipe(
      map((users: User[]) =>
        users.filter((user: User) => user.status === 'inactive')
      )
    );
    
    activeGrouped$.subscribe(a => console.log('final subscribe - active:', a));
    inactiveGrouped$.subscribe(a => console.log('final subscribe - inactive:', a));

    工作演示:stackblitz

    【讨论】:

      【解决方案2】:

      我对您想要实现的目标的理解是,您希望拥有一个只有活跃/非活跃用户的流。在这种情况下,toArray() 不是一个好的选择,因为它需要源 Observable 来完成,这在您使用 Subjects 时永远不会发生(除非您自己调用 complete())。

      所以看来你可以重组你的链条。但是,您不能收集所有结果然后发出一个大数组(因为您的源再次没有完成)。您也可以使用 scan() 发出所有中间结果,但这取决于您想要什么行为:

      const grouped$ = merged$.pipe(
        mergeMap((users: User[]) => from(users)),
        groupBy(item => item.status),
      );
      
      const activeByGrouped$ = grouped$.pipe(
        filter(group => group.key === 'active'),
        mergeMap(group => group)
      );
      

      您更新的演示:https://stackblitz.com/edit/typescript-evvehx

      【讨论】:

      • 感谢您的回答,我需要每个活动/非活动可观察对象的列表,因此 zip 运算符是朝着正确方向迈出的一步。但是,这会累积值,但我需要另一个功能,如下所述。 -> 我总是从后端得到一个完整的有效值列表,所以总是一个对象列表。一旦后端发送新列表,之前传输的值将从那时起无效。
      • 有时只见树木不见森林 - 这比预期的要容易工作演示:stackblitz
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-02-02
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-07-15
      • 2017-09-02
      • 2016-12-11
      相关资源
      最近更新 更多