【问题标题】:RxJS: match two observables using custom logicRxJS:使用自定义逻辑匹配两个可观察对象
【发布时间】:2019-12-16 15:47:12
【问题描述】:

我有两个 observable 流,每个流在无限的时间段内发出一系列项目(类似于基于 DOM 的 click Observable 的行为方式)。我知道来自 Observable A (a$) 的项目将匹配来自 Observable B (b$) 的项目,但需要执行一些自定义逻辑来确定哪些项目匹配。 我试图完成这项工作,但我只能让第一对匹配,然后后续项目再也不会发出......

这是代码的摘录:

a$.pipe(
  mergeMap(a => {
    return b$.pipe(
      filter(b => b.key.includes(a.subKey)), // custom matching logic goes here
      take(1),
      map(b => ({ a, b }))
    );
  })
)
.subscribe(({ a, b }) => {
  console.log("do something with a and b", a, b);
});

请注意,both Observables 永远不会完成,因此如果来自a$ 的某些项目a 发出,则它的“对”可能尚未从b 发出。这就是为什么我在上面使用filter 而不是find。当我找到匹配的项目时,我可以完成内部 observable,因为该对已经匹配和处理。

请指教,我错过了什么?

【问题讨论】:

    标签: rxjs


    【解决方案1】:

    你最近看过 combine 吗?一旦两个流都发出一次,它就会发出两个流的最新值。

    combineLatest(a$, b$).pipe(filter(([a, b]) => b.key.includes(a.subKey)))
      .subscribe(([a, b]) => {
        // Do stuff with an and b here
      });
    

    【讨论】:

    • 如果a$b$ 再次发出会发生什么? filter 的谓词将为每个可能的对运行一次(所以如果 a$ 发出 3 个值,b$ 发出 2 个值,总共 6 个调用)?似乎很浪费,我正在对我已经处理过的as 进行不必要的检查。我还应该注意我尝试了combineLatest,但在mergeMap 内,所以这可能有效。我仍然认为它不是很有效。考虑到我可以有很多发射,因为它们都是无限流。
    【解决方案2】:

    我认为解决此问题的一种方法是首先累积地图中每个可观察对象的值,然后使用 combineLatest 运算符,以便您可以在每次发射。

    const aMap$ = a$.pipe(scan((acc, crt) => (acc[crt.id] = crt, acc), Object.create(null)));
    const bMap$ = b$.pipe(scan((acc, crt) => (acc[crt.id] = crt, acc), Object.create(null)));
    
    combineLatest(aMap$, bMap$)
        .pipe(
            map(([aMap, bMap]) => {
                let pair = null;
    
                for (const bKey in bMap) {
                    const bVal = bMap[bKey];
    
                    const aPairKey = bVal.keys.find(k => !!aMap[k]);
    
                    if (aPairKey) {
                        pair = { a: aMap[aPairKey], b: bVal };
    
                        delete aMap[aPairKey];
                        delete bMap[bKey];
    
                        break;
                    }
                }
    
                return pair;
            }),
            filter(v => !!v)
        )
    

    【讨论】:

      【解决方案3】:

      我会累积来自 A 和 B 的值,以查看哪些值已经发生。在您可以从这些数组创建交集之后

      const keysOccuredInA$ = a$.pipe(
          map(a => a.subKey),
          scan((acc, curr) => ([...acc, curr]), []),
      );
      
      const keysOccuredInB$ = b$.pipe(
          map(b => b.key), // b.key is an array, right?
          scan((acc, curr) => ([...acc, ...curr]), []),
      );
      
      keysOccuredInBoth$ = combineLatest(keysOccuredInA$, keysOccuredInB$).pipe(
          map(([keysOccuredInA, keysOccuredInB]) => _intersection(keysOccuredInA, keysOccuredInB)), // lodash intersection
      )
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2019-03-10
        • 2020-09-22
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2016-06-18
        • 1970-01-01
        • 2022-11-25
        相关资源
        最近更新 更多