【问题标题】:RxJava- Turn Observable into Iterator, Stream, or SequenceRxJava - 将 Observable 转换为 Iterator、Stream 或 Sequence
【发布时间】:2016-05-24 18:57:18
【问题描述】:

我知道这违反了很多 Rx 规则,但我真的很喜欢 RxJava-JDBC,我的队友也一样。关系数据库是我们工作的核心,Rx 也是如此。

但是在某些情况下,我们不想发出 Observable<ResultSet>,而是希望只使用基于拉取的 Java 8 Stream<ResultSet> 或 Kotlin Sequence<ResultSet>。但是我们非常习惯 RxJava-JDBC 库,它只返回一个Observable<ResultSet>

因此,我想知道是否有一种方法可以使用扩展功能将Observable<ResultSet> 转换为Sequence<ResultSet>,而不进行任何中间收集或toBlocking() 调用。以下是我目前所拥有的所有内容,但我现在正在尝试连接基于推送和拉取的系统,我也无法缓冲,因为 ResultSet 在每个 onNext() 调用中都是有状态的。这是不可能完成的任务吗?

import rx.Observable
import rx.Subscriber
import java.sql.ResultSet

fun Observable<ResultSet>.asSequence() = object: Iterator<ResultSet>, Subscriber<ResultSet>() {

    private var isComplete = false

    override fun onCompleted() {
        isComplete = true
    }

    override fun onError(e: Throwable?) {
        throw UnsupportedOperationException()
    }

    override fun onNext(rs: ResultSet?) {
        throw UnsupportedOperationException()
    }


    override fun hasNext(): Boolean {
        throw UnsupportedOperationException()
    }

    override fun next(): ResultSet {
        throw UnsupportedOperationException()
    }

}.asSequence()

【问题讨论】:

  • 我使用严格的基于拉的实现github.com/thomasnield/kdbc/blob/master/src/main/kotlin/kdbc/… 做了类似的事情
  • 一个可观察对象实际上可以在另一个线程中工作,所以我认为这并不容易(如果可能的话)。为什么要避免toBlocking()?据我了解,这将是实现您所需要的安全且简单的方法。
  • 我认为这不适用于有状态的 ResultSet...

标签: stream sequence rx-java kotlin rx-kotlin


【解决方案1】:

我不确定这是实现您想要的最简单的方法,但您可以尝试此代码。它通过创建阻塞队列并将Observable 中的所有事件发布到此队列,将Observable 转换为IteratorIterable 从队列中拉出事件,如果没有则阻塞。然后根据接收到的当前事件修改自己的状态。

class ObservableIterator<T>(
    observable: Observable<T>,
    scheduler: Scheduler
) : Iterator<T>, Closeable {

  private val queue = LinkedBlockingQueue<Notification<T>>()
  private var cached: Notification<T>? = null
  private var completed: Boolean = false

  private val subscription =
      observable
          .materialize()
          .subscribeOn(scheduler)
          .subscribe({ queue.put(it) })

  override fun hasNext(): Boolean {
    cacheNext()
    return !completed
  }

  override fun next(): T {
    cacheNext()
    val notification = cached ?: throw NoSuchElementException()
    check(notification.isOnNext)
    cached = null
    return notification.value
  }

  private fun cacheNext() {
    if (completed) {
      return
    }

    if (cached == null) {
      queue.take().let { notification ->
        if (notification.isOnError) {
          completed = true
          throw RuntimeException(notification.throwable)
        } else if (notification.isOnCompleted) {
          completed = true
        } else {
          cached = notification
        }
      }
    }
  }

  override fun close() {
    subscription.unsubscribe()
    completed = true
    cached = null
  }
}

【讨论】:

  • 优秀。今晚回家后我会玩这个。
  • 在我的手机上研究这个太棒了。唯一可能需要关注的是,一次只有一个 ResultSet 应该在队列中,并且它应该阻塞 get 或 put 端以确保这一点。如果队列不能被限制,我总是可以创建自己的同步器......
  • @ThomasN。您可以将最大容量设置为 queue,并将其传递给 LinkedBlockingQueue 构造函数。
  • 这种方式与Sequence { observable.toBlocking().getIterator() }有何不同?
  • 您可以确保 toBlocking().getIterator() 不会等待 observable 完成此 sn-p:Observable.interval(500, TimeUnit.MILLISECONDS) .take(10) .doOnCompleted { println("Done!") } .toBlocking().iterator.asSequence() .forEach { println(it) }
【解决方案2】:

您可以使用以下辅助函数:

fun <T> Observable<T>.asSequence() = Sequence { toBlocking().getIterator() }

当为迭代器调用返回的序列时,将订阅 observable。

如果一个 observable 在它订阅的同一线程上发出元素(例如Observable.just),它将在有机会返回之前填充迭代器的缓冲区。 在这种情况下,您可能需要通过调用 subscribeOn 来直接订阅不同的线程:

observable.subscribeOn(scheduler).asSequence()

然而,虽然toBlocking().getIterator() 不会缓冲所有结果,但如果迭代器没有及时使用它们,它可以缓冲其中一些结果。如果下一个ResultSet 到达时ResultSet 以某种方式过期,这可能是个问题。

【讨论】:

  • 好吧,也许这就是为什么我得到了错误的印象,因为我需要使用 subscribeOn()。明天我会更仔细地看一下这个。我很高兴为此目的内置了一个运算符。
  • @ThomasN。严格顺序是什么意思?如果在从迭代器获取结果集并进行处理之前,您不能从订阅者的onNext 返回,恐怕您需要一些手动同步。
  • 是的,这将是 ResultSet 过期的问题。我相信每次都会发出相同的 ResultSet 实例。每次发射时,它的内部状态都会发生变化。如果它的状态变化快于每个状态的消耗速度,那就是个大问题。
  • 是的,我认为手动同步是必要的,所以我不得不使用@Michael 的类似实现。但是谢谢你,这个解决方案适用于我遇到的除 ResultSet 之外的所有其他情况:)
  • 在此处获得了为此目的创建同步器的一些帮助。 stackoverflow.com/questions/37446814/…
猜你喜欢
  • 2020-12-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2010-10-14
  • 2015-07-25
  • 2019-11-08
相关资源
最近更新 更多