【问题标题】:Proper usage of Retrofit + RxJava's combineLatest正确使用 Retrofit + RxJava 的 combineLatest
【发布时间】:2016-08-03 16:15:15
【问题描述】:

我想异步执行 2 个网络调用 - 我正在使用 Retrofit+RxJava 来完成此操作。这个逻辑来自一个简单的 Runner 类来测试解决方案。注意:这主要涉及服务器端的 RxJava。

我的代码如下所示:

public static void main(String[] args) throws Exception {
  Api api = ...;

  Observable.combineLatest(
      api.getStates(),
      api.getCmsContent(),
      new Func2<List<States>, CmsContent, String>() {
        @Override public String call(List<State> states, CmsContent content) {
          ...
          return "PLACEHOLDER";
        }
      })
      .observeOn(Schedulers.immediate())
      .subscribeOn(Schedulers.immediate())
      .subscribe(new Observer<String>() {
        @Override public void onCompleted() {
          System.out.println("COMPLETED");
        }

        @Override public void onError(Throwable e) {
          System.out.println("ERROR: " + e.getMessage());
        }

        @Override public void onNext(String s) {
          // I don't care what's returned here
        }
      });
}

三个问题:

  1. 当您希望异步执行多个 REST 调用并在所有调用完成后继续执行时,Observable.combineLatest 是不是最适合使用的运算符?
  2. 我的Func2 实现当前返回String。在 2 个 API 调用执行后,我将在 Func2#call() 方法中处理结果。我不在乎返回什么 - 但是必须有更好的方法来处理这个 - 我是正确的吗?
  3. API 调用使用上面的代码正确执行。但是当我运行程序时,main 方法并没有以正确的Process finished with exit code 0 完成。什么可能导致代码挂起?

更新 - 2015-05-14

根据建议,我将逻辑更改为:

public static void main(String[] args) throws Exception {
  Api api = ...;

  Observable.zip(
      api.getStates(),
      api.getCmsContent(),
      new Func2<List<States>, CmsContent, Boolean>() {
        @Override public Boolean call(List<State> states, CmsContent content) {
          // process data
          return true;
        }
      })
      .subscribeOn(Schedulers.io())
      .toBlocking()
      .first();
}

这看起来像我正在寻找的解决方案。我打算用一段时间看看有没有什么问题。

【问题讨论】:

    标签: retrofit rx-java


    【解决方案1】:

    1) 如果您知道在两条路径上都有一个值,那么它与zip 一样好。

    2) 你想做什么?您将在 Func2 中获得这对值,如果您并不真正关心与 onNext 一起传递的内容,请返回您选择的值。

    3) Schedulers.immediate() 在某种意义上不是真正的调度程序,并且很容易出现同池死锁情况。你真的不需要使用它。如果你想阻塞主线程直到异步工作完成,例如使用toBlocking().first()

    【讨论】:

    • 谢谢,我已根据您的建议更新了我的代码示例。
    【解决方案2】:

    1) 最好不要使用zip()。如果两个(或多个)api 之一返回“较慢”的不同结果/它具有缓存的本质,则结合最新是很好的。

    2) Fun2 有助于合并结果。最好(架构方面)在 onNext() 或 onError() 中处理结果。您可以使用简单的Pair&lt;T,Y&gt; 类将结果从 Func2 传递到 onNext()。

    3) 没有任何问题。所说的结果应该在 onNext() 中处理,而不是在 onComplete 中。根据Retrofit's source code,结果仅(当然正确地)在 onNext() 中传递。

    希望这些帮助。

    【讨论】:

      【解决方案3】:

      我意识到我对此晚了大约一年,但 OP 在 2015-05-14 发布的编辑不符合他最初的要求:

      我想异步执行 2 个网络调用

      1. Observables getStatesgetCmsContent 不会同时执行,除非它们分别订阅不同的线程。这是他帖子中省略的关键点,之前的答案都没有提到。

        Observable.fromCallable(() -> doStuff())
          .subscribeOn(Schedulers.computation());
        

      正如@akarnokd 所说,如果两个流都有单个值,zipcombineLatest 的行为相似。合并函数将阻塞,直到getStatesgetCmsContent 都返回,但就像我上面显示的那样,它们每个都在单独的线程上同时执行。

      1. 另一个解决方案取决于List&lt;States&gt;CmsContent 到达时的合并能力。鉴于他的代码,显然存在某种“数据持有者”(未显示),因为他返回的是Boolean,而不是合并的数据。下面,forEach 并发执行。

        Observable.just(api.getStates(), api.getCmsContent())
        // subscribe on separate thread as shown previously
        .flatMap(this::buildObservable)
        .toBlocking()
        // executes concurrently
        .forEach(item -> {
            // merge into "data holder"          
        });
        

      当然,这段代码存在非强类型的问题,所以这是一个选择。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2016-10-29
        • 1970-01-01
        • 2017-10-03
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-03-06
        相关资源
        最近更新 更多