【发布时间】:2016-05-24 18:57:18
【问题描述】:
我知道这违反了很多 Rx 规则,但我真的很喜欢 RxJava-JDBC,我的队友也一样。关系数据库是我们工作的核心,Rx 也是如此。
但是在某些情况下,我们不想发出 Observable<ResultSet>,而是希望只使用基于拉取的 Java 8 Stream<ResultSet> 或 Kotlin Sequence<ResultSet>。但是我们非常习惯 RxJava-JDBC 库,它只返回一个Observable<ResultSet>。
因此,我想知道是否有一种方法可以使用扩展功能将Observable<ResultSet> 转换为Sequence<ResultSet>,而不进行任何中间收集或toBlocking() 调用。以下是我目前所拥有的所有内容,但我现在正在尝试连接基于推送和拉取的系统,我也无法缓冲,因为 ResultSet 在每个 onNext() 调用中都是有状态的。这是不可能完成的任务吗?
import rx.Observable
import rx.Subscriber
import java.sql.ResultSet
fun Observable<ResultSet>.asSequence() = object: Iterator<ResultSet>, Subscriber<ResultSet>() {
private var isComplete = false
override fun onCompleted() {
isComplete = true
}
override fun onError(e: Throwable?) {
throw UnsupportedOperationException()
}
override fun onNext(rs: ResultSet?) {
throw UnsupportedOperationException()
}
override fun hasNext(): Boolean {
throw UnsupportedOperationException()
}
override fun next(): ResultSet {
throw UnsupportedOperationException()
}
}.asSequence()
【问题讨论】:
-
我使用严格的基于拉的实现github.com/thomasnield/kdbc/blob/master/src/main/kotlin/kdbc/… 做了类似的事情
-
一个可观察对象实际上可以在另一个线程中工作,所以我认为这并不容易(如果可能的话)。为什么要避免
toBlocking()?据我了解,这将是实现您所需要的安全且简单的方法。 -
我认为这不适用于有状态的 ResultSet...
标签: stream sequence rx-java kotlin rx-kotlin