【问题标题】:Combine initial Observable with an Observable for changes (into a single Observable)将初始 Observable 与 Observable 组合以进行更改(合并为单个 Observable)
【发布时间】:2019-12-11 08:40:20
【问题描述】:

我有两个 observables。一个获取初始数据,另一个应该对更改做出反应并相应地应用它们:

const initial$: Observable<Report[]>;
const changes$: Observable<ObjectChangeEvent<Report>>;

一些特点是:

  1. initial$ 必须先完成,然后才能应用更改
  2. changes$ 可以发出 0..n 次
  3. 更改后的数组应该是下一次更改发出的基础。这意味着只有第一个更改应该应用于初始状态。以下更改不应丢弃以前的更改。

我想将两个可观察对象合并为一个。到目前为止,我最接近的是 combineLatest 运算符。但它与特征 2) 冲突,因为 changes$ 可能不会发出任何东西。

任何帮助将不胜感激。

【问题讨论】:

    标签: rxjs observable reactive-programming


    【解决方案1】:

    你的意思是这样的:

    this.result$ = this.changes$.pipe(
      skipUntil(this.initial$),
      // ...
    );
    

    或者……

    const initial$: Observable<Report[]>;
    const changes$: Observable<ObjectChangeEvent<Report>>;
    
    const accumulatedChanges$: Observable<ObjectChangeEvent<Report>[]>;
    
    this.accumulatedChanges$ = this.changes$.pipe(
      scan((acc, curr) => [...acc, curr], []),
      startWith([]),
    ); // emits [change1], [change1, change2], [change1, change2, change3]....
    
    this.result$ = combineLatest([this.initial$, this.accumulatedChanges$]).pipe(
      // apply all accumulated changes on initial
    );
    

    编辑:

    或者....

    this.result$ = this.initial$.pipe(
      switchMap(initial => this.changes$.pipe(
        // applyChangeToCurrent is only a placeholder for your semantic to alter the current
        scan((current, change) => applyChangeToCurrent(current, change), initial),
      )),
    );
    

    【讨论】:

    • combinLatest 真的适合这里吗?它要求每个 observable 至少发出一次,而“changes$”没有给出。 “扫描”似乎是个好主意,但我需要以某种方式访问​​数组。我想用前一个数组和一个 ObjectChangeEvent 构建一个新数组。例如,如果一个对象被删除,则新数组应该少一项。
    • 您可以在扫描后添加 startWith 以强制发射 1 次。我在示例中添加了它
    • 如果两个 observables 都至少发射一次,则第三种方法有效。所以我不得不将“startsWith”添加到“changes$”中。我决定使用“concat(initial$, changes$).pipe(scan(...));”你给了我正确的方向,非常感谢!
    【解决方案2】:

    为了满足第一个要求,您可以使用last 运算符,结合mergeMap 切换到changes$ 可观察对象并在其中使用initial$ 的最后一个值。 使用startWith 运算符将其用作初始状态,以防changes$ 为空。

    一般应该是这样的:

    initial$.pipe(
      last(),
      mergeMap(initialState =>
        changes$.pipe(startWith(initialState))
      )
    );
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-10-29
      • 2018-12-14
      • 1970-01-01
      • 2019-03-05
      • 1970-01-01
      • 2022-06-22
      相关资源
      最近更新 更多