【问题标题】:Combining many ReactiveX streams into one result stream将多个 ReactiveX 流合并为一个结果流
【发布时间】:2018-06-08 11:27:04
【问题描述】:

我正在尝试使用 RxJava 来理解 ReactiveX,但我无法理解整个 Reactive 的想法。我的情况如下:

我有Task 课程。它具有perform() 方法,该方法正在执行HTTP 请求并通过executeRequest() 方法获取响应。请求可能会被执行多次(定义的重复次数)。我想获取executeRequest() 的所有结果并将它们组合到Flowable 数据流中,这样我就可以在perform() 方法中返回这个Flowable。所以最后我希望我的方法返回我的Task 执行的请求的所有结果。

executeRequest() 返回Single,因为它只执行一个请求并且可能只提供一个响应或根本不提供响应(在超时的情况下)。 在perform() 中,我为每次重复创建Flowable 数字范围。订阅此Flowable 我每次重复执行一个请求。我还订阅了每个响应Single,以便将响应记录并收集到一个集合中以供以后使用。所以现在我有一组Singles,如何将它们合并到Flowable 中以在perform() 中返回它?我试图弄乱merge() 之类的运算符,但我不了解它的参数类型。

我在网上阅读了一些指南,但它们都非常笼统,或者没有根据我的情况提供示例。

public Flowable<HttpClientResponse> perform() {

    Long startTime = System.currentTimeMillis();

    List<HttpClientResponse> responses = new ArrayList<>();
    List<Long> failedRepetitionNumbers = new ArrayList<>();

    Flowable.rangeLong(0, repetitions)
            .subscribe(repetition -> {
                logger.debug("Performing repetition {} of {}", repetition + 1, repetitions);

                Long currentTime = System.currentTimeMillis();

                if (durationCap == 0 || currentTime - startTime < durationCap) {

                    Single<HttpClientResponse> response = executeRequest(method, url, headers, body);

                    response.subscribe(successResult -> {
                                logger.info("Received response with code {} in the {}. repetition.", successResult
                                        .statusCode(), repetition + 1);
                                responses.add(successResult);
                            },
                            error -> {
                                logger.error("Failed to receive response from {}.", url);
                                failedRepetitionNumbers.add(repetition);
                            });
                    waitInterval(minInterval, maxInterval);
                } else {
                    logger.info("Reached duration cap of {}ms for task {}.", durationCap, this);
                }
            });

    return Flowable.merge(???);
}

还有executeRequest()

private Single<HttpClientResponse> executeRequest(HttpMethod method, String url, LinkedMultiValueMap<String, String>
        headers, JsonNode body) {

    CompletableFuture<HttpClientResponse> responseFuture = new CompletableFuture<>();

    HttpClient client = vertx.createHttpClient();
    HttpClientRequest request = client.request(method, url, responseFuture::complete);
    headers.forEach(request::putHeader);
    request.write(body.toString());
    request.setTimeout(timeout);
    request.end();

    return Single.fromFuture(responseFuture);
}

【问题讨论】:

    标签: java asynchronous rx-java2 reactivex


    【解决方案1】:

    不要在 perform 方法中订阅每个 observable(每个 HTTP 请求),而是继续像这样链接 observable。您的代码可以简化为类似的东西。

        public Flowable<HttpClientResponse> perform() {
        // Here return a flowable , which can emit n number of times. (where n = your number of HTTP requests)
        return Flowable.rangeLong(0, repetitions) // start a counter
                .doOnNext(repetition -> logger.debug("Performing repetition {} of {}", repetition + 1, repetitions)) // print the current count
                .flatMap(count -> executeRequest(method, url, headers, body).toFlowable()) // get the executeRequest as Flowable
                .timeout(durationCap, TimeUnit.MILLISECONDS); // apply a timeout policy
        }
    

    最后,你可以在你真正需要执行这一切的地方订阅perform,如下图

                 perform()
                .subscribeWith(new DisposableSubscriber<HttpClientResponse>() {
                    @Override
                    public void onNext(HttpClientResponse httpClientResponse) {
                      // onNext will be triggered each time, whenever a request has executed and ready with result
                      // if you had 5 HTTP request, this can trigger 5 times with each "httpClientResponse" (if all calls were success)
                    }
    
                    @Override
                    public void onError(Throwable t) {
                        // any error during the execution of these request,
                        // including a TimeoutException in case timeout happens in between
                    }
    
                    @Override
                    public void onComplete() {
                       // will be called finally if no errors happened and onNext delivered all the results
                    }
                });
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2023-03-18
      • 2020-10-11
      • 1970-01-01
      • 2019-04-16
      • 2020-03-20
      • 1970-01-01
      • 1970-01-01
      • 2017-01-15
      相关资源
      最近更新 更多