【问题标题】:RxJava Observable and Subscriber for skipping exception?RxJava Observable 和订阅者跳过异常?
【发布时间】:2015-02-08 05:22:55
【问题描述】:

如果我有 Observalbe:

List<Integer> ints = Lists.newArrayList(1, 2, 0, 3, 4);
Observable<Integer> o1 = Observable.from(ints);

我想生成另一个 observable ,除以 12 :

Observable<Integer> o2 = o1.map(i -> 12/i);
o2.subscribe(
  v -> logger.info ("Subscriber value {}", v) ,
  t -> logger.error("Subscriber onError {} : {}", t.getClass() , t.getMessage())
);

很明显会报错,遇到'0'就停止了:

RxTest - Subscriber value 12
RxTest - Subscriber value 6
RxTest - Subscriber onError class java.lang.ArithmeticException : / by zero

但是如果我想让 Observer(o2) 跳过异常呢?

我查看了 RxJava 的关于 error handling 的文档,没有办法跳过错误。 onErrorResumeNext()onExceptionResumeNext() 需要 backup/fallback Observable ,这不是我想要的。 onErrorReturn需要指定返回值。

所有三种错误处理方法都不能恢复原来的观察者。例如:

Observable<Integer> o2 = o1.map(i -> 12/i)
      .onErrorReturn(t -> 0);

打印出来:

RxTest - Subscriber value 12
RxTest - Subscriber value 6
RxTest - Subscriber value 0

不打印其余的 12/3 和 12/4

map 函数中的唯一解决方案似乎是中继:

Observable<Integer> o2 = o1.map(i -> {
  try {
    return Optional.of(12/i);
  } catch (ArithmeticException e) {
    return Optional.empty();
  }
}).filter(Optional::isPresent)
  .map(o -> (Integer) o.get());

它可以工作,但它很麻烦。我想知道在操作Observable时是否有任何方法可以轻松跳过任何RuntimeException(例如map

以上是关于在Observable 中跳过异常。以下是关于Subscriber中的跳过异常:

情况也一样:

List<Integer> ints = Lists.newArrayList(1, 2, 0 , 3 , 4);
Observable<Integer> o1 = Observable.from(ints);
o1.subscribe(
  i -> logger.info("12 / {} = {}", i, 12 / i),
  t -> logger.error("{} : {}", t.getClass() , t.getMessage()),
  () -> logger.info("onCompleted")
);

打印出来:

RxTest - 12 / 1 = 12
RxTest - 12 / 2 = 6
RxTest - class java.lang.ArithmeticException : / by zero

onNext 发生异常时,它会触发onError,并且不会响应来自Observable 的任何数据。如果我希望订阅者吞下异常,我必须尝试在onNext() 中捕获ArithmeticException。有没有更清洁的解决方案?

似乎当SubscriberonNext() 中遇到无法在(onNext) 内处理的错误时,它应该停止,对吧?这是一个好的设计吗?

【问题讨论】:

  • 在我看来,onNext() 相当于Iterator.next() - 当迭代集合出现问题时 - 抛出异常并且迭代器退出(它不会“恢复" 迭代) - 例如见ConcurrentModificationException。这与我们在这里的行为相同。也就是说,@benjchristensen 或许能够更清楚地了解这个主题
  • Optional 映射可能是我最喜欢的处理方式。很清楚,过滤器很好地处理了事件。这是一个很好的封装方式。我会在 scala 中使用 Try 类型,然后对其进行过滤。

标签: java reactive-programming rx-java


【解决方案1】:
Observable.just(1, 2, 3, 0, 4)
            .flatMap(i -> Observable.defer(() -> Observable.just(12 / i))
                    .onErrorResumeNext(Observable.just(0)));

这是一种方法,但请记住,RxJava 假定错误是真正无法预料的事情(您可以期望值为 0)。另一方面,如果您想忽略除以 0 异常,您可能应该在除法之前过滤/映射您的值。

【讨论】:

  • 我在这里唯一要更改的是将Observable.from(1, 2, 3, 0, 4) 更改为Observable.just(1, 2, 3, 0, 4),因为在1.0.0 中它不会与from 一起编译。
【解决方案2】:

试试这个方法。

Observable<Integer> o2 = o1.map(i -> 12/i).onErrorFlatMap(e -> Observable.empty());

在上面的代码中,错误事件被一个空流替换,原始流正在恢复。因此会跳过错误。

【讨论】:

  • 哦,抱歉我没有注意到版本。但是可以使用materializedematerialize 实现类似onErrorFlatMap 的组合器,虽然有点困难。
  • 我也很好奇,为什么这种方法在1.0中消失了。
【解决方案3】:
import rx.lang.scala._

import scala.concurrent.duration.DurationDouble
import scala.util.{Failure, Success, Try}

val o1 = List(1, 2, 3, 0, 4) toObservable
val o2 = List(1, 2, 3, 4, 5) toObservable

val o3 = o1 zip o2 map {case (i1, i2) => i2 / i1 } // this will trigger a division by 0 error

val o4 = o3 lift {  
      subscriber: Subscriber[Try[Int]] => new Subscriber[Int](subscriber) {
        override def onNext(v: Int) {
          if (!subscriber.isUnsubscribed)
            subscriber.onNext(Success(v))
        }

        override def onError(e: Throwable) {
          if (!subscriber.isUnsubscribed)
            subscriber.onNext(Failure(e))
        }

        override def onCompleted() {
          if (!subscriber.isUnsubscribed)
            subscriber.onCompleted()
        }
      }
    }

o4 subscribe(println(_))

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-03-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多