【发布时间】:2018-06-21 11:26:59
【问题描述】:
我正在尝试创建 Observable,它会在网络连接建立时重试。
我已经创建了主题:
private val retrySubject = PublishSubject.create<Unit>()()
我是这样使用它的:
private fun publishNetworkReconnection() {
compositeDisposable?.add(
connectionHelper.observeConnection()
.subscribe {connected: Boolean
if(connected){
retrySubject.onNext(null)
}
}
)
}
然后我尝试在我的 retryWhen 运算符中使用它:
val disposable =
Flowable.interval(0, UPDATE_INTERVAL, TimeUnit.SECONDS, Schedulers.io())
.onBackpressureDrop()
.flatMapCompletable {
revocationRepository.sync(event.id)
}
.retryWhen { retryHandler -> retryHandler.flatMap({ nothing -> retrySubject.asObservable() }) }
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ }, { Timber.e(it, "Unable to sync blacklist") })
compositeDisposable?.add(disposable)
}
这种情况下如何正确使用?
我在 Android Studio 中收到此错误:
类型不匹配。必需:发布者>!发现:可观察的!
【问题讨论】:
-
你使用 RxJava 1 还是 2?一方面你有
Flowable,另一方面,Subject.asObservable在 RxJava 1 中。你使用这两个版本吗?连这段代码都很难编译 -
@michalbrz 我在关注这个例子android.jlelse.eu/…