【问题标题】:RxJava timeout without unsubscribing from sourceRxJava 超时而不从源取消订阅
【发布时间】:2017-10-19 07:09:32
【问题描述】:

我订阅了 hot observable,然后将timeout 运算符应用于它,但是当TimeoutException 被抛出时我不想取消订阅,只产生特殊项目(我知道源最终会发出新项目) .我怎样才能做到这一点?

我正在尝试将超时与onErrorReturn 结合起来,但这又会导致onComplete 调用订阅者。

【问题讨论】:

    标签: operators rx-java rx-java2


    【解决方案1】:

    发布、超时和重试(改编自my older answer):

    Observable<Long> source =
        Observable.just(100L, 200L, 500L, 1000L, 5000L, 5500L, 6000L)
        .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS).map(a -> v));
    
    source.publish(co -> 
        co.timeout(750, TimeUnit.MILLISECONDS, 
             Observable.just(-1L)
             .concatWith(Observable.error(new RuntimeException()))
        )
        .retry()
     ).blockingForEach(System.out::println);
    

    【讨论】:

    • 像魅力一样工作。在我的情况下,源已经很热了,所以不需要使用发布。我缺少的是 concatWith() 和 retry() 的组合,这是我认为的关键。
    猜你喜欢
    • 2019-03-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-18
    相关资源
    最近更新 更多