【发布时间】:2022-07-11 22:22:53
【问题描述】:
我正在尝试使用 RxJS 运算符对我的数据集合进行分组,并在我的 Angular 应用程序中将其拆分为多个流,但似乎无法使其正常工作。在我的SignalRService 中,我在构造函数中设置了一个 SignalR 触发器,这样当从服务器传递数据时,它会将其传递给我创建的主题。
export class SignalRService {
private connection?: signalR.HubConnection;
private orders = new Subject<OrderModel[]>();
orders$ = this.orders.asObservable();
constructor() {
// ... SignalR connection functions ...
this.connection?.on('GetOrders', (data: OrderModel[]) => {
this.orders.next(data);
});
}
};
在OrderService 中,我使用一些管道运算符订阅orders$ 主题,因为我想根据Order 对象的status 将数据拆分为3 个不同的流。
我将地图展平,使用 groupBy,然后使用键和相应的数据再次合并,但是,由于某种原因,这不起作用,我不确定应该看哪里。
当我在当前运算符之间使用tap 运算符时,它只记录前两次点击。它似乎永远不会到达第三个,因此永远不会执行我想的订阅。
此外,当SignalRService 中的this.orders.next(data) 被执行两次或更多次时,什么也不会发生。
export class OrderService {
// Observable sources
private orderCategory0 = new BehaviorSubject<OrderModel[]>([]);
private orderCategory1 = new BehaviorSubject<OrderModel[]>([]);
private orderCategory2 = new BehaviorSubject<OrderModel[]>([]);
private orders = [this.orderCategory0, this.orderCategory1, this.orderCategory2];
// Observable streams
orderCategory0$ = this.orderCategory0.asObservable();
orderCategory1$ = this.orderCategory1.asObservable();
orderCategory2$ = this.orderCategory2.asObservable();
constructor(private signalRService: SignalRService) {
signalRService.orders$
.pipe(
mergeMap((res) => res),
//tap((res) => console.log(res)), <-- This one shows
groupBy((order: OrderModel) => order.status),
//tap((res) => console.log(res)), <-- This one shows
mergeMap((group) => zip(of(group.key), group.pipe(toArray())))
//tap((res) => console.log(res)), <-- This one doesn't
)
.subscribe(([groupId, data]) => this.orders[groupId].next(data));
}
};
请注意,当我在 OrderService 中执行以下操作时,一切都会按预期进行:
signalRService.orders$.subscribe((data: OrderModel[]) => {
const groups = this.groupData(data);
this.orderCategory0.next(groups[0]);
this.orderCategory1.next(groups[1]);
this.orderCategory2.next(groups[2]);
});
目前,我迷路了,也许我这样做完全错了,所以任何指针都将不胜感激。
编辑:另外,当我对订单进行硬编码并使用of(orders).pipe(...).subscribe(...),从而省略signalRService.order$ 部分时,一切正常。
【问题讨论】:
标签: angular typescript rxjs signalr