【问题标题】:Multiple REST API calls parallel using RX Observables使用 RX Observables 并行多个 REST API 调用
【发布时间】:2019-02-17 01:00:05
【问题描述】:

我正在尝试使用 Observables 并行调用 REST API。我在这方面面临一些问题。我在这里描述了问题。有人可以帮我吗?

我的用例是,我有 5 个客户 ID,我需要通过同时传递此用户 ID 来调用另一个 REST API 以获取响应。

所以我决定在这里使用 Observables 以获得更好的性能来同时访问服务。

我尝试了以下 3 个选项。但我觉得,它们都提供了相同的响应时间。我没有看到所有这些调用的响应时间有任何差异

有人能在这段代码中找出错误所在吗?我是否正确使用了 Observables?

1) Observable.from(customerIds).flatMap(customerId -> 
                asyncUserRetrieve(customerId)
                .subscribeOn(Schedulers.io()))
                .subscribe(cust -> {
                        custDetails.add(cust);

                });

2) Observable.from(customerIds).flatMap(customerId -> 
                asyncUserRetrieve(customerId)
                .subscribe(cust -> {

                        custDetails.add(cust);
                });
3) for(String id : customerId) {
    Customer customer = asyncUserRetrieve(id).toBlocking().single();.
    custDetails.add(cust);
}

    @Override
    public Observable<Customer> asyncUserRetrieve(String customerId) {
            final URI uri = getURL(customerId);
            final Response response = httpClient.callForResponse(uri);
            if (response.getHttpStatus().is2xxSuccessful()) {
                Customer customer = getResponse(response, customerId);
                return Observable.just(customer);
            }
            return Observable.just(new Customer().setError(true));

        } 

【问题讨论】:

标签: asynchronous java-8 rx-java microservices reactive-programming


【解决方案1】:

问题出在asyncUserRetrieve 实现中。 所以当asyncUserRetreive被调用时发生的事情是:

  1. 调用服务器。
  2. 等待服务器响应到来。 线程被阻塞
  3. 返回一个 Observable。
  4. 返回的 Observable 立即发射。


相反,流程应该是这样的:

  1. 立即返回一个发出服务器请求的 Observable
  2. 返回的 observable 在服务器响应到来时发出。

一种可能的实现方式是:

@Override
public Observable<Customer> asyncUserRetrieve(String customerId) {
    return Observable.fromCallable(() -> {
        final URI uri = getURL(customerId);
        final Response response = httpClient.callForResponse(uri);
        if (response.getHttpStatus().is2xxSuccessful()) {
            Customer customer = getResponse(response, customerId);
            return customer;
        }
        return new Customer().setError(true);

    } 
}

更新:

如果您希望当前线程等到所有响应到来,请尝试执行以下操作:

RxJava 1

List<Customer> tempCustomers = Observable.from(customerIds)
    .flatMap(customerId -> {
        asyncUserRetrieve(customerId)
            .subscribeOn(Schedulers.io())
    })
    .toList()
    .toBlocking() // Do this only if you want to block the thread.
    .single()

custDetails.addAll(tempCustomers);

RxJava 2

List<Customer> tempCustomers = Observable.fromIterable(customerIds)
    .flatMap(customerId -> {
        asyncUserRetrieve(customerId)
            .subscribeOn(Schedulers.io())
    })
    .toList()
    .blockingGet()

custDetails.addAll(tempCustomers);

【讨论】:

  • 如果是这种情况,当前线程将被执行并在api调用返回响应之前完成该过程。
猜你喜欢
  • 2022-06-15
  • 2021-05-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-10-25
  • 1970-01-01
  • 2022-09-23
  • 1970-01-01
相关资源
最近更新 更多