【问题标题】:RxJava Flowable.create(), how to respect subscribeOn() threadRxJava Flowable.create(),如何尊重 subscribeOn() 线程
【发布时间】:2019-05-26 05:42:36
【问题描述】:

我正在将自定义库 (dataClient) 回调 api 包装到 RxJava Flowable。 dataClient 使用自己的线程,所以它的回调是在自己的线程上调用的。

在我的 Rx 链中,我尝试使用 .subscribeOn(Schedulers.computation()) 指定计算调度程序。尽管如此,当我在我的 Rx 链上打印线程名称时,我得到了我的 dataClient 线程。

我应该怎么做,让我的 Flowable 使用.subscribeOn() 中指定的线程?

Flowable.create({ emitter ->
    dataClient.setCallback(object : Callback {
        override fun message(message: DataModel) {
            emitter.onNext(vehicle)
        }

        override fun done() {
            emitter.onComplete()
        }
    })
    emitter.setCancellable {
        dataClient.setCallback(null)
    }
}, BackpressureStrategy.BUFFER)
    .subscribeOn(Schedulers.computation())
    .doOnNext { Log.e("DATA", Thread.currentThread().name) }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { data -> Log.d("DATA", "Got data" + data.id)) }

【问题讨论】:

  • 要将onNext 信号移动到另一个线程,请使用observeOnsubscribeOn 用于将订阅副作用移动到另一个线程,并且对拥有自己的发射线程的源几乎没有影响。
  • 是的,observeOn 可以移动到另一个线程,但在这种情况下,subscribeOn 有什么用处?
  • 除非setCallback有特殊的线程需求,比如需要从UI或者一些工作线程中调用,否则没有。

标签: java android kotlin rx-java2


【解决方案1】:

subscribeOn 调度程序确保订阅在相关线程上完成。订阅完全是发生的,它的处理方式与 observeOn 调度程序不同,后者在新线程上调度元素的发射。

Flowable.create({ emitter ->
    // this runs with the computation scheduler
    dataClient.setCallback(object : Callback {
        override fun message(message: DataModel) {
            // this runs on the thread it's called from
            emitter.onNext(vehicle)
        }

        override fun done() {
            // this runs on the thread it's called from
            emitter.onComplete()
        }
    })
    emitter.setCancellable {
        dataClient.setCallback(null)
    }
}, BackpressureStrategy.BUFFER)
    .subscribeOn(Schedulers.computation())
    .doOnNext {
        // this runs on the thread of the onNext call
        Log.e("DATA", Thread.currentThread().name)
    }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
        // this runs on the main thread
        data -> Log.d("DATA", "Got data" + data.id))
    }

由于您的订阅代码没有阻塞并且不维护发射线程,因此不需要设置subscribeOn,可以省略。它主要只对同步源有效。

【讨论】:

  • 通常当我获得 Rx api 时,我会根据需要设置subscribeOn。对于这个例子subscribeOn没有任何影响,我担心当其他开发人员使用我的api时,他们也会根据他们的需要设置subscribeOn。这导致 Rx 链在他们期望的另一个线程上运行的问题。我应该如何解决这个问题?
  • 不要使用subscribeOn 来满足这个需求。如果它之前已经在链上调用过,无论如何它都没有任何效果。只会考虑第一次调用。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-05-07
  • 2018-08-19
  • 2019-11-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多