【发布时间】:2015-02-08 05:22:55
【问题描述】:
如果我有 Observalbe:
List<Integer> ints = Lists.newArrayList(1, 2, 0, 3, 4);
Observable<Integer> o1 = Observable.from(ints);
我想生成另一个 observable ,除以 12 :
Observable<Integer> o2 = o1.map(i -> 12/i);
o2.subscribe(
v -> logger.info ("Subscriber value {}", v) ,
t -> logger.error("Subscriber onError {} : {}", t.getClass() , t.getMessage())
);
很明显会报错,遇到'0'就停止了:
RxTest - Subscriber value 12
RxTest - Subscriber value 6
RxTest - Subscriber onError class java.lang.ArithmeticException : / by zero
但是如果我想让 Observer(o2) 跳过异常呢?
我查看了 RxJava 的关于 error handling 的文档,没有办法跳过错误。 onErrorResumeNext() 和 onExceptionResumeNext() 需要 backup/fallback Observable ,这不是我想要的。 onErrorReturn需要指定返回值。
所有三种错误处理方法都不能恢复原来的观察者。例如:
Observable<Integer> o2 = o1.map(i -> 12/i)
.onErrorReturn(t -> 0);
打印出来:
RxTest - Subscriber value 12
RxTest - Subscriber value 6
RxTest - Subscriber value 0
不打印其余的 12/3 和 12/4
map 函数中的唯一解决方案似乎是中继:
Observable<Integer> o2 = o1.map(i -> {
try {
return Optional.of(12/i);
} catch (ArithmeticException e) {
return Optional.empty();
}
}).filter(Optional::isPresent)
.map(o -> (Integer) o.get());
它可以工作,但它很麻烦。我想知道在操作Observable时是否有任何方法可以轻松跳过任何RuntimeException(例如map)
以上是关于在Observable 中跳过异常。以下是关于Subscriber中的跳过异常:
情况也一样:
List<Integer> ints = Lists.newArrayList(1, 2, 0 , 3 , 4);
Observable<Integer> o1 = Observable.from(ints);
o1.subscribe(
i -> logger.info("12 / {} = {}", i, 12 / i),
t -> logger.error("{} : {}", t.getClass() , t.getMessage()),
() -> logger.info("onCompleted")
);
打印出来:
RxTest - 12 / 1 = 12
RxTest - 12 / 2 = 6
RxTest - class java.lang.ArithmeticException : / by zero
当onNext 发生异常时,它会触发onError,并且不会响应来自Observable 的任何数据。如果我希望订阅者吞下异常,我必须尝试在onNext() 中捕获ArithmeticException。有没有更清洁的解决方案?
似乎当Subscriber 在onNext() 中遇到无法在(onNext) 内处理的错误时,它应该停止,对吧?这是一个好的设计吗?
【问题讨论】:
-
在我看来,
onNext()相当于Iterator.next()- 当迭代集合出现问题时 - 抛出异常并且迭代器退出(它不会“恢复" 迭代) - 例如见ConcurrentModificationException。这与我们在这里的行为相同。也就是说,@benjchristensen 或许能够更清楚地了解这个主题 -
Optional映射可能是我最喜欢的处理方式。很清楚,过滤器很好地处理了事件。这是一个很好的封装方式。我会在 scala 中使用Try类型,然后对其进行过滤。
标签: java reactive-programming rx-java