【问题标题】:How to write below code using MergeMap or FlatMap or some better way with rxJs-operators?如何使用 MergeMap 或 FlatMap 或使用 rxJs-operators 的更好方法编写以下代码?
【发布时间】:2020-12-31 04:31:36
【问题描述】:

我有两个可观察的管道。我需要一个接一个地运行并比较两个值是否相等。我尝试了下面的代码。这应该可以工作,当第一个 observable 值发出时,它应该去获取第二个 obserbla 值并且应该首先返回值。我需要一些专家的帮助,以更好地重构这段代码。

   this.selectedUnitDetailModel$.pipe(shareReplayUntil(this.destroySub)).subscribe(
          (res: UnitDetail) =>{
              if(res.unitTwo){
                this.appStore.select(selectUnit).
                pipe(shareReplayUntil(this.destroySub)).subscribe(
                  (unitId: string) => {
                    if(unitId ===  res.unitTwo){
                      this.sameUnit = true;
                    }else{
                      this.sameUnit = false;
                    }
                  });
              }
          }
       );

【问题讨论】:

  • 试试this.selectedUnitDetailModel$.pipe(withLatestFrom(this.appStore.select(selectUnit)), map(([{unitTwo}, unitId]) => unitTwo === unitId), tap(console.table), shareReplayUntil(this.destroySub)).subscribe(sameUnit => {this.sameUnit = sameUnit;})
  • 什么是shareReplayUntil
  • @RafiHenig 我创建了主题并将所有管道绑定到它,当组件销毁时,我将取消订阅该主题。实际上它曾经取消订阅 observable

标签: angular typescript rxjs ngrx rxjs-pipeable-operators


【解决方案1】:

您不需要高阶运算符,因为可观察对象 this.selectedUnitDetailModel$this.appStore.select(selectUnit) 是相互独立的。相反,您可以使用 forkJoincombineLatestzip 之类的函数来并行获取它们的通知。

您可以在这些函数here 中找到不同之处。

试试下面的

forkJoin(
  this.selectedUnitDetailModel$.pipe(take(1)),      // <-- complete on first emission
  this.appStore.select(selectUnit).pipe(take(1))    // <-- complete on first emission
).subscribe(
  ([res, unitId]) => this.sameUnit = res.unitTwo === unitId,
  (error) => console.log(error)                     // <-- handle error
);

forkJoin 仅在源可观察对象完成时发出,因此我已将 take(1) 输入到每个可观察对象。 forkJoin 现在将在每个可观察和完整的第一次发射时发射。因此,您对shareReplayUntil(this.destroySub) 的需求就减少了。

但是,如果您需要保持可观察对象的发射流打开,您可以改用 combineLatestzip。在这种情况下,您可以将 take(1) 替换为您的 ``shareReplayUntil(this.destroySub)`。

更新: this.selectedUnitDetailModel$ observable 的连续流

就像我之前说的,您可以使用combineLatest 而不是forkJoin 来启用连续的数据流。

试试下面的

import { Subject, combineLatest } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

combineLatest(
  this.selectedUnitDetailModel$,
  this.appStore.select(selectUnit)
).pipe(
  takeUntil(this.destroySub)         // <-- replaced with `takeUntil` operator
).subscribe(
  ([res, unitId]) => this.sameUnit = res.unitTwo === unitId,
  (error) => console.log(error)                     // <-- handle error
);

【讨论】:

  • 谢谢,我会试试这个。需要知道一件事,'this.selectedUnitDetailModel$' 这个管道值是变化的,然后它在第二次工作? (想一想,this.selectedUnitDetailModel$ 是 List -> 我会一一选择,然后需要重新计算单元是否相等)
  • @uma:不会。就像我之前所说的,对于连续的数据流,您可以使用 combineLatest 代替。我已经更新了答案。请检查更新后的代码是否适合您。
  • Michael ,我尝试了第二个 code.combineLatest -> 给出了一些已弃用的声纳问题.. 和行 ([res:UnitDetail , unitID: string ] ,给出编译错误的地方,例如 undefnde :(
  • @uma:我不确定这个shareReplayUntil 是如何实现的。尝试将其替换为 takeUntil 运算符。它执行您需要的相同操作:在组件被销毁时关闭打开的订阅。我也更新了答案。
  • @uma: 另外请从数组([res, unitId]) 中删除类型定义。我已经更新了答案。
【解决方案2】:
this.selectedUnitDetailModel$.pipe(shareReplayUntil(this.destroySub),mergeMap(
          (res: UnitDetail) =>{
              if(res.unitTwo){
               return this.appStore.select(selectUnit).
                pipe(shareReplayUntil(this.destroySub),map(
                  (unitId: string) =>  unitId ===  res.unitTwo);
              }
          }
       ).subscribe({
        next: (sameUnit: boolean) => {
           //do something 
        }
       });

【讨论】:

  • 你可以在mergeMap中添加return of(false)来处理res.unitTwo为false的场景。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2022-01-24
  • 2019-12-10
  • 2018-09-16
  • 1970-01-01
  • 2015-06-23
  • 2018-06-21
  • 2013-07-19
相关资源
最近更新 更多