【问题标题】:How to continue processing after an error happens in RxJava 2?RxJava 2 发生错误后如何继续处理?
【发布时间】:2017-03-15 01:44:29
【问题描述】:

我有一个PublishSubject 和一个Subscriber,我用它们来处理(可能)无限的预处理数据流。问题是某些元素可能包含一些错误。我想忽略它们并继续处理。我该怎么做?我尝试过这样的事情:

    val subject = PublishSubject.create<String>()
    subject.retry().subscribe({
        println("next: $it")
    }, {
        println("error")
    }, {
        println("complete")
    })

    subject.onNext("foo")
    subject.onNext("bar")
    subject.onError(RuntimeException())
    subject.onNext("wom")
    subject.onComplete()

我的问题是这里没有任何错误处理方法可以帮助我:

onErrorResumeNext() — 指示 Observable 发出一个序列 遇到错误时的项目

onErrorReturn( ) — 指示一个 Observable 在遇到错误时发出特定项目

onExceptionResumeNext( ) — 指示 Observable 继续 在遇到异常后发出项目(但不是另一个 各种投掷物)

retry( ) — 如果源 Observable 发出一个 错误,重新订阅它,希望它会在没有的情况下完成 错误

retryWhen( ) — 如果源 Observable 发出错误,则传递 错误到另一个 Observable 以确定是否重新订阅 来源

例如,我尝试了retry(),但它在错误后无限期挂起我的进程。

我也尝试了onErrorResumeNext(),但它没有按预期工作:

    val backupSubject = PublishSubject.create<String>()
    val subject = PublishSubject.create<String>()
    var currentSubject = subject
    subject.onErrorResumeNext(backupSubject).subscribe({
        println("next: $it")
    }, {
        println("error")
        currentSubject = backupSubject
    }, {
        println("complete")
    })

    backupSubject.subscribe({
        println("backup")
    }, {
        println("backup error")
    })

    currentSubject.onNext("foo")
    currentSubject.onNext("bar")
    currentSubject.onError(RuntimeException())
    currentSubject.onNext("wom")
    currentSubject.onComplete()

这只会打印foobar

【问题讨论】:

  • 尝试过 onErrorResumeNext吗?您展示的文档有点奇怪,但这听起来确实像您描述的那样想要它做......我认为断开连接是API假设,如果Observable失败,它可能无法前进到下一个元素;所以写它是为了让你提供一个新的 Observable 来继续这个序列;所以你只需要一个能在错误导致你中断的地方接听,不是吗?
  • onErrorResumeNext 有一个参数,如果我用相同的subject 调用它,我仍然看不到剩余的下一个值和onComplete。如果我添加一个新的Subject,它显然不会有我添加到前一个的订阅者。
  • 好吧,对你来说它是“显而易见的”,它不会有订阅者,对我来说,如果错误处理函数不能解决这个问题,那么错误处理函数到底有什么作用做?它被命名和记录为您的问题的解决方案,所以再次您尝试过吗
  • 是的,我试过了,但它不起作用。 retry() 只是挂起进程。
  • 我没有问你是否尝试过retry()。我问你是否尝试将一个新的 observable 传递给onErrorResumeNext()。但显然,如果它与您的假设相矛盾,您不需要帮助,祝您好运。

标签: java kotlin rx-java2


【解决方案1】:

如果您想在错误后继续处理,这意味着您的错误是一个值,就像您的Strings 一样,应该通过onNext。在这种情况下,为了确保类型安全,您应该使用某种形式的包装器,它可以采用常规值或错误;例如,io.reactivex.Notification&lt;T&gt; 在 RxJava 2 中可用:

PublishSubject<Notification<String>> subject = PublishSubject.create();

subject.subscribe(System.out::println);

subject.onNext(Notification.createOnNext("Hello"));
subject.onNext(Notification.<String>createOnError(new RuntimeException("oops")));
subject.onNext(Notification.createOnNext("World"));

【讨论】:

  • Notification 是否类似于函数式语言中的 Either 类型?
  • 类似Try&lt;Optional&lt;T&gt;&gt; 的东西被压缩成一个类。
猜你喜欢
  • 2017-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-12-19
相关资源
最近更新 更多