【发布时间】:2017-03-15 01:44:29
【问题描述】:
我有一个PublishSubject 和一个Subscriber,我用它们来处理(可能)无限的预处理数据流。问题是某些元素可能包含一些错误。我想忽略它们并继续处理。我该怎么做?我尝试过这样的事情:
val subject = PublishSubject.create<String>()
subject.retry().subscribe({
println("next: $it")
}, {
println("error")
}, {
println("complete")
})
subject.onNext("foo")
subject.onNext("bar")
subject.onError(RuntimeException())
subject.onNext("wom")
subject.onComplete()
我的问题是这里没有任何错误处理方法可以帮助我:
onErrorResumeNext()— 指示 Observable 发出一个序列 遇到错误时的项目
onErrorReturn( )— 指示一个 Observable 在遇到错误时发出特定项目
onExceptionResumeNext( )— 指示 Observable 继续 在遇到异常后发出项目(但不是另一个 各种投掷物)
retry( )— 如果源 Observable 发出一个 错误,重新订阅它,希望它会在没有的情况下完成 错误
retryWhen( )— 如果源 Observable 发出错误,则传递 错误到另一个 Observable 以确定是否重新订阅 来源
例如,我尝试了retry(),但它在错误后无限期挂起我的进程。
我也尝试了onErrorResumeNext(),但它没有按预期工作:
val backupSubject = PublishSubject.create<String>()
val subject = PublishSubject.create<String>()
var currentSubject = subject
subject.onErrorResumeNext(backupSubject).subscribe({
println("next: $it")
}, {
println("error")
currentSubject = backupSubject
}, {
println("complete")
})
backupSubject.subscribe({
println("backup")
}, {
println("backup error")
})
currentSubject.onNext("foo")
currentSubject.onNext("bar")
currentSubject.onError(RuntimeException())
currentSubject.onNext("wom")
currentSubject.onComplete()
这只会打印foo 和bar。
【问题讨论】:
-
您尝试过
onErrorResumeNext吗?您展示的文档有点奇怪,但这听起来确实像您描述的那样想要它做......我认为断开连接是API假设,如果Observable失败,它可能无法前进到下一个元素;所以写它是为了让你提供一个新的 Observable 来继续这个序列;所以你只需要一个能在错误导致你中断的地方接听,不是吗? -
onErrorResumeNext有一个参数,如果我用相同的subject调用它,我仍然看不到剩余的下一个值和onComplete。如果我添加一个新的Subject,它显然不会有我添加到前一个的订阅者。 -
好吧,对你来说它是“显而易见的”,它不会有订阅者,对我来说,如果错误处理函数不能解决这个问题,那么错误处理函数到底有什么作用做?它被命名和记录为您的问题的解决方案,所以再次您尝试过吗?
-
是的,我试过了,但它不起作用。
retry()只是挂起进程。 -
我没有问你是否尝试过
retry()。我问你是否尝试将一个新的 observable 传递给onErrorResumeNext()。但显然,如果它与您的假设相矛盾,您不需要帮助,祝您好运。