【发布时间】:2018-06-08 11:27:04
【问题描述】:
我正在尝试使用 RxJava 来理解 ReactiveX,但我无法理解整个 Reactive 的想法。我的情况如下:
我有Task 课程。它具有perform() 方法,该方法正在执行HTTP 请求并通过executeRequest() 方法获取响应。请求可能会被执行多次(定义的重复次数)。我想获取executeRequest() 的所有结果并将它们组合到Flowable 数据流中,这样我就可以在perform() 方法中返回这个Flowable。所以最后我希望我的方法返回我的Task 执行的请求的所有结果。
executeRequest() 返回Single,因为它只执行一个请求并且可能只提供一个响应或根本不提供响应(在超时的情况下)。
在perform() 中,我为每次重复创建Flowable 数字范围。订阅此Flowable 我每次重复执行一个请求。我还订阅了每个响应Single,以便将响应记录并收集到一个集合中以供以后使用。所以现在我有一组Singles,如何将它们合并到Flowable 中以在perform() 中返回它?我试图弄乱merge() 之类的运算符,但我不了解它的参数类型。
我在网上阅读了一些指南,但它们都非常笼统,或者没有根据我的情况提供示例。
public Flowable<HttpClientResponse> perform() {
Long startTime = System.currentTimeMillis();
List<HttpClientResponse> responses = new ArrayList<>();
List<Long> failedRepetitionNumbers = new ArrayList<>();
Flowable.rangeLong(0, repetitions)
.subscribe(repetition -> {
logger.debug("Performing repetition {} of {}", repetition + 1, repetitions);
Long currentTime = System.currentTimeMillis();
if (durationCap == 0 || currentTime - startTime < durationCap) {
Single<HttpClientResponse> response = executeRequest(method, url, headers, body);
response.subscribe(successResult -> {
logger.info("Received response with code {} in the {}. repetition.", successResult
.statusCode(), repetition + 1);
responses.add(successResult);
},
error -> {
logger.error("Failed to receive response from {}.", url);
failedRepetitionNumbers.add(repetition);
});
waitInterval(minInterval, maxInterval);
} else {
logger.info("Reached duration cap of {}ms for task {}.", durationCap, this);
}
});
return Flowable.merge(???);
}
还有executeRequest()
private Single<HttpClientResponse> executeRequest(HttpMethod method, String url, LinkedMultiValueMap<String, String>
headers, JsonNode body) {
CompletableFuture<HttpClientResponse> responseFuture = new CompletableFuture<>();
HttpClient client = vertx.createHttpClient();
HttpClientRequest request = client.request(method, url, responseFuture::complete);
headers.forEach(request::putHeader);
request.write(body.toString());
request.setTimeout(timeout);
request.end();
return Single.fromFuture(responseFuture);
}
【问题讨论】:
标签: java asynchronous rx-java2 reactivex