【问题标题】:RxJava return error as onNext and continue streamRxJava 返回错误为 onNext 并继续流
【发布时间】:2020-09-25 18:06:41
【问题描述】:

所以我尝试使用onErrorReturn 返回我想要的结果,但它会在之后完成流,我如何捕获错误返回为 Next 并继续流?

使用下面的代码,当出现错误时它不会到达retryWhen,如果我翻转它,它不会在出现错误时重新订阅retryWhen

fun process(): Observable<State> {
    return publishSubject
        .flatMap { intent ->
            actAsRepo(intent) // Might return error
                .map { State(data = it, error = null) }
        }
        .onErrorReturn { State(data = "", error = it) } // catch the error
        .retryWhen { errorObs -> 
            errorObs.flatMap {
                Observable.just(State.defaultState()) // continue subscribing
            }
        }
}

private fun actAsRepo(string: String): Observable<String> {
    if (string.contains('A')) {
        throw IllegalArgumentException("Contains A")
    } else {
        return Observable.just("Wrapped from repo: $string")
    }
}

订阅者将是

viewModel.process().subscribe(this::render)

【问题讨论】:

    标签: android kotlin error-handling rx-java rx-java2


    【解决方案1】:

    onError 是一个终端操作符。如果发生 onError,它将在操作员之间传递。你可以使用一个 onError 操作符来捕获 onError 并提供一个回退。

    在您的示例中,onError 发生在 flatMap 的内部流中。 onError 将向下游传播到 onErrorReturn 操作符。如果您查看实现,您会看到将调用 onErrorReturn lambda,结果将在 onComplete 之后使用 onNext 推送到下游

        @Override
        public void onError(Throwable t) {
            T v;
            try {
                v = valueSupplier.apply(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                downstream.onError(new CompositeException(t, e));
                return;
            }
    
            if (v == null) {
                NullPointerException e = new NullPointerException("The supplied value is null");
                e.initCause(t);
                downstream.onError(e);
                return;
            }
    
            downstream.onNext(v); // <--------
            downstream.onComplete(); // <--------
        }
    

    您的解决方案的结果是什么?

    您的流完成是因为:#retryWhen JavaDoc

    如果操作符的上游是异步的,则立即发出 onNext 后跟 onComplete 可能会导致序列立即完成。类似地,如果这个内部 {@code ObservableSource} 在上游活动时发出 {@code onError} 或 {@code onComplete} 信号,则序列会立即以相同的信号终止。

    你应该做什么:

    将 onErrorReturn 放在 flatMap 中的 map opreator 后面。使用此排序,当内部平面地图流 onErrors 时,您的流将无法完成。

    这是为什么?

    当外部(来源:publishSubject)和内部流(订阅)都完成时,flatMap 运算符完成。在这种情况下,外部流 (publishSubject) 发出 onNext,内部流将在通过 onNext 发送 { State(data = "", error = it) } 后完成。因此,流将保持打开状态。

    interface ApiCall {
        fun call(s: String): Observable<String>
    }
    
    class ApiCallImpl : ApiCall {
        override fun call(s: String): Observable<String> {
            // important: warp call into observable, that the exception is caught and emitted as onError downstream
            return Observable.fromCallable {
                if (s.contains('A')) {
                    throw IllegalArgumentException("Contains A")
                } else {
                    s
                }
            }
        }
    }
    
    data class State(val data: String, val err: Throwable? = null)
    

    apiCallImpl.call 将返回一个惰性 observable,这将在订阅时引发错误,而不是在 observable 组装时。

    // no need for retryWhen here, except you want to catch onComplete from the publishSubject, but once the publishSubject completes no re-subscription will help you, because the publish-subject is terminated and onNext invocations will not be accepted anymore (see implementation). 
    fun process(): Observable<State> {
        return publishSubject
            .flatMap { intent ->
                apiCallImpl.call(intent) // Might return error
                    .map { State(data = it, err = null) }
                    .onErrorReturn { State("", err = it) }
            }
    }
    

    测试

    lateinit var publishSubject: PublishSubject<String>
    lateinit var apiCallImpl: ApiCallImpl
    
    @Before
    fun init() {
        publishSubject = PublishSubject.create()
        apiCallImpl = ApiCallImpl()
    }
    
    @Test
    fun myTest() {
        val test = process().test()
    
        publishSubject.onNext("test")
        publishSubject.onNext("A")
        publishSubject.onNext("test2")
    
        test.assertNotComplete()
            .assertNoErrors()
            .assertValueCount(3)
            .assertValueAt(0) {
                assertThat(it).isEqualTo(State("test", null))
                true
            }
            .assertValueAt(1) {
                assertThat(it.data).isEmpty()
                assertThat(it.err).isExactlyInstanceOf(IllegalArgumentException::class.java)
                true
            }
            .assertValueAt(2) {
                assertThat(it).isEqualTo(State("test2", null))
                true
            }
    }
    

    另类

    此替代方案的行为与第一个解决方案略有不同。 flatMap-Operator 采用布尔值 (delayError),这将导致吞下 onError 消息,直到源完成。当源代码完成时,将发出错误。

    你可以使用delayError true,当异常没有用并且在出现时不能记录

    进程

    fun process(): Observable<State> {
        return publishSubject
            .flatMap({ intent ->
                apiCallImpl.call(intent)
                    .map { State(data = it, err = null) }
            }, true)
    }
    

    测试

    只发出两个值。错误不会转换为备用值。

    @Test
    fun myTest() {
        val test = process().test()
    
        publishSubject.onNext("test")
        publishSubject.onNext("A")
        publishSubject.onNext("test2")
    
        test.assertNotComplete()
            .assertNoErrors()
            .assertValueAt(0) {
                assertThat(it).isEqualTo(State("test", null))
                true
            }
            .assertValueAt(1) {
                assertThat(it).isEqualTo(State("test2", null))
                true
            }
            .assertValueCount(2)
    }
    

    注意:我认为你想在这种情况下使用 switchMap,而不是 flatMap。

    【讨论】:

    • 感谢您的回复,但是当我在地图后添加onErrorReturn 时,我得到.OnErrorNotImplementedException 应用程序退出,因为我没有在订阅者中实现错误处理程序,我假设所有流都会成功并处理而是在State 中,或者这是在 Rx 中的错误方式?
    • 请把 StackTrace 放到 Github gist 上好吗?我认为问题出在急切的 actAsRepo() 方法上,它会在可观察的组装时间上引发异常。当 observable 完全组装时,onError*-handler 只能处理 onError。如果 flatMap 发生异常,它会捕获异常并将其传播到下游。
    • here is the gist,看起来错误来自 actAsRepo() throwing IllegalArgumentException
    • @DanLee,正如我所怀疑的,调用 flatMap lambda 时会引发异常。我通过测试更新了我的答案。如果您理解正确,请查看。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-12-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-12-17
    相关资源
    最近更新 更多