观察者的普通 onSubscribe 方法可以调用尽可能多的你使用同一个观察者。但是DisposableObserver onSubscibe 方法只能调用一次,这意味着你只能使用这个DisposableObserver 一次。如果您将DisposableObserver 的单个对象传递给两个流,它将抛出异常并快速关闭它们。此逻辑已在此类的onSubscribe() 中实现,因此您无法覆盖它。但如果您需要 onSubscribe() 回调,您可以覆盖 onStart() 方法,这是相同的。
这个类的用法可以如下。
根据文档,DisposableObserver 是:
一个抽象的 Observer,通过实现 Disposable 允许异步取消。
换句话说,这意味着您可以在观察者方法中使用一次性行为。就像在onNext() 中调用dispose()。
Observable.just(1, 2, 3, 4, 5)
.map {
"$it"
}
.subscribe(object : DisposableObserver<String>(){
override fun onComplete() {
}
override fun onNext(t: String) {
println("first item only= $t")
//we can dispose this stream right in the observer methods
dispose()
}
override fun onError(e: Throwable) {
}
})
甚至可以将DisposableObserver 与subscribeWith() 结合起来,获得与普通观察者几乎相同的行为。
val disposableObserver = object : DisposableObserver<String>() {
override fun onComplete() {
}
override fun onNext(t: String) {
println("first item only= $t")
//we can dispose this stream right in the observer methods
dispose()
}
override fun onError(e: Throwable) {
}
}
val disposable: Disposable = Observable.just(1, 2, 3, 4, 5)
.map {
"$it"
}
.subscribeWith(disposableObserver)
disposable.dispose()
RX-Java 中的这个类和许多其他类和运算符都可以简化 RX 的使用,并且可以根据您想要使用库的方式来选择它们中的任何一个。