【问题标题】:Compose multiple network calls RxJava - Android编写多个网络调用 RxJava - Android
【发布时间】:2014-11-02 01:27:20
【问题描述】:

帮助编写多个网络调用并在 Rxjava 中累积结果。 (我在 Android 应用程序中使用。)

State 
 -- List<City> cityList;

 City
 - cityId;

RestCall 1
Observable<State> stateRequest = restService.getStates();

RestCall 2
Observable<CityDetail> cityRequest = restService.getCityDetail(cityId);

在 UI 中,我必须在获取每个城市的所有详细信息后显示城市列表,然后在列表视图中显示。 我如何实现并行网络调用并累积结果。 ?

我希望将所有城市详细信息结果放入源状态“对象”的列表中。由于状态对象也有一些需要显示的信息。这可能吗?

stateRequest ??? 
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<State>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(State result) {
    // Get city list and display
    }
});

我查看了这个示例,该示例显示了我们如何压缩多个可观察到的响应。下面 sn-p 显示了 3 个可观察的组合。 但在我的情况下,我必须并行或顺序进行 20 个网络调用(我的意思是在后台但一个接一个)。我如何做到这一点。有什么帮助或指示吗?

https://gist.github.com/skehlet/9418379

Observable.zip(f3Observable, f4Observable, f5Observable, new Func3<String, Integer, Integer, Map<String, String>>() {
    @Override
    public Map<String, String> call(String s, Integer integer, Integer integer2) {
        Map<String, String> map = new HashMap<String, String>();
        map.put("f3", s);
        map.put("f4", String.valueOf(integer));
        map.put("f5", String.valueOf(integer2));
        return map;
    }

【问题讨论】:

    标签: android system.reactive rx-java


    【解决方案1】:

    我认为您的代码可以简化为这样的代码,因为您使用 zip 运算符与使用 toList 运算符很接近

     stateRequest
     .subscribe(State state ->  {
         Observable.from(state.getCityList())
                   .flatMap(City city -> restService.getCityDetail(city.getId())
                   .toList()
                   .subscribe(List<City> cities -> {
    
                         state.clear();
                         state.addAll(cities);
                   });
         });
    

    由于 RxJava 不提供 throttle 运算符,你可以构建类似这样的东西:

    Observable<City> limiter = Observable.zip(Observable.interval(1, SECONDS), aCity, (i, c) -> c);
    

    使用这个,limiter 是一个每秒会发射一个城市的 observable。

    因此,使用您的代码,如果您想限制对 getCityDetail 的调用,例如:

     Observable<Object> limiter = Observable.interval(1, SECONDS);
     stateRequest
     .subscribe(State state ->  {
         Observable.zip(limiter, Observable.from(state.getCityList()), (i, c) -> c)
                   .flatMap(City city -> restService.getCityDetail(city.getId())
                   .toList()
                   .subscribe(List<City> cities -> {
    
                         state.clear();
                         state.addAll(cities);
                   });
         });
    

    【讨论】:

    • 间隔压缩是一个好方法。对我有用。接受你的回答。
    【解决方案2】:
    stateRequest
    .flatMap(new Func1<State, Observable<State>>() {
        @Override
        public Observable<State> call(final State state) {
    
            List<Observable> cityObservablesList = new ArrayList<Observable>();
    
            for(City city: state.getCityList()) {
                cityObservablesList.add(restService.getCityDetail(city.getId());
            }
    
            Observable cityObservables = Observable.from(cityObservablesList);
            return Observables.zip(cityObservables, new FuncN<State>() {
                @Override
                public State call(Object... args) {
                    List<City> cityList = state.getCityList();
                    cityList.clear();
                    for(Object object: args) {
                        cityList.add((City)object);
                    }
    
                    return state;
                }
            })
        })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<State>() {
        @Override
        public void onCompleted() {
    
        }
    
        @Override
        public void onError(Throwable e) {
        }
    
        @Override
        public void onNext(State result) {
        // Get city list and display
        }
    });
    

    我在 zip 运算符和 Iterable for city list 作为第一个参数的帮助下得到了它。 但我面临另一个问题。由于 zip 并行执行作业,因此并行执行 10-15 个网络调用,服务器拒绝最大查询每秒错误 (QPS - 403)。 如何指示 zip 操作员一个接一个地执行任务?

    我确实通过向可观察城市添加延迟 [delay(c*200, TimeUnit.MILLISECONDS))] 解决了这个问题。但似乎不是一个合适的解决方案。

    有什么建议吗?

    【讨论】:

    • 我的建议是将此作为一个新问题提出。尽管如此,zip 不应该并行执行这些项目 - Rx 有行为保证。我会在for 循环中查看您对getCityDetail 的调用。这是造成问题的原因吗?
    【解决方案3】:

    看看 flatMap(Function.., BiFunction..)。也许这就是你需要的。

    statesRepository.getStates()
       .flatMap(states-> Observable.fromIterable(states))
       .flatMap(
               state-> cityRepository.getStateCities(state),
               (state, cityList) -> {
                   state.setCities(cityList);
                   return state;
               })
       .subscribe(state-> showStateWithCity(state));
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-02-02
      • 2017-10-03
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多