【问题标题】:How to make multiple request and responses from all the requests in Retrofit, RxJava, Android如何从 Retrofit、RxJava、Android 中的所有请求发出多个请求和响应
【发布时间】:2020-08-20 07:37:19
【问题描述】:

我有一个场景,我想为多个设备调用相同的 API 并在完成所有请求后显示结果。 我正在使用改造 2。 我对 RxJava 知之甚少。我认为 zip 运算符将适合此。所以实现如下。

ApiInterface 中的 API:

@PUT(AppConstants.BASE_URL + AppConstants.PATH_SEPARATOR + "/user/endpoint")
Observable<ResponseBody> updateInfo(@Header("Authorization") String token, @Query("device_id") String deviceId, @Body JsonObject body);

这是一个调用 API 的方法。它在 Map 中获取设备 ID 及其主体。此方法为 Map 中可用的每个设备 ID 调用 API。

public void updateAllInfo(final HashMap<String, String> deviceIdMap, final ApiResponseListener listener) {

    List<Observable<ResponseBody>> requests = new ArrayList<>();
    ArrayList<String> reqIdList = new ArrayList<>();

    for (Map.Entry<String, String> entry : map.entrySet()) {

        String deviceId = entry.getKey();
        String jsonBodyStr = entry.getValue();
        Gson gson = new Gson();
        JsonObject jsonBody = gson.fromJson(jsonBodyStr, JsonObject.class);

        reqIdList.add(deviceId);
        requests.add(apiInterface.updateSchedules("accessToken", deviceId, jsonBody));
    }

    Observable.zip(requests, new Function<Object[], List<ResponseBody>>() {

        @Override
        public List<ResponseBody> apply(Object[] objects) throws Exception {

            Log.e("onSubscribe", "apply : " + objects.length);
            List<ResponseBody> dataResponses = new ArrayList<>();
            for (Object o : objects) {
                dataResponses.add((ResponseBody) o);
            }
            return dataResponses;
        }
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<List<ResponseBody>>() {

                @Override
                public void accept(List<ResponseBody> responseBodies) throws Exception {
                    Log.e("onSubscribe", "YOUR DATA IS HERE: " + responseBodies.size());
                    for (int i = 0; i < responseBodies.size(); i++) {
                        Log.e(TAG, "Response received for " + i + " is : " + responseBodies.get(i).string());
                    }
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Log.e("onSubscribe", "Throwable: " + throwable);
                }
            });
}

我想获得每个设备 ID 的响应(成功/失败)。意味着我需要响应以及调用 API 的 id。 使用 zip 运算符,如果任何 API 失败,则在 accept(Throwable throwable) 方法中收到失败。如果任何 API 失败,我认为 zip 运算符不会调用下一个 API。

如何获得所有请求的响应(成功或失败)?

还需要一些东西来指示响应是针对哪个请求/设备ID(某些映射)来显示结果。

我可以使用其他运算符来代替 zip 吗?

有什么建议/帮助吗?

【问题讨论】:

    标签: android retrofit2 rx-java rx-android


    【解决方案1】:

    我对java有点生疏,所以我会用Kotlin写我的答案,你自己转换应该不成问题。

    创建一个包含ResponseBodydeviceId 的助手类:

    data class IdentifiedResponseBody(
        val deviceId: String, 
        val responseBody: ResponseBody?
    )
    

    然后:

    // change the signature of your requests list to return IdentifiedResponseBody observables
    val requests = mutableListOf<Observable<IdentifiedResponseBody>>()
    ...
    // your stated API have updateInfo instead of updateSchedules, but I will assume they have the same signature
    requests.add(
        apiInterface.updateSchedules("accessToken", deviceId, jsonBody)
            .map { responseBody ->
                // map the added observable to return IdentifiedResponseBody
                IdentifiedResponseBody(deviceId, responseBody)
            }
            .onErrorReturn { error -> 
                // return an item here instead of throwing error, so that the other observables will still execute
                IdentifiedResponseBody(deviceId, null)
            }
    )
    

    最后,用merge代替zip

    Observable.merge(requests)
            .subscribeOn(Schedulers.io())
            // it's a question if you want to observe these on main thread, depends on context of your application
            .subscribe(
                { identifiedResponse ->
                    // here you get both the deviceId and the responseBody
                    Log.d("RESPNOSE", "deviceId=${identifiedResponse.deviceId}, body=${identifiedResponse.responseBody}")
                    if (responseBody == null || responseBody.hasError()) {
                        // request for this deviceId failed, handle it
                    }
                },
                { error ->
                    Log.e("onSubscribe", "Throwable: " + error)
                }
            )
    

    merge:http://reactivex.io/documentation/operators/merge.html

    ziphttp://reactivex.io/documentation/operators/zip.html

    您应该会看到深刻的区别:zip 将您的响应组合到由映射函数定义的单个项目(即您的案例中的响应列表),而 merge 在返回时单独发出所有响应.如果这里是zip,则在所有请求完成时(并且仅)返回组合结果;您可能不希望这种行为,就好像单个请求不会返回响应一样,您根本不会得到任何响应。

    更新

    java 等效项应如下所示,但在尝试之前进行修改,因为我不确定我是否正确转换了所有内容:

    requests.add(
        apiInterface.updateSchedules("accessToken", deviceId, jsonBody)
                    .map(new Function<ResponseBody, IdentifiedResponseBody>() {
                        @Override
                        public IdentifiedResponseBody apply(ResponseBody responseBody) throws Exception {
                            return new IdentifiedResponseBody(deviceId, responseBody);
                        }
                    })
                    .onErrorReturn(new Function<Throwable, IdentifiedResponseBody>() {
                        @Override
                        public IdentifiedResponseBody apply(Throwable throwable) throws Exception {
                            return new IdentifiedResponseBody(deviceId, null);
                        }
                    })
            );
    
    Observable.merge(requests)
                .subscribeOn(Schedulers.io())
                .subscribe(new Consumer<IdentifiedResponseBody>() {
                               @Override
                               public void accept(IdentifiedResponseBody identifiedResponseBody) throws Exception {
                                   // same logic as from kotlin part
                               }
                           },
                        new Consumer<Throwable>() {
                            @Override
                            public void accept(Throwable throwable) throws Exception {
                                Log.e("onSubscribe", "Throwable: " + throwable);
                            }
                        });
    

    更新 2

    在你问的评论中:

    有什么方法可以让我获得所有请求的最终回调 完成

    这就是使用Observable 而不是Single/Completable 的问题,除非您明确关闭通道或抛出错误,否则它不会完成。在理想情况下,Observable 应该用于连续发出一些数据的流,例如到 Room DB 的开放通道,因为不知道 DB 将更改多少次。我承认,在您的情况下,似乎很难应用 Observable 以外的其他东西。但是有一个解决方法:

    Observable.merge(requests)
        // emits only this much items, then close this channel
        .take(requests.size.toLong())
        // executed when the channel is closed or disposed
        .doFinally {
             // todo: finalCallback
         }
        .subscribeOn(Schedulers.io())
        .subscribe(...)
    

    代码再次在 Kotlin 中,转换为 java 应该不难。请检查Observable.take() 做了什么:http://reactivex.io/documentation/operators/take.html

    【讨论】:

    • 非常感谢您的帮助。非常感谢。
    • 对这个答案的另一个查询。有什么方法可以让我获得所有已完成请求的最终回调?
    • 很好的答案。我想我需要一个手动计数变量。非常感谢。
    猜你喜欢
    • 2017-12-03
    • 1970-01-01
    • 2021-09-04
    • 1970-01-01
    • 2021-11-25
    • 1970-01-01
    • 2016-08-15
    • 1970-01-01
    • 2019-12-12
    相关资源
    最近更新 更多