【问题标题】:RxJava flow: conditional operators and error handlingRxJava 流程:条件运算符和错误处理
【发布时间】:2015-10-13 17:14:29
【问题描述】:

我是 RxJava 的新手,并尝试将我的头脑包裹在一个更复杂的登录逻辑上,该逻辑具有三个异步方法来处理。对我来说,这是“如果我把这个东西转换成 RxJava,任何事情 (tm) 都是可能的”:)

所以我想做的是:

Call A -> (Process A) -> Call B with results of A -> (Process B) -\
                    \                                              -> Combine and Subscribe
                     \-> Call C with results of A -> (Process C) -/

现在的问题是 Call C 分支应该只在特定条件下执行,否则(Combine and Subscribe 可以从该分支接收 NULL 值,这没关系)。

此外,错误处理也很重要:虽然Call ACall C(如果它运行的话)需要通过onError 将它们的错误传递给最终订阅者Call B 的“成功”是相当可选的,在失败的情况下可以忽略。

这是我到目前为止想出的,它仍然忽略了“C”分支:

 mApi.callA(someArgs)
            // a transition operator to apply thread schedulers
            .compose(applySchedulers())
            // from rxlifecycle-components, unsubscribes automatically 
            // when the activity goes down  
            .compose(bindToLifecycle())
            // possibly other transformations that should work on the (error)
            // states of the first and the following, chained API calls
            .flatMap(response -> processA(response))
            .flatMap(response -> mApi.callB(response.token))
            .flatMap(response -> processB(response))
            // redirects HTTP responses >= 300 to onError()
            .lift(new SequenceOperators.HttpErrorOperator<>())
            // checks for application-specific error payload and redirects that to onError()
            .lift(new SequenceOperators.ApiErrorOperator<>())
            .subscribe(this::allDone this::failure);

我查看了Wiki for conditional operators,但我找不到如何启动Call C 分支的提示。

另外,我不确定我的SequenceOperators 是否可以这样工作,即可以放在链中的所有请求之后,或者我是否需要其中的几个,每个都放在触发一个 flatMap() 运算符之后新Call

谁能指出我正确的方向?

谢谢!

【问题讨论】:

    标签: java reactive-programming rx-java rx-android


    【解决方案1】:

    您应该使用Zip 运算符:) 结果应该如下所示:

    mApi.callA(someArgs)
            // ...
            .flatMap(response -> processA(response))
            .flatMap(response -> {
                  return Observable.zip(
                        callB(response),
                        callC(response),
                        (rA,rB) -> {
                              // or just return a new Pair<>(rA, rB)
                              return combineAPlusB(rA,rB)
                        }
                  )
            })
            // ...
            .subscribe(this::allDone this::failure);
    

    【讨论】:

    • 好的,明白了,现在我不确定如何进行错误处理。我想要一些操作员/组件/我扔在那里的任何东西,它们通常会作用于任何响应(因此是HttpErrorOperatorApiErrorOperator)。另外,您在 G+ 上说过,转换运算符应该放在最后(即compose)-我认为我必须注意这里的“温度”,而不是我不小心将“冷”可观察到“热”的,对吗?
    • 如果 callB 和 callC 都需要,简单的解决方案是对它们都调用 .lift。
    • 否则,如果 callB 和 callC 发出相同的类型,你可以 Observable.merge(callB, callC).lift(..).toList().map(rarb -> combineAplusB(rarb. get(0), rarb.get(1));
    • 好的,我明白了。不幸的是,它们发出不同的(通用)类型,因此合并将不起作用。非常感谢您的帮助!
    猜你喜欢
    • 2016-02-17
    • 2021-05-11
    • 2017-05-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-12-23
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多