【问题标题】:combineLatest alternative to combine or merge ObservablescombineLatest 组合或合并 Observables 的替代方法
【发布时间】:2018-07-02 15:21:23
【问题描述】:

一开始我的印象是 combineLast 很合适,但当我阅读文档时,它似乎不是:“请注意,combineLatest 不会发出初始值,直到每个 observable 至少发出一个值。”……当然,我恰好遇到了那个例外。我尝试了forkJoinmerge 的各种组合,但我做不到。

用例非常简单,someObs 方法返回0 或更多可观察对象,我在其上循环。基于SomeObs 对象上的值,我将一个新构造的可观察对象OtherObs[] 推送到ArrayObservable<OtherObs[]>。这个数组“需要”被合并到一个单独的 observable 中,在返回它之前我想做一些转换。

具体来说,我很难用合适的东西替换 combineLast 运算符……

public obs(start: string, end: string): Observable<Array<OtherObs>> {
  return this.someObs().pipe(
    mergeMap((someObs: SomeObs[]) => {
      let othObs: Array<Observable<OtherObs[]>> = [];

      someObs.forEach((sobs: SomeObs) => {
        othObs.push(this.yetAnotherObs(sobs));
      });

      return combineLatest<Event[]>(othObs).pipe(
        map(arr => arr.reduce((acc, cur) => acc.concat(cur)))
      );
    })
  );
}

private yetAnotherObs(): Observable<OtherObs[]> {
  /// ...
}

private somObs(): Observable<SomeObs[]> {
  /// ...
}

【问题讨论】:

  • 那么你的问题是什么,你想要做什么或者什么不起作用?
  • 您通常可以通过将startWith 应用于每个内部可观察对象来解决 combineLatest() 限制,以便它们始终立即发出一些初始值。
  • 对我来说,问题出在你的方法 yetAnotherObs() 上。这个函数返回的可观察对象似乎没有发出任何值。你可以使用任何组合运算符,但你必须确保你的 observables 发出一些东西来继续。如果能详细了解这个函数的作用,那就太好了。

标签: merge rxjs observable


【解决方案1】:

combineLatest 的“问题”是(就像你说的那样)它“在每个 observable 发出至少一个值之前不会发出初始值”。但这不是问题,因为您可以使用 RxJS 运算符startWith

所以你的 observable 获得了一个初始值,combineLatest 就像一个魅力;)

import { of, combineLatest } from 'rxjs';
import { delay, map, startWith } from 'rxjs/operators';

// Delay to show that this observable needs some time
const delayedObservable$ = of([10, 9, 8]).pipe(delay(5000));

// First Observable
const observable1$ = of([1, 2, 3]);

// Second observable that starts with empty array if no data
const observable2$ = delayedObservable$.pipe(startWith([]));

// Combine observable1$ and observable2$
combineLatest(observable1$, observable2$)
  .pipe(
    map(([one, two]) => {
      // Because we start with an empty array, we already get data from the beginning
      // After 5 seconds we also get data from observable2$
      console.log('observable1$', one);
      console.log('observable2$', two);

      // Concat only to show that we map boths arrays to one array
      return one.concat(two);
    })
  )
  .subscribe(data => {
    // After 0 seconds: [ 1, 2, 3 ]
    // After 5 seconds: [ 1, 2, 3, 10, 9, 8 ]
    console.log(data);
  });

See example on Stackblitz

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-03-13
    • 1970-01-01
    • 1970-01-01
    • 2013-03-01
    相关资源
    最近更新 更多