【问题标题】:RxJS observable does not trigger subscribe after mergeMapRxJS observable 在 mergeMap 后不会触发订阅
【发布时间】:2022-07-11 22:22:53
【问题描述】:

我正在尝试使用 RxJS 运算符对我的数据集合进行分组,并在我的 Angular 应用程序中将其拆分为多个流,但似乎无法使其正常工作。在我的SignalRService 中,我在构造函数中设置了一个 SignalR 触发器,这样当从服务器传递数据时,它会将其传递给我创建的主题。

export class SignalRService {
  private connection?: signalR.HubConnection;
  private orders = new Subject<OrderModel[]>();
  orders$ = this.orders.asObservable();
    
  constructor() {
    // ... SignalR connection functions ...
    
    this.connection?.on('GetOrders', (data: OrderModel[]) => {
      this.orders.next(data);
    });
  }
};

OrderService 中,我使用一些管道运算符订阅orders$ 主题,因为我想根据Order 对象的status 将数据拆分为3 个不同的流。

我将地图展平,使用 groupBy,然后使用键和相应的数据再次合并,但是,由于某种原因,这不起作用,我不确定应该看哪里。 当我在当前运算符之间使用tap 运算符时,它只记录前两次点击。它似乎永远不会到达第三个,因此永远不会执行我想的订阅。 此外,当SignalRService 中的this.orders.next(data) 被执行两次或更多次时,什么也不会发生。

export class OrderService {
  // Observable sources
  private orderCategory0 = new BehaviorSubject<OrderModel[]>([]);
  private orderCategory1 = new BehaviorSubject<OrderModel[]>([]);
  private orderCategory2 = new BehaviorSubject<OrderModel[]>([]);
  private orders = [this.orderCategory0, this.orderCategory1, this.orderCategory2];
  // Observable streams
  orderCategory0$ = this.orderCategory0.asObservable();
  orderCategory1$ = this.orderCategory1.asObservable();
  orderCategory2$ = this.orderCategory2.asObservable();
  
  constructor(private signalRService: SignalRService) {
    signalRService.orders$
      .pipe(
        mergeMap((res) => res),
        //tap((res) => console.log(res)), <-- This one shows
        groupBy((order: OrderModel) => order.status),
        //tap((res) => console.log(res)), <-- This one shows
        mergeMap((group) => zip(of(group.key), group.pipe(toArray())))
        //tap((res) => console.log(res)), <-- This one doesn't
      )
      .subscribe(([groupId, data]) => this.orders[groupId].next(data));
  }
};

请注意,当我在 OrderService 中执行以下操作时,一切都会按预期进行:

signalRService.orders$.subscribe((data: OrderModel[]) => {
  const groups = this.groupData(data);

  this.orderCategory0.next(groups[0]);
  this.orderCategory1.next(groups[1]);
  this.orderCategory2.next(groups[2]);
});

目前,我迷路了,也许我这样做完全错了,所以任何指针都将不胜感激。

编辑:另外,当我对订单进行硬编码并使用of(orders).pipe(...).subscribe(...),从而省略signalRService.order$ 部分时,一切正常。

【问题讨论】:

    标签: angular typescript rxjs signalr


    【解决方案1】:

    问题在于 group.pipe(toArray()) 永远不会发出,因为 group 是一个保持打开状态的可观察对象(大概直到 order$ 完成),并且 toArray 等待发射,直到 observable 完成。话虽如此,groupBy 的使用一开始可能有点过头了。

    如果我正确解释了您的示例,那么您创建组只是为了将它们传递给正确的行为主题,这些主题位于由状态键入的集合中。收到个人订单后,您只需将其发送给正确的接收者 - 无需额外的转换。

    signalRService.orders$.pipe(
      mergeMap((res) => res),
    ).subscribe((order: OrderModel) => this.orders[order.status].next(order));
    

    如果你真的需要 groupBy(也许这是一个更大问题的简化),那么不要使用 zip 并且有取而代之的是内管。

    signalRService.orders$.pipe(
      mergeMap((res) => res),
      groupBy((order: OrderModel) => order.status),
      mergeMap((grp) => grp.pipe(order => tap(this.orders[grp.key].next(order)))
    ).subscribe();
    

    最后,您可以通过拥有一个来源并按状态过滤来避免同时使用主题。

    const readonly orders$ = signalRService.orders$.pipe(
      mergeMap((res) => res),
      shareReplay(1)
    );
    const readonly orderCategory0$ = this.orders$.pipe(filter(x => x.status === 'status0'));
    const readonly orderCategory1$ = this.orders$.pipe(filter(x => x.status === 'status1'));
    const readonly orderCategory2$ = this.orders$.pipe(filter(x => x.status === 'status2'));
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2023-03-12
      • 2021-12-22
      • 1970-01-01
      • 1970-01-01
      • 2020-03-28
      • 1970-01-01
      • 2018-08-03
      • 1970-01-01
      相关资源
      最近更新 更多