【问题标题】:RxJS group by field and return new observableRxJS 按字段分组并返回新的 observable
【发布时间】:2021-06-12 11:12:14
【问题描述】:

我已经关注了接口和Observable<Machine[]>,我想要实现的是按Observable<Machine[]> 中的机器symbol 属性分组并返回映射的可观察Observable<Order[]>

export interface Machine {
    symbol: string;
    price: number;
    amount: number;
    id: number;
}

export interface Order {
    symbol: string;
    machines: OrderMachine[];
}

export interface OrderMachine {
    price: number;
    amount: number;
    id: number;
}

我尝试使用 RxJS 的 gropBy 运算符,但它似乎一个一个地返回分组数组。

machines: Machine[] = [
        { amount: 1,  id: 1, symbol: "A", price: 1 },
        { amount: 1,  id: 2, symbol: "A", price: 2 }
    ];


of(machines).pipe(
        takeUntil(this.unsubscribe),
        mergeMap(res => res),
        groupBy(m => m.symbol),
        mergeMap(group => zip(of(group.key), group.pipe(toArray()))),
        map(x => { // here I have probably wrong model [string, Machine[]]
            const orderMachines = x[1].map(y => { return <OrderMachine>{price: y.price, amount: y.amount, id: y.id }})
            return <Order>{ symbol: x[0], machines: orderMachines }  })
        );

结果我有Observable&lt;Order&gt;而不是Observable&lt;Order[]&gt;

预期结果模型:

orders: Order[] = [
        {   
            symbol: "A", 
            machines: [
                { amount: 1, price: 1, id: 1 },
                { amount: 1, price: 2, id: 2 }
            ]
        }
    ];

【问题讨论】:

    标签: rxjs rxjs5 rxjs6 rxjs-observables


    【解决方案1】:

    这里有一个基于您的方法的可能解决方案,但有一些变化:

    const machines = [
      { amount: 1, id: 1, symbol: "A", price: 1 },
      { amount: 1, id: 2, symbol: "A", price: 2 },
      { amount: 1, id: 3, symbol: "B", price: 3 }
    ];
    
    from(machines) // (1)
      .pipe(
        // (2)
        groupBy((m) => m.symbol),
        mergeMap((group) => group.pipe(toArray())),
        map((arr) => ({
          symbol: arr[0].symbol, // every group has at least one element
          machines: arr.map(({ price, amount, id }) => ({
            price,
            amount,
            id
          }))
        })),
        toArray(), // (3)
      )
      .subscribe(console.log);
    

    (1) 我将of(machines) 更改为from(machines),以便将machines 中的对象一一发送到流中。在此更改之前,整个数组立即发出,因此流被破坏了。

    (2) 我从管道中删除了takeUntil(this.unsubscribe)mergeMap(res =&gt; res),因为没有理由在您的示例中使用它们。 takeUntil 不会产生任何影响,因为流是有限且同步的。与 mergeMap 一起应用的标识函数 (res =&gt; res) 在流中有意义,但在您的示例中并非如此。或者您的项目是否真的需要这些运算符,因为您有无限的 observable 流?

    (3) toArray()Observable&lt;Order&gt; 转换为Observable&lt;Order[]&gt;。它一直等到流结束,并以数组的形式立即发出所有流式传输的值。

    编辑:

    操作员提到他宁愿需要一个与无限流兼容的解决方案,但因为toArray 仅适用于有限流,所以上面提供的答案在这种情况下永远不会发出任何东西。

    为了解决这个问题,我会避免使用来自 rxjs 的groupBy。在您需要将一个流拆分为多组流的其他情况下,它是一个非常强大的工具,但在您的情况下,您只是想对一个数组进行分组,并且有更简单的方法。

    this.store.pipe(
        select(fromOrder.getMachines)
        map((arr) =>
            // (*) group by symbol
            arr.reduce((acc, { symbol, price, amount, id }) => {
                acc[symbol] = {
                    symbol,
                    machines: (acc[symbol] ? acc[symbol].machines : [])
                        .concat({ price, amount, id })
                };
                return acc;
            }, {})
        ),
    )
    .subscribe((result) => 
        // (**)
        console.log(Object.values(result))
    );
    

    (*)你可以使用一个普通的groupBy implementation,它返回一个形状为{[symbol: string]: Order}的对象。

    (**) result 在这里是一个对象,但您可以轻松地将其转换为数组,但应用 Object.values(result)

    【讨论】:

      【解决方案2】:

      @kruschid 非常感谢您的回复,它可以正常工作,但不幸的是,当我想将它与我的商店 (ngrx) 一起使用时它不起作用,输入正常但在mergeMap 之后停止显示日志方法:

         this.store.pipe(select(fromOrder.getMachines),
                           mergeMap(res => res), // Machine[]
                           groupBy((m) => m.symbol),
                           tap(x => console.log(x)), //this shows object GroupedObservable {_isScalar: false, key: "A", groupSubject: Subject, refCountSubscription: GroupBySubscriber} 
                           mergeMap((group) => group.pipe(toArray())),
                           tap(x => console.log(x)), // this is not printed in console
                           map((arr) => <Order>({
                              symbol: arr[0].symbol,
                              machines: arr.map(({ price, amount, id }) => ({
                                  price,
                                  amount,
                                  id
                              }))
                              })),
                           toArray())) // (3)
      

      【讨论】:

      • 我以前从未使用过ngrx,但我的猜测是:mergeMap(res =&gt; res) 中断了流。我必须查找select 的签名,发现在您的特定情况下返回类型将是Obsevable&lt;Machine[]&gt;,对吗?这意味着您可以删除mergeMap(res =&gt; res)。只有当select 返回Observable&lt;Observable&lt;Machine[]&gt;&gt; 时,此时使用mergeMap 才有意义。只有这样,通常才需要具有标识函数x =&gt; x(或更短的mergeAll)的mergeMap:扁平化流(Observable&lt;Observable&lt;T&gt;&gt; -&gt; Observable&lt;T&gt;)。
      • 是的,你是对的select 返回Observable&lt;Machine[]&gt;,但是如果我删除mergeMap(res =&gt; res) 那么groupBy m 参数是Machine[] 而不是Machine
      • 你是对的。我不知道mergeMap 也可用于展平数组。好点子。您的代码没有按预期工作,因为this.store 很可能是一个无限流,但toArray()(在mergeMap 内部)等待永远不会到达的complete 信号。尝试在第一个 mergeMap 之前添加 take(1) 以在选择一个最新状态样本后发送 complete 信号。
      • 恐怕我不能使用take(1),因为它必须使用无限流,并且在此更改后它只返回一次数据(在我的情况下是空数组)并停止“监听”改变了,还有其他选择吗?
      猜你喜欢
      • 2017-12-28
      • 1970-01-01
      • 1970-01-01
      • 2012-07-01
      • 2019-08-05
      • 1970-01-01
      • 2016-12-18
      • 1970-01-01
      • 2019-10-11
      相关资源
      最近更新 更多