【问题标题】:What are use cases for mergeMap operator?mergeMap 运算符的用例是什么?
【发布时间】:2017-06-26 12:24:36
【问题描述】:

我完全不明白mergeMap 的目的。我听说有两种解释:

  1. 就像 .NET LINQ 中的 SelectAll() - 不。
  2. 这是 RxJS mergemap 的组合 - 不(或者我无法复制)。

考虑following code

    var obs1 = new Rx.Observable.interval(1000);
    var obs2 = new Rx.Observable.interval(1000);
    
    //Just a merge and a map, works fine
    obs1.merge(obs2).map(x=> x+'a').subscribe(
      next => console.log(next)
    )
    
    //Who know what - seems to do the same thing as a plain map on 1 observable
    obs1.mergeMap(val => Rx.Observable.of(val + `B`))
        .subscribe(
          next => console.log(next)
        )

最后一张标有“谁知道是什么”的文章只不过是obs1 上的一张地图——有什么意义?

mergeMap 实际上是做什么的?什么是有效用例的示例? (最好有一些代码)

对我没有帮助的文章(上面的mergeMap代码来自其中之一):1, 2

【问题讨论】:

    标签: javascript rxjs observable


    【解决方案1】:

    tl;dr; mergeMapmap 强大得多。了解mergeMap是充分利用Rx的必要条件。


    相似之处

    • mergeMapmap 都作用于单个流(与 zipcombineLatest 相比)

    • mergeMapmap 都可以转换流的元素(与 filterdelay 相比)

    差异

    地图

    • 不能改变源流的大小(假设:map 本身没有throw);对于来自源的每个元素,恰好发出一个 mapped 元素; map 不能忽略元素(例如filter);

    • 在默认调度程序的情况下,转换同步发生; 100% 明确:源流可能会异步传递其元素,但每个下一个元素会立即mapped 并进一步重新发送; map 无法及时移动元素,例如 delay

    • 对返回值没有限制

    • id:x => x

    合并地图

    • 可以改变源流的大小;对于每个元素,可能会创建/发出任意数量(0、1 或许多)的新元素

    • 它提供了对异步性的完全控制——无论是何时创建/发出新元素,还是应该同时处理源流中的多少元素;例如,假设源流发出 10 个元素,但 maxConcurrency 设置为 2,那么将立即处理两个第一个元素,其余 8 个被缓冲;一旦处理了completed 中的一个,源流中的下一个元素将被处理,依此类推——这有点棘手,但请看下面的示例

    • 所有其他运算符都可以仅使用 mergeMapObservable 构造函数实现

    • 可用于递归异步操作

    • 返回值必须是 Observable 类型(或者 Rx 必须知道如何从中创建 observable - 例如 promise、数组)

    • id:x => Rx.Observable.of(x)

    数组类比

    let array = [1,2,3]
    fn             map                    mergeMap
    x => x*x       [1,4,9]                error /*expects array as return value*/
    x => [x,x*x]   [[1,1],[2,4],[3,9]]    [1,1,2,4,3,9]
    

    这个类比没有显示完整的画面,它基本上对应于.mergeMapmaxConcurrency 设置为 1。在这种情况下,元素将按上述顺序排列,但一般情况下不必如此。我们唯一的保证是新元素的发射将按照它们在底层流中的位置进行排序。例如:[3,1,2,4,9,1][2,3,1,1,9,4] 有效,但 [1,1,4,2,3,9] 无效(因为 4 在基础流中的 2 之后发出)。

    几个使用mergeMap的例子:

    // implement .map with .mergeMap
    Rx.Observable.prototype.mapWithMergeMap = function(mapFn) {
      return this.mergeMap(x => Rx.Observable.of(mapFn(x)));
    }
    
    Rx.Observable.range(1, 3)
      .mapWithMergeMap(x => x * x)
      .subscribe(x => console.log('mapWithMergeMap', x))
    
    // implement .filter with .mergeMap
    Rx.Observable.prototype.filterWithMergeMap = function(filterFn) {
      return this.mergeMap(x =>
        filterFn(x) ?
        Rx.Observable.of(x) :
        Rx.Observable.empty()); // return no element
    }
    
    Rx.Observable.range(1, 3)
      .filterWithMergeMap(x => x === 3)
      .subscribe(x => console.log('filterWithMergeMap', x))
    
    // implement .delay with .mergeMap 
    Rx.Observable.prototype.delayWithMergeMap = function(delayMs) {
      return this.mergeMap(x =>
        Rx.Observable.create(obs => {
          // setTimeout is naive - one should use scheduler instead
          const token = setTimeout(() => {
            obs.next(x);
            obs.complete();
          }, delayMs)
          return () => clearTimeout(token);
        }))
    }
    
    Rx.Observable.range(1, 3)
      .delayWithMergeMap(500)
      .take(2)
      .subscribe(x => console.log('delayWithMergeMap', x))
    
    // recursive count
    const count = (from, to, interval) => {
      if (from > to) return Rx.Observable.empty();
      return Rx.Observable.timer(interval)
        .mergeMap(() =>
          count(from + 1, to, interval)
          .startWith(from))
    }
    
    count(1, 3, 1000).subscribe(x => console.log('count', x))
    
    // just an example of bit different implementation with no returns
    const countMoreRxWay = (from, to, interval) =>
      Rx.Observable.if(
        () => from > to,
        Rx.Observable.empty(),
        Rx.Observable.timer(interval)
        .mergeMap(() => countMoreRxWay(from + 1, to, interval)
          .startWith(from)))
    
    const maxConcurrencyExample = () =>
      Rx.Observable.range(1,7)
        .do(x => console.log('emitted', x))
        .mergeMap(x => Rx.Observable.timer(1000).mapTo(x), 2)
        .do(x => console.log('processed', x))
        .subscribe()
    
    setTimeout(maxConcurrencyExample, 3100)
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.1/Rx.min.js"></script>

    【讨论】:

    • @arturgrzesiak 如果我在每个基本的凝乳操作中使用mergeMap 会怎样?它会影响性能还是其他方面?我没试过,只是有点怀疑。
    • @k11k2 抱歉回复晚了 - 就像 3 年前一样,我也有类似的疑问,从那时起,我一直将整个应用程序构建为单一可观察的,并且 Rx 性能从未成为问题。
    • 这就是它的工作原理。 RxJs 文档上完全非人类可读的解释应该用这个替换。 reactivex.io/rxjs/class/es6/…wtf m8?
    【解决方案2】:

    .mergeMap() 可让您将高阶 Observable 扁平化为单个流。例如:

    Rx.Observable.from([1,2,3,4])
      .map(i => getFreshApiData())
      .subscribe(val => console.log('regular map result: ' + val));
    
    //vs
    
    Rx.Observable.from([1,2,3,4])
      .mergeMap(i => getFreshApiData())
      .subscribe(val => console.log('mergeMap result: ' + val));
    
    function getFreshApiData() {
      return Rx.Observable.of('retrieved new data')
        .delay(1000);
    }
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>

    有关.xxxMap() 运算符的深入解释,请参阅我在其他问题上的回答:Rxjs - How can I extract multiple values inside an array and feed them back to the observable stream synchronously

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-11-05
      • 1970-01-01
      • 2012-05-11
      • 2013-07-31
      • 1970-01-01
      • 1970-01-01
      • 2010-11-28
      相关资源
      最近更新 更多