【问题标题】:RxJava — emit on subscribeOn() threadRxJava — 在 subscribeOn() 线程上发出
【发布时间】:2020-06-05 02:21:57
【问题描述】:

我有以下代码:

Single.create { emitter ->
   // I/O thread here

   ThirdPartySDK.doSomeAction {
       // Main thread here

      emitter.onSuccess(someValue)
   }
}
.flatMap {
  someOtherSingle(it) // Executes on main thread
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({},{})

ThirdPartySDK.doSomeAction 回调在主线程上发布,因此发射器也会在主线程上发射,而不是在订阅线程上(如果我在 flatMap 中进一步进行一些网络交互,链将失败)。

如果我在第一个Single 之后添加observeOn(Schedulers.io()),它会切换到正确的线程,但是有没有办法在正确的线程上发射?我无法修改 ThirdPartySDK 的行为。

【问题讨论】:

    标签: android rx-java rx-java2


    【解决方案1】:

    订阅开启

    subscribeActual lambda 将在给定的调度程序上调用

    观察开启

    将线程切换到给定的调度程序。每个 upstream-onNext 调用都将从 ObserveOn-Scheduler-Thread 调用

    正如你已经说过的,subscribeOn 只会在给定的调度程序线程上调用 subscribeActual 方法调用。这并不意味着下游发射将在同一个线程上。在您的情况下,onSuccess 发射将从不同的线程(例如数据库/ Http-ThreadPool 等)调用。

    onSuccess 将从未知线程(在您的情况下为主线程)调用。下游调用将从主线程调用。因此 flatMap 是从主线程调用的。 flatMap 中主线程上的网络调用可能会失败,因为不允许“阻塞”主线程。

    如何解决这个问题? 只需在 Single#create 之后放置一个 observeOn。主线程调用 onSucess。 observeOn-subscriber 将从主线程中调用。 observeOn-subscriber 将 onSuccess 下游调用(例如 flatMap)重定向到给定的 ObserveOn-Scheduler-Thread。因此给出,flatMap 是从非主循环线程调用的。

    例子:

    @Test
    fun wurst() {
        val thirdPartySDKImpl = ThirdPartySDKImpl()
        Single.create<String> { emitter ->
            thirdPartySDKImpl.doSomeAction {
                emitter.onSuccess(it)
            }
        }
            // .subscribeOn(Schedulers.computation())
            // move emit from unknown thread to computation thread
            .observeOn(Schedulers.computation())
            // Single.just will be subscribe from a computation thread
            .flatMap { Single.just(123) }
            // move onSucess/ onError emit from computation thread to main-thread
            .observeOn(AndroidSchedulers.mainThread())
            // subscribe onNext / onError will be called from the main-android-thread
            .subscribe({}, {})
    }
    
    interface ThirdPartySDK {
        fun doSomeAction(callback: (v: String) -> Unit)
    }
    
    class ThirdPartySDKImpl : ThirdPartySDK {
        override fun doSomeAction(callback: (v: String) -> Unit) {
            // <- impl-detail ->
            callback("whatever")
        }
    
    }
    

    注意:如果 create-lambda 没有阻塞或执行一些 cpu 繁重的工作,则不需要 subscribeOn。如果它只订阅一个回调,该回调将从不同的线程调用,则不需要 subscribeOn。

    但是有什么方法可以在正确的线程上发射吗?

    您不应该在运算符中使用任何并发。你会想,你可以这样做:

        Single.create<String> { emitter ->
            thirdPartySDKImpl.doSomeAction {
                Schedulers.io().scheduleDirect {
                    emitter.onSuccess(it)
                }
            }
        }
    

    但不建议这样做,因为您可能会破坏序列化的 onNext 合约^1。此示例将确保 onSucess 下游调用将在预期线程上发生,但未处理取消/取消订阅,并且可能存在其他陷阱。

    如果您有一个非响应式 API 并且您想强制执行一些线程模型,我建议您包装同步。带有异步的 API,并提供适当的 observeOn/subscribeOn 操作符。以后只使用异步 API。

    interface ThirdPartySDKAsync {
        fun doSomeAction(): Single<String>
    }
    
    class ThirdPartySDKAsyncImpl(private val sdk: ThirdPartySDK, private val scheduler: Scheduler) :
        ThirdPartySDKAsync {
        override fun doSomeAction(): Single<String> {
            return Single.create<String> { emitter ->
                sdk.doSomeAction {
                    emitter.onSuccess(it)
                }
            }.observeOn(scheduler)
        }
    }
    

    延伸阅读:https://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html

    ^1 一次只允许一个线程调用onNext/onSuccess/onError/onComplete

    【讨论】:

      猜你喜欢
      • 2019-05-26
      • 1970-01-01
      • 1970-01-01
      • 2018-05-07
      • 2018-06-28
      • 1970-01-01
      • 1970-01-01
      • 2016-06-03
      • 2018-08-19
      相关资源
      最近更新 更多