【问题标题】:RxJava combining observables without repeating executionRxJava 组合 observables 而无需重复执行
【发布时间】:2016-11-17 01:40:57
【问题描述】:

小故事: 我的情况是,我有 2 个具有单一目的的 Observable:

  • 他们收到一些数据
  • 他们返回修改后的数据
  • 如果无法处理数据则抛出错误

他们各自负责处理不同类型的数据。另外,我想在处理完这两个数据后做一些事情。

我目前最好的实现如下,这些是我的 Observables:

    Single<BlueData> blueObservable = Single.create(singleSubscriber -> {
        if (BlueDataProcessor.isDataValid(myBlueData)) {
            singleSubscriber.onSuccess(BlueDataProcessor.process(myBlueData));
        }
        else {
            singleSubscriber.onError(new BlueDataIsInvalidThrow());
        }
    });

    Single<RedData> redObservable = Single.create(singleSubscriber -> {
        if (RedDataProcessor.isDataValid(myRedData)) {
            singleSubscriber.onSuccess(RedDataProcessor.process(myRedData));
        }
        else {
            singleSubscriber.onError(new RedDataIsInvalidThrowable());
        }
    });

    Single<PurpleData> composedSingle = Single.zip(blueObservable, redObservable,
            (blueData, redData) -> PurpleGenerator.combine(blueData, redData));

我还有以下订阅:

    blueObservable.subscribe(
            result -> {
                saveBlueProcessStats(result);
            },
            throwable -> {
                logError(throwable);
            });

    redObservable.subscribe(
            result -> {
                saveRedProcessStats(result);
            },
            throwable -> {
                logError(throwable);
            });


    composedSingle.subscribe(
            combinedResult -> {
                savePurpleProcessStats(combinedResult)
            },
            throwable -> {
                logError(throwable);
            });

我的问题: 蓝色和红色数据被处理了两次,因为两个订阅都再次运行,我订阅了使用 Observable.zip() 创建的组合 observable。

如何在不运行两次操作的情况下获得这种行为?

【问题讨论】:

    标签: java rx-java rx-android rx-java2


    【解决方案1】:

    这对于 1.x 中的 Single 是不可能的,因为没有 ConnectableSingleSingle.publish 的概念。可以通过 2.x 和 RxJava2Extensions 库实现效果:

    SingleSubject<RedType> red = SingleSubject.create();
    SingleSubject<BlueType> blue = SingleSubject.create();
    
    // subscribe interested parties
    red.subscribe(...);
    blue.subscribe(...);
    
    Single.zip(red, blue, (r, b) -> ...).subscribe(...);
    
    // connect()
    blueObservable.subscribe(blue);
    redObservable.subscribe(red);
    

    【讨论】:

    • 一旦我可以将此项目升级到 rxjava2,我将实施此解决方案。但是,我对这个解决方案很满意,可以将其标记为正确答案。谢谢!
    猜你喜欢
    • 2017-09-07
    • 1970-01-01
    • 2015-10-01
    • 1970-01-01
    • 2022-01-04
    • 2014-12-02
    • 2019-08-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多