【问题标题】:RxJava Merge Flowable and CompletableRxJava 合并 Flowable 和 Completable
【发布时间】:2019-01-16 08:13:08
【问题描述】:

我有一个Flowable,它不断地发出项目,从不调用onErroronComplete。现在我有一个Completable,我想与这个Flowable 合并,这样当Completable 完成Flowable 调用onComplete。我不能直接更改给我的Flowable 对象。

我遇到的一个问题是我会在Flowable 上使用takeUntil 但是Flowable 可能会在任何时候停止发射项目,我仍然希望Completable 能够调用@987654334 @。

更新: 因为我们可以做Completable.toFlowable(),所以我们可以合并两个Flowables。问题是我仍然无法找到一种方法来完成两者。

【问题讨论】:

    标签: java rx-java2


    【解决方案1】:

    正如你所指出的,你不能改变原来的Flowable 本身,所以它不会发出onComplete。但是,您可以通过执行以下操作(伪代码)使生成的 Flowable 发出它:

    val f: Flowable = ...
    val c: Completable = ...
    val r: Flowable = f.materialize().mergeWith(c.materialize()).dematerialize()
    
    

    【讨论】:

    • 这并没有完成我的功能,因为 mergeWith(Completable) 需要 flowable 也完成
    • 你试过了吗?它不需要 flowable 来完成。
    • 我已经尝试过,mergeWith 的文档说“仅在其他 CompletableSource 也完成时才完成”
    • 这应该无关紧要,因为我们从 Completable 中取消了物化的 onComplete。无论如何,您可以尝试静态 .merge() 方法。
    • .merge() 仍然等待两个源完成
    【解决方案2】:

    这是一个通用的解决方案。正如@Maxim 的回答中所述,merge/mergeWith 没有完成我的功能,因为它需要FlowableCompletable 才能完成。此解决方案还可以正确处理两者。

        Flowable<Integer> f = Flowable.fromArray(1, 2, 3);
        Completable c = Completable.complete();
    
        final PublishSubject<Integer> subject = PublishSubject.create();
        final CompositeDisposable cd = new CompositeDisposable();
        Flowable result = subject
                .doOnSubscribe(__ -> {
                    cd.add(f.subscribe(subject::onNext));
                    cd.add(c.subscribe(subject::onComplete));
                })
                .doOnDispose(cd::dispose)
                .toFlowable(BackpressureStrategy.LATEST);
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多