【问题标题】:Why flatMap is implemented with merge in RxJava?为什么在 RxJava 中使用 merge 实现 flatMap?
【发布时间】:2017-05-21 06:45:39
【问题描述】:

为什么RxJava 1.x flatMap() 操作符是用merge实现的?

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
        if (getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
        }
        return merge(map(func));
    }

从 flatMap() 调用中,我只能返回一个符合 &lt;? extends Observable&lt;? extends R&gt;&gt; 的 Observable。比 map(func) 调用将它包装到另一个 Observable 中,这样我们就有了类似Observable&lt;? extends Observable&lt;? extends R&gt;&gt; 的东西。这让我认为 map(func) 之后的 merge() 调用是不必要的。

merge() 运算符执行以下操作:

将发出 Observable 的 Observable 扁平化为单个 Observable 发出那些 Observables 发出的项目,没有任何 转型。

现在在平面地图中,我们只能有一个发出一个 Observable 的 Observable。为什么要合并?我在这里错过了什么?

谢谢。

【问题讨论】:

    标签: rx-java rx-android flatmap


    【解决方案1】:

    现在在平面地图中,我们只能有一个发出一个 Observable 的 Observable。

    否 - 您有一个可观察对象,每个源可观察项目发出一个可观察对象。因此,如果您的源 observable 有更多项目,您将发出多个 observable。

    这就是你需要merge()的原因。

    【讨论】:

      【解决方案2】:

      查看签名可能会有所帮助:

      想象一下,使用Observable&lt;String&gt; 你想flatMap 到单个字符。使用 flatMap 的方法是:

      Observable.just("foo", "hello")
                .flatMap(s -> Observable.from(s.split("")))
      

      这个 Observable 的类型是什么?这是一个Observable&lt;String&gt;

      现在不要使用flatMap,而是使用map具有相同的功能。是什么类型的?

      Observable.just("foo", "hello")
                .map(s -> Observable.from(s.split("")))
      

      你会看到它实际上是Observable&lt;Observable&lt;String&gt;&gt;... 如果我们订阅这个 observable 并打印出发射的项目,我们会得到:

      rx.Observable@5b275dab
      rx.Observable@61832929
      

      不是很有用。更糟糕的是,这些Observables 还没有被订阅,所以它们不会发出任何数据:(

      我们看到flatMap 的目标是让函数为每个源项目生成一个内部Observable&lt;T&gt;,然后订阅这些内部可观察对象并将它们的发射平展在输出Observable&lt;T&gt; 中。而merge 就是这样做的!

      要验证这一点,请将上面的地图结果包装在 Observable.merge(...) 中:

      Observable<Observable<String>> mapped = 
          Observable.just("foo", "hello")
                    .map(s -> Observable.from(s.split("")));
      
      Observable.merge(mapped)
                .subscribe(System.out::println);
      

      这个输出:

      f
      o
      o
      h
      e
      l
      l
      o
      

      【讨论】:

        猜你喜欢
        • 2014-05-15
        • 2018-11-22
        • 2015-03-26
        • 2016-07-08
        • 1970-01-01
        • 2019-12-29
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多