【问题标题】:RxJava2 - Chaining observables that emit different typesRxJava2 - 链接发出不同类型的可观察对象
【发布时间】:2018-03-09 05:19:24
【问题描述】:

我正在学习 RxJava2,我需要链接三个 observable:

第一个对数据进行操作:

Completable performOperations(Data data); // performs expensive operations.

第二个将数据上传到服务器1并发出百分比进度。

Observable<Integer> uploadToServer1(Data data); // while it performs the upload, it calls several times onNext(progress) and finally calls onComplete().

第三个只是通知服务器2上传完成。

Completable informUploadedToServer2(Data data); // just calls a REST API.

我想在我的 Activity 中显示第二个 observable 的进度,并最终在第三个 observable 成功完成时显示成功。如果三个 observable 中的任何一个抛出异常,我也应该在 Activity 中显示错误。

我尝试使用 concat 进行链接,但它不会编译,因为 uploadToServer1 发出 Integer 类型,而其余的则不会。

public void upload(Data data, MyCallback callback) {
    Observable.concat(performOperations(data).toObservable(), uploadToServer1(data), informUploadedToServer2(data))
    .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new DisposableObserver<Integer>() {
                    @Override
                    public void onNext(Integer integer) {
                        callback.onProgressChanged(integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        callback.onError();
                    }

                    @Override
                    public void onComplete() {
                        callback.onSuccess();
                    }
                });
}

我已经看到了,如果我改成

Observable.concat(performOperations(data).<Integer>toObservable(), uploadToServer1(data), informUploadedToServer2(data).<Integer>toObservable())

它会起作用,但是,这是推荐的方法吗?

此外,如果第一个 observable 发出非整数怎么办?例如,DataDiff 对象将描述执行特定操作后的修改:

Observable<DataDiff> performOperations(Data data);

我应该如何订阅,以便我可以收听onNext(Integer)onNext(DataDiff),以便活动可以相应地更新视图?

谢谢。

【问题讨论】:

    标签: android rx-java2


    【解决方案1】:

    我会以不同的方式来做这件事,一种更“流畅”的方法。

    首先performOperations(),然后使用andThen 运算符与Observable&lt;Integer&gt; 连接,然后您可以使用concatWith,以便之后Observable&lt;Integer&gt; 中的所有元素都被发射informUploadedToServer2 被执行。然后你可以处理订阅消费者中发出的Integer,如果你observeOn(AndroidSchedulers.mainThread)你可以安全地通知你的Activity那里

    performOperations(data)
            .andThen(uploadToServer1(data))
            .concatWith(informUploadedToServer2(data))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                   // notify your Activity here
                }
            });
    

    如果您需要拦截其中一个流的完成,您可以使用doOnComplete,例如

    performOperations(data)
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                   // after performOperations has completed but before 
                   // uploadToServer1 has started
                }
            })
            .andThen(uploadToServer1(data))
            // ...
    

    如果performOperations()返回Observable&lt;DataDiff&gt;,你可以使用doOnNext拦截所有事件,然后使用ignoreElements运算符将其转换为Completable,然后像以前一样使用andThen

    performOperations()
            .doOnNext(new Consumer<DataDiff>() {
                @Override
                public void accept(DataDiff dataDiff) throws Exception {
                    // handle DataDiff here
                }
            })
            .ignoreElements()
            .andThen(uploadToServer1())
            // ...
    

    【讨论】:

    • 请注意,concatWith(Completable) 是在 RxJava 2.1.10 中添加的。
    猜你喜欢
    • 2018-01-10
    • 2017-03-08
    • 2015-01-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-08-26
    • 2018-10-25
    • 2018-06-07
    相关资源
    最近更新 更多