【问题标题】:Retrofit 2 + RxJava chaining similar requests together cleanlyRetrofit 2 + RxJava 将相似的请求干净地链接在一起
【发布时间】:2016-03-19 09:49:15
【问题描述】:

目标

在我的应用程序中,我希望使用 Retrofit 2 将一些文件上传到我的 Web 服务器(一次多个,在本例中为图片。),并且在完成每个文件的上传后,我想更新一个 SQL 表该特定文件的路径。

试过

我是使用函数式编程范例的新手,所以我的理解在这里可能会被误导。我有两个不同的对象,一个 FileResponse(DTO 表示上传文件后来自我的 Web 服务器的响应)和一个 Photo 对象。这个 Photo 对象有两个字段,并充当我的后端服务器中持久化的实体。它的字段对应于列,并且单独上传这些对象中的每一个都可以正常工作。我试图做的是一次扫描完成这个双重操作。

这是我尝试过的:

List<Observable<FileResponse>> mObservables = new ArrayList<>(3);

...
...

Observable.merge(mObservables).
            map(new Func1<FileResponse, Observable<Photo>>() {
                @Override
                public Observable<Photo> call(FileResponse fileResponse) {
                    Photo photo = new Photo();
                    photo.setPath(fileResponse.getUrl());
                    photo.setId("demo123");
                    return mPhotoService.createPhoto(photo);
                }
            }).forEach(
                new Action1<Observable<Photo>>() {
                    @Override
                    public void call(Observable<Photo> photoObservable) {

                    }
                },
                new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.e(TAG, "Error: " + throwable.getMessage());
                    }
                }
            );

这是我背后的逻辑:

merge( ) — 将多个 Observable 合并为一个。

我希望将我创建的 Observable 列表合并在一起,以便对它们中的每一个执行相同的操作。在这种情况下,Observables 的类型是 FileResponse。这是上传到文件网络服务。

map ( ) - 通过对每个项目应用函数来转换 Observable 发出的项目。

在这种情况下,我想将一个函数应用于返回一个新的 Observable Observable 的 FileResponse Observables。

forEach( ) — 对 Observable 发出的每个项目调用一个函数;阻塞直到 Observable 完成。

然后这将获取每个发出的 Observable 并上传它。但在这种情况下,这部分失败了。文件成功上传,但 forEach 部分只返回“错误:空”。

问题

这种理解是否接近我想要达到的目标?同样,我只是希望将多个请求链接在一起,当其他请求成功时执行新请求。有没有更好/正确的方法来做到这一点?

编辑说明:

我忘了说,我可以像这样实现我想要的:

Observable.merge(mObservables)
            .map(new Func1<FileResponse, Observable<Photo>>() {
                @Override
                public Observable<Photo> call(FileResponse fileResponse) {
                    Photo photo = new Photo();
                    photo.setPath(fileResponse.getUrl());
                    photo.setDogId("123");
                    return mPhotoService.createPhoto(photo);
                }
            })
           .subscribe(new Subscriber<Observable<Photo>>() {
                    @Override
                    public void onCompleted() {
                        Log.d(TAG, "Save complete!");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "Error: " + e.getMessage());
                    }

                    @Override
                    public void onNext(Observable<Photo> photoObservable) {
                        photoObservable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread())
                                .subscribe(new Subscriber<Photo>() {
                                    @Override
                                    public void onCompleted() {
                                        Log.d(TAG, "Here!");
                                    }

                                    @Override
                                    public void onError(Throwable e) {
                                        Log.d(TAG, "Error! " + e.getMessage());
                                    }

                                    @Override
                                    public void onNext(Photo photo) {
                                        Log.d(TAG, "Woo!!");
                                    }
                                });
                    }
                }
            );

但我试图找到一种可能更清洁的方法来做到这一点。如果这是唯一的方法,那就够公平了。只是想就我的想法获得更多意见,看看我能做些什么来使它看起来更好(或最佳实践)。

【问题讨论】:

    标签: android rest rx-java retrofit2


    【解决方案1】:

    为了使其更简洁(并避免额外订阅onNext 中的项目),您可以将map() 切换为flatMap()

    Observable.merge(mObservables)
                    .flatMap(new Func1<FileResponse, Observable<Photo>>() {
                        @Override
                        public Observable<Photo> call(FileResponse fileResponse) {
                            Photo photo = new Photo();
                            return mPhotoService.createPhoto(photo);
                        }
                    })
                    .forEach(new Action1<Photo>() {
                        @Override
                        public void call(Photo photo) {
    
                        }
                    });
    

    正如您在 sn-p 中看到的,flatMap 之后的下一个运算符将看到 Photo 对象(与 Observable&lt;Photo&gt; 之后的 map() 相比)。

    【讨论】:

    • 很好,我喜欢这个。一个问题是它在主线程 android.os.NetworkOnMainThreadException 上调用时返回错误
    • @LucasCrawford 我不确定您希望在链的哪个部分将线程切换到后台线程,但是,例如,您可以这样做:mPhotoService.createPhoto(photo).subscribeOn(Schedulers.io())。这会将您的网络请求并行发送到多个后台线程。如果您打算在 forEach() 中进行任何 UI 工作,您可以在它之前致电 observeOn(AndroidSchedulers.mainThread())
    • 完美。我最初添加 Schedulers.newThread() 而不是 io() ,它给了我主线程上的网络。我猜它每次都需要使用线程池而不是一个新线程
    • 还有一个问题,如果我只有一个 Observable(不是很多)并且希望一个接一个地链接两个不同的请求怎么办?在这种情况下,不能使用 Observable.merge()
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-08-15
    • 2016-07-31
    • 2015-08-26
    • 2021-11-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多