【问题标题】:Merge incremental data into observable将增量数据合并到 observable
【发布时间】:2022-01-20 16:29:47
【问题描述】:

我有一个后端方法,它只获取时间间隔(从,到)之间的更改数据。 这些增量更改应合并到当前数据中。 但我不知道如何将增量更改合并到当前活动数据中。

// will be triggered all N-seconds.
private refresh$ = new BehaviorSubject<number>(0);
private last = 0;

data$: Observable<Record[]> = refresh$.pipe(
  switchMap((elapsed: number) => this.fetchIncrementalData(this.last, elapsed),
  // here mergeWithIncrementalChanges() should be called, but how to get the current active data as the first function parameter, which is the latest value of data$??
)

fetchIncrementalData(last: number, elapsed: number): Record[] {
    // calls the webservice which returns only the records (and only the changed properties of the record), which have changed in the specified time interval
    return service.getIncrementalChanges(last, number);
}

mergeWithIncrementalChanges(currentData: Record[], updateData: Record[]): Record[] {
    // merge the new incremental data from fetchIncrementalData() into the current active data.
    ...
    return mergedRecords;
}

【问题讨论】:

    标签: typescript rxjs rxjs-observables


    【解决方案1】:

    scan 操作符是这里的理想选择。它允许您累积每次发射的变化。这有点像 reduce,但不是返回单个结果,而是每次发射都返回一个结果。

    data$: Observable<Record[]> = refresh$.pipe(
      switchMap((elapsed: number) => this.fetchIncrementalData(this.last, elapsed),
      scan((acc, cur) => mergeWithIncrementalChanges(acc, cur), [] as Record[])
    )
    

    上面的示例在每次刷新时获取增量数据。然后每次调用 scan 都会合并上一次调用的结果(从一个初始的空数组开始),并返回这些结果。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2011-10-29
      • 2018-06-12
      • 1970-01-01
      • 2020-09-01
      • 1970-01-01
      • 2018-12-14
      • 2020-04-27
      • 1970-01-01
      相关资源
      最近更新 更多