【问题标题】:RxJava: Conditionally catch error and stop propagationRxJava:有条件地捕获错误并停止传播
【发布时间】:2016-02-25 04:32:23
【问题描述】:

我将 Retrofit 与 RxJava Observables 和 lambda 表达式一起使用。我是 RxJava 新手,不知道如何执行以下操作:

Observable<ResponseBody> res = api.getXyz();
res.subscribe(response -> {
    // I don't need the response here
}, error -> {
    // I might be able to handle an error here. If so, it shall not go to the second error handler.
});
res.subscribe(response -> {
    // This is where I want to process the response
}, error -> {
    // This error handler shall only be invoked if the first error handler was not able to handle the error.
});

我查看了error handling operators,但我不明白他们如何帮助我处理我的用例。

【问题讨论】:

  • 首先,您现在的代码将执行网络请求两次,每个订阅者一次 - 原因是 Retrofit 返回“冷” Observables,它只会在某些订阅者时执行订阅他们。除此之外,您能否解释一下您想要实现的目标?这两个订阅是否属于不同的类别?在每种情况下“处理”错误是什么意思?而且,也许onErrorResumeNext 是您想要的 - 它可以检查错误是否可以在那里“处理”,然后传递错误或执行其他操作...
  • 是的,两个订阅属于不同的类。我尝试做这样的事情:为 api-request(-response) 创建 observable。然后将该可观察对象传递给附加一些通用错误处理程序并返回(或新的)可观察对象的函数。最后,订阅以获取 api 响应或处理通用错误处理程序未处理的任何错误。

标签: java error-handling rx-java observable retrofit2


【解决方案1】:

方法一:保留两个Subscribers 但cacheObservable

保持一切不变,但将第一行改为:

Observable<ResponseBody> res = api.getXyz().cache();

cache 将确保请求只发送一次,但Subscribers 仍然会收到所有相同的事件。

这样,您是否以及如何处理第一个 Subscriber 中的错误不会影响第二个 Subscriber 看到的内容。

方法 2: 使用onErrorResumeNext 捕获一些错误,但转发所有其他错误。

onErrorResumeNext 添加到您的Observable 以生成类似这样的内容(在“内部”对象中):

Observable observable = Observable.error(new IllegalStateException())
.onErrorResumeNext(new Func1<Throwable, Observable<?>>() {
    @Override
    public Observable<?> call(Throwable throwable) {
        if (throwable instanceof NumberFormatException) {
            System.out.println("NFE - handled");
            return Observable.empty();
        } else {
            System.out.println("Some other exception - panic!");
            return Observable.error(throwable);
        }
    }
});

并且只订阅一次(在“外部”对象中):

observable.subscribe(new Subscriber() {
    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("onError");
        e.printStackTrace();
    }

    @Override
    public void onNext(Object o) {
        System.out.println(String.format("onNext: %s", String.valueOf(o)));
    }    

});

这样,只有在onErrorResumeNext 中无法处理错误时才会转发该错误 - 如果可以,Subscriber 将只得到对onCompleted 的调用,而没有其他任何内容。

不过,onErrorResumeNext 的副作用让我有点不舒服。 :-)

编辑:哦,如果你想更加严格,你可以使用方法3:将每个案例包装在一个新对象中。

public abstract class ResultOrError<T> {
}

public final class Result<T> extends ResultOrError<T> {
    public final T result;

    public Result(T result) {
        this.result = result;
    }
}

public final class HandledError<T> extends ResultOrError<T> {
    public final Throwable throwable;

    public Result(Throwable throwable) {
        this.throwable = throwable;
    }
}

public final class UnhandledError<T> extends ResultOrError<T> {
    public final Throwable throwable;

    public Result(Throwable throwable) {
        this.throwable = throwable;
    }
}

然后:

  • 将正确的结果包装在Result 中(使用map
  • HandledError
  • 中包装可处理的错误
  • UnhandledError 中无法处理的错误(使用带有if 子句的onErrorResumeNext
  • 处理HandledErrors(使用doOnError
  • 有一个Subscriber&lt;ResultOrError&lt;ResponseBody&gt;&gt; - 它会收到所有三种类型的通知 (onNext),但会忽略 HandledErrors 并处理其他两种类型。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-05-15
    • 2021-05-06
    • 1970-01-01
    • 2018-07-18
    • 2017-02-16
    • 1970-01-01
    • 1970-01-01
    • 2014-03-10
    相关资源
    最近更新 更多