【问题标题】:Concat VS Merge operatorConcat VS 合并运算符
【发布时间】:2016-08-11 17:54:50
【问题描述】:

我正在查看 RXJava 的文档,我注意到 concat 和 merge 运算符似乎做同样的事情。 我写了几个测试来确定。

@Test
public void testContact() {

    Observable.concat(Observable.just("Hello"),
                      Observable.just("reactive"),
                      Observable.just("world"))
              .subscribe(System.out::println);
}

@Test
public void testMerge() {

    Observable.merge(Observable.just("Hello"),
                      Observable.just("reactive"),
                      Observable.just("world"))
            .subscribe(System.out::println);
}

文档说

Merge 运算符也类似。它结合了两个或多个 Observable 的发射,但可以将它们交错,而 Concat 从不交错来自多个 Observable 的发射。

但我还是不完全明白,运行这个测试数千次合并结果总是一样的。由于未授予订单,因此我期望有时会出现“反应性”“世界”“你好”。

代码在这里https://github.com/politrons/reactive

【问题讨论】:

    标签: java java-8 rx-java


    【解决方案1】:

    正如您引用的文档中所述-合并可以交错输出,而 concat 将首先等待较早的流完成,然后再处理以后的流。 在您的情况下,对于单元素静态流,它没有任何真正的区别(但理论上,合并可以以随机顺序输出单词,并且根据规范仍然有效)。如果您想看到差异,请尝试以下操作(您需要在之后添加一些 sleep 以避免提前退出)

        Observable.merge(
                Observable.interval(1, TimeUnit.SECONDS).map(id -> "A" + id),
                Observable.interval(1, TimeUnit.SECONDS).map(id -> "B" + id))
        .subscribe(System.out::println);
    

    A0 B0 A1 B1 B2 A2 B3 A3 B4 A4

        Observable.concat(
                Observable.interval(1, TimeUnit.SECONDS).map(id -> "A" + id),
                Observable.interval(1, TimeUnit.SECONDS).map(id -> "B" + id))
        .subscribe(System.out::println);
    

    A0 A1 A2 A3 A4 A5 A6 A7 A8

    Concat 永远不会开始打印 B,因为流 A 永远不会结束。

    s/stream/observable/g ;)

    文档提供了很好的图表来显示差异。您需要记住,merge 没有保证一个一个地交错项目,它只是一个可能的例子。

    连接

    合并

    【讨论】:

    • 快速说明,如果源是同步的,那么merge = concat
    • 好点!这解释了为什么我的示例总是按相同的顺序进行。那是让我困惑的部分。以 Dave 为例,因为间隔(异步)它是可重现的
    • @ArturBiesiadowski 你介意在你的答案中添加zip 吗?
    • @DaveMotenby 例如,如果我在网络中的一个来源和第二个来源是数据库,那么它们都是异步的
    • @AbhishekKumar 同步意味着源在完成之前不会放弃控制。
    【解决方案2】:

    连接

    Concat 从两个或多个 Observable 发射发射,而不将它们交错。它将在发出项目时保持可观察对象的顺序。这意味着它将发出第一个可观察对象的所有项目,然后它会发出第二个可观察对象的所有项目,依此类推。

    让我们通过一个例子清楚地理解它。

    final String[] listFirst = {"A1", "A2", "A3", "A4"};
    final String[] listSecond = {"B1", "B2", "B3"};
    
    final Observable<String> observableFirst = Observable.fromArray(listFirst);
    final Observable<String> observableSecond = Observable.fromArray(listSecond);
    
    Observable.concat(observableFirst, observableSecond)
            .subscribe(new Observer<String>() {
    
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(String value) {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    当我们使用 Concat 运算符时,它将保持顺序并发出 A1、A2、A3、A4、B1、B2、B3 的值。

    合并

    Merge 通过合并它们的发射将多个 Observable 合并为一个。它不会在发出项目时保持顺序。

    让我们通过一个例子清楚地理解它。

    final String[] listFirst = {"A1", "A2", "A3", "A4"};
    final String[] listSecond = {"B1", "B2", "B3"};
    
    final Observable<String> observableFirst = Observable.fromArray(listFirst);
    final Observable<String> observableSecond = Observable.fromArray(listSecond);
    
    Observable.merge(observableFirst, observableSecond)
            .subscribe(new Observer<String>() {
    
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(String value) {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    由于我们使用的是合并运算符,它不会保持顺序,并且可以按任何顺序发出值,例如 A1、B1、A2、A3、B2、B3、A4 A1, A2, B1, B2, A3, A4, B3 或者可以是任何东西。

    这就是我们应该如何在 RxJava 中使用 Concat 和 Merge 运算符,具体取决于我们的用例。

    【讨论】:

      【解决方案3】:

      如果两个源是同步的,如 OP 所述,它们会通过 mergeconcat 返回确切的顺序。

      但是当源不同步时,例如两个源(例如数据库和一个套接字)以随机或未确定的延迟推送数据,mergeconcat 的结果或数据顺序不同

      fun <T> Observable<T>.getAsyncItems(): Observable<T> {
      
          return concatMap {
      
              val delay = Random().nextInt(10)
      
              Observable.just(it)
                      .delay(delay.toLong(), TimeUnit.MILLISECONDS)
          }
      }
      

      上面的这个扩展函数会随机地为每个发射添加延迟,但它们总是以相同的顺序发射。如果数据是 A、B、C、D 和 E,则输出始终是 A、B、C、D 和 E。

      现在让我们比较一下mergeconcat 在源异步时的行为。

      val source1 =
              Observable.just("A", "B", "C", "D", "E").getAsyncItems()
      val source2 =
              Observable.just(1, 2, 3, 4, 5).getAsyncItems()
      
      Observable.merge(source1, source2).subscribe {print("$it-")}
      

      打印类似A-B-1-C-2-D-3-4-E-5-A-1-2-3-B-C-D-4-E-5- 这取决于随机延迟。

      Observable.concat(source1, source2).subscribe {print("$it-")}
      

      无论运行多少次都会打印A-B-C-D-E-1-2-3-4-5-

      【讨论】:

        猜你喜欢
        • 2016-04-02
        • 2012-09-19
        • 2010-09-15
        • 2013-09-13
        • 1970-01-01
        • 2010-09-08
        • 2018-05-02
        • 2015-03-31
        • 1970-01-01
        相关资源
        最近更新 更多