【问题标题】:rxjava switch observable if second observable start emits items如果第二个可观察开始发出项目,rxjava 开关可观察
【发布时间】:2017-04-18 04:27:15
【问题描述】:

我有一些并行执行的 observable,例如 localObservablenetworkObservable。如果networkObservable 开始发射项目(从这个时候开始,我只需要这些项目),那么丢弃localObservable 发射的项目(可能localObservable 还没有开始)。

Observable<Integer> localObservable =
            Observable.defer(() -> Observable.range(1, 10)).subscribeOn(Schedulers.io());
Observable<Integer> networkObservable =
            Observable.defer(() -> Observable.range(11, 20)).subscribeOn(Schedulers.io());

【问题讨论】:

    标签: rx-java


    【解决方案1】:

    你可以这样做:

     Observable<Long> networkObservable =
                Observable.interval(1000, 500, TimeUnit.MILLISECONDS)
                        .subscribeOn(Schedulers.io())
                        .share();
        Observable<Long> localObservable =
                Observable.interval(500, TimeUnit.MILLISECONDS)                       
                        .subscribeOn(Schedulers.io())
                        .takeUntil(networkObservable);
    
        Observable.merge(networkObservable, localObservable)
                .subscribe(System.out::println);
    

    这将输出:

    0 // localObservable 
    1 // localObservable 
    0 // networkObservable from here on
    1
    2
    ...
    

    takeUntil 将使localObservablenetworkObservable 的第一次发射发生时停止并取消订阅,因此合并的Observable 将从localObservable 发射只要networkObservable 没有启动,以及它何时启动确实如此,它将停止从localObservable 发射并切换到仅从networkObservable 发射。

    【讨论】:

    • 如果networkObservable 发出错误,我想继续localObservable ,我该怎么做?
    • 你可以使用mergeDelayError(在这种情况下你会在最后得到onError,在localObservable发出一些东西之后),或者只是用onErrorREsumeNext或类似的方法捕获networkObservable上的所有错误
    【解决方案2】:

    运营商有一个简单的解决方案:AMB

    看看 System.out 的输出。

    文档:http://reactivex.io/documentation/operators/amb.html

    基本上,您同时订阅两个可观察对象,并且任何可观察对象首先发出的内容都会通过。另一个 observable 将被取消订阅。

    @Test
    public void ambTest() throws Exception {
        TestScheduler testScheduler = new TestScheduler();
    
        Observable<Integer> network = Observable.timer(1000, TimeUnit.MILLISECONDS, testScheduler)
                    .concatMap(aLong -> Observable.just(1, 2, 3))
                    .doOnSubscribe(disposable -> System.out.println("connect network"))
                    .doOnDispose(() -> System.out.println("dispose network"));
    
        Observable<Integer> local = Observable.timer(500, TimeUnit.MILLISECONDS, testScheduler)
                    .concatMap(aLong -> Observable.just(4, 5, 6))
                    .doOnSubscribe(disposable -> System.out.println("connect local"))
                    .doOnDispose(() -> System.out.println("dispose local"));
    
        Observable<Integer> integerObservable = Observable.ambArray(network, local);
    
        TestObserver<Integer> test = integerObservable.test();
    
        testScheduler.advanceTimeBy(600, TimeUnit.MILLISECONDS);
    
        test.assertValues(4, 5, 6);
    
        testScheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
    
        test.assertValues(4, 5, 6);
    }
    

    【讨论】:

    • 这取决于需求,如果你确实想要本地 observable 的发射,而网络 observable 还没有发射,那么 amb 只会选择本地 observable 的第一次发射,并且永远不会切换到网络 observable
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-08-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多