【问题标题】:RxJava - flatmap vs concatMap - why is ordering the same on subscription?RxJava - flatmap vs concatMap - 为什么在订阅时订购相同?
【发布时间】:2016-07-08 00:26:02
【问题描述】:

根据this thread,conCatMap 和 flatmap 仅在发射项目的顺序上有所不同。所以我做了一个测试并创建了一个简单的整数流,并想看看它们会以什么顺序发出。我做了一个小的 observable,它可以接收 1-5 范围内的数字并将它们乘以 2。简单。

这是带有平面图的代码:

myObservable.flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            return Observable.just(integer * 2);
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Integer integer) {
        Log.v("myapp","from flatMap:"+integer);
        }
    });

以及使用 concatMap 的完全相同的代码:

myObservable.concatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            return Observable.just(integer * 2);
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Integer integer) {
        Log.v("myapp","from concatmap:"+integer);
        }
    });

当我在日志中看到打印输出时,两者的顺序相同,为什么?我以为只有 concatMap 会保留顺序?

【问题讨论】:

    标签: rx-java flatmap concatmap


    【解决方案1】:

    您所看到的纯属巧合。每次您的 flatMap 返回一个值时,它都会在与前一个线程相同的线程上执行此操作。

    我已修改您的示例以利用多线程:

    Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .flatMap(integer -> Observable.just(integer)
                    .observeOn(Schedulers.computation())
                    .flatMap(i -> {
                        try {
                            Thread.sleep(new Random().nextInt(1000));
                            return Observable.just(2 * i);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            return Observable.error(e);
                        }
                    }))
            .subscribe(System.out::println,
                    Throwable::printStackTrace,
                    () -> System.out.println("onCompleted"));
    

    我将每个 2 * i 值随机延迟以强制执行不同的顺序。另外,我在此之前添加了observeOn(Schedulers.computation()),因此下一个运算符(flatMap)在计算线程池上运行——这具有多线程的魔力。

    这是我的示例(在 Android 上)得到的输出:

    I/System.out: 6
    I/System.out: 4
    I/System.out: 12
    I/System.out: 14
    I/System.out: 8
    I/System.out: 2
    I/System.out: 16
    I/System.out: 20
    I/System.out: 10
    I/System.out: 18
    I/System.out: onCompleted
    

    如果我将just 之后的flatMap 替换为concatMap,那么我会得到一个正确排序的输出。

    有一个great post by Thomas Nield 有正确的解释。

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-03-25
    • 2022-01-21
    • 2017-11-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多