【问题标题】:RxJava2 observable take throws UndeliverableExceptionRxJava2 observable take 抛出 UndeliverableException
【发布时间】:2017-09-17 09:56:13
【问题描述】:

据我了解,RxJava2 values.take(1) 创建了另一个 Observable,它只包含原始 Observable 中的一个元素。 绝不能抛出异常,因为它被 take(1) 的效果过滤掉了,因为它发生在第二个。

以下代码sn-p

    Observable<Integer> values = Observable.create(o -> {
        o.onNext(1);
        o.onError(new Exception("Oops"));
    });

    values.take(1)
            .subscribe(
                    System.out::println,
                    e -> System.out.println("Error: " + e.getMessage()),
                    () -> System.out.println("Completed")
            );

输出

1
Completed
io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more
Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more

我的问题:

  1. 我理解正确吗?
  2. 导致异常的真正原因。
  3. 如何从消费者那里解决这个问题?

【问题讨论】:

  • 参见 FlowableEmitter.tryOnError 和自 2.1.1 以来的类似方法。

标签: java observable rx-java2 take


【解决方案1】:
  1. 是的,但是因为可观察到的“结束”并不意味着在create(...) 中运行的代码已停止。为了在这种情况下完全安全,您需要使用 o.isDisposed() 来查看 observable 是否已在下游结束。
  2. 例外是因为 RxJava 2 的策略是永远不允许onError 调用丢失。如果 observable 已经终止,它要么被传递到下游,要么作为全局 UndeliverableException 抛出。由 Observable 的创建者来“正确”处理 observable 已结束并发生异常的情况。
  3. 问题是生产者 (Observable) 和消费者 (Subscriber) 在流何时结束时存在分歧。由于在这种情况下生产者的寿命比消费者长,因此问题只能在生产者中解决。

【讨论】:

  • if (!o.isDisposed()) { o.onError(new Exception("Oops")); } 是处理这个问题的正确方法吗?
  • 如果在不再观察到 observable 的情况下丢失该异常是可以接受的,那么可以。如果异常真的应该去某个地方,那么它应该被无条件地调用。
  • 不,这需要在生产者中修复,因为消费者已声明自己已终止。
  • if (!o.isDisposed()) { o.onError(new Exception("Oops"));由于竞争条件(o 可以设置在 if 条件和 onError 调用之间,这不是理论上的,它发生在生产系统中),因此不是处理此问题的正确方法。另请参阅此讨论:github.com/ReactiveX/RxJava/issues/4880
  • @AbdElraoufSabri 一旦引入 tryOnError,我就丢弃了该代码。可以在一些旧的提交中找到它,但是当你有 tryOnError 时,现在有什么意义?
【解决方案2】:

@Kiskae 在之前的评论中正确回答了这种异常发生的原因。

这里是关于这个主题的官方文档的链接:RxJava2-wiki

有时您无法更改此行为,因此有一种方法可以处理此UndeliverableException。下面是如何避免崩溃和不当行为的代码 sn-p:

RxJavaPlugins.setErrorHandler(e -> {
    if (e instanceof UndeliverableException) {
        e = e.getCause();
    }
    if ((e instanceof IOException) || (e instanceof SocketException)) {
        // fine, irrelevant network problem or API that throws on cancellation
        return;
    }
    if (e instanceof InterruptedException) {
        // fine, some blocking code was interrupted by a dispose call
        return;
    }
    if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) {
        // that's likely a bug in the application
        Thread.currentThread().getUncaughtExceptionHandler()
            .handleException(Thread.currentThread(), e);
        return;
    }
    if (e instanceof IllegalStateException) {
        // that's a bug in RxJava or in a custom operator
        Thread.currentThread().getUncaughtExceptionHandler()
            .handleException(Thread.currentThread(), e);
        return;
    }
    Log.warning("Undeliverable exception received, not sure what to do", e);
});

这段代码取自上面的链接。

重要提示。这种方法将全局错误处理程序设置为 RxJava,因此如果您可以摆脱这些异常 - 这将是更好的选择。

【讨论】:

  • 我无法摆脱这些异常,我需要来自这个 RX 的一些数据,但是当这个错误发生时,我无法从 Json 获取我的数据..我现在该怎么办?:(
  • 当我在Observable.create 内部发出一些网络请求时,我的subscriber 在网络调用开始时没有被释放,并且在我收到通话响应时已经被释放。有没有办法在RxJavaPlugins.setErrorHandler 中不获取InterruptedException
【解决方案3】:

在使用 observable.create() 时,只需使用 tryOnError()。 onError() 不保证错误会得到处理。有各种error handling operatorsHERE

【讨论】:

    【解决方案4】:

    科特林

    我在 MainActivity 的 onCreate 方法中调用它

    private fun initRxErrorHandler(){
        RxJavaPlugins.setErrorHandler { throwable ->
            if (throwable is UndeliverableException) {
                throwable.cause?.let {
                    Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), it)
                    return@setErrorHandler
                }
            }
            if (throwable is IOException || throwable is SocketException) {
                // fine, irrelevant network problem or API that throws on cancellation
                return@setErrorHandler
            }
            if (throwable is InterruptedException) {
                // fine, some blocking code was interrupted by a dispose call
                return@setErrorHandler
            }
            if (throwable is NullPointerException || throwable is IllegalArgumentException) {
                // that's likely a bug in the application
                Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), throwable)
                return@setErrorHandler
            }
            if (throwable is IllegalStateException) {
                // that's a bug in RxJava or in a custom operator
                Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), throwable)
                return@setErrorHandler
            }
            Log.w("Undeliverable exception", throwable)
        }
    }
    

    【讨论】:

      猜你喜欢
      • 2018-02-27
      • 1970-01-01
      • 1970-01-01
      • 2018-07-22
      • 2019-03-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-10-15
      相关资源
      最近更新 更多