【问题标题】:Rxjava - hot observablesRxjava - 热可观察对象
【发布时间】:2017-04-19 20:17:41
【问题描述】:

当生产者(API 请求)发送事件太快而消费者(API 响应)只获得 API 请求的第二个响应时,我在 RxJava 中遇到了一个奇怪的问题。

假设我有几个请求需要发送到服务器以查询数据库,因为来自服务器的一些查询需要一些时间。所以当我得到响应时,可能是第二个请求先返回,奇怪的是有时我没有得到第一个请求的响应。

调用api的代码:

public void sendRequests() {
     // using RxJava to make server polling.
     startPollingServer();
}

startPollingServer() 方法是使用 RxJava 从服务器轮询数据。

public void startPollingServer() {
     mApiService.getPollingFromServer()
        .retryWhen()
        .repeatWhen()
        .map()
        .subscribeOn()
        .observeOn()
        .subscribe(
           // call onNext
           // call onError
         )
}

但是,当 sendRequests() 调用太快时,onNext、onComplete、onError 都不会被调用。第一次请求没有发生任何事情。 但我确实得到了 CharlesProxy 的第一个响应,这真的很奇怪。

所以我的问题是,RxJava 是否有可能忽略我的第一反应?我需要合并来自 startPollingServer() 的 Observables 吗?

【问题讨论】:

  • 你是在安卓应用还是在测试中使用它?因为如果它是一个测试,它就会失败,因为调用线程将订阅并且只是通过方法 startPollingServer。不建议只订阅 void 方法。如果您不处置订阅,您将获得内存泄漏。您的 startPolling 方法应该返回 Observable 并在一个地方编写。
  • 我在一个安卓应用程序中使用,sendRequests() 是由用户发起的,我正在模拟用户是否非常快地点击按钮或某些 UI。它只是显示了一些代码 sn-p,我有处理订阅订阅的代码,当应用程序被销毁时,所以没有内存泄漏
  • 为什么你确定是 RxJava 问题,服务器确实在每种情况下都返回响应吗?看来您在这里有独立的 Observable 流。

标签: rx-java


【解决方案1】:

根据我 2 年以上的 RxJava 经验,它丢失一些数据的可能性很小。 考虑以下事项:

错误被抑制

使用 retryWhen 和 repeatWhen 可能会导致错误抑制,请尝试添加 'onError()' 运算符来捕获这些:

public void startPollingServer() {
     mApiService.getPollingFromServer()
        .doOnError(throwable -> log.error("Got an error", throwable)) // catch error
        .retryWhen()
        .repeatWhen()
        .map()
        .subscribeOn()
        .observeOn()
        .subscribe(
           // call onNext
           // call onError
         )
}

http://reactivex.io/RxJava/javadoc/rx/Observable.html#doOnError(rx.functions.Action1)

并行度不正确

Observable.just(1,2,3)
.flatMap(i -> doNetworkCall(i))
.first()

此类代码可能会导致并行执行“doNetworkCall(i)”,其中第二个答案可能比第一个答案更快。要进行验证,请改用“concatMap”,它可以保证处理流的顺序:http://reactivex.io/documentation/operators/concat.html

调试

运算符 'doOnNext()'、'doOnSubscribe()'、'doOnCompleted()' 和 'doOnError()' 可以帮助您找到错误

Observable.just(1,2,3)
.flatMap(i -> doNetworkCall(i)
          .doOnSubscribe(() -> log.debug("Launched {}", i))
          .doOnNext(response -> log.debug("Got response {} for {}", response, i))
          .doOnError(throwable -> log.error("For error for " + i, throwable))
          .doOnComplete(() -> log.info("Finished processing of {}", i))
)
.first()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-01-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-09-26
    相关资源
    最近更新 更多