【问题标题】:RxJava Observable.create wrapping observable subscriptionsRxJava Observable.create 包装可观察订阅
【发布时间】:2017-11-14 03:31:19
【问题描述】:

我使用 Observable.create 以便在某些数据可用时通知订阅者。我有点不确定在我的 create 方法中订阅 observables。这些嵌套订阅会给我带来任何问题吗?我对使用 Observable.create 创建可观察对象并不完全熟悉,所以我想确保我没有做任何不寻常的事情或滥用它。提前谢谢!

abstract class NetworkResource<ApiType, DbType> constructor(private val schedulerProvider: SchedulerProvider) {

    abstract fun fetchFromApi(): Single<ApiType>
    abstract fun fetchFromDb(): Observable<Optional<DbType>>
    abstract fun saveToDb(apiType: ApiType?)
    abstract fun shouldFetchFromApi(cache: DbType?): Boolean

    fun fetch(): Observable<Optional<DbType>>  {
        return Observable.create<Optional<DbType>> {
            val subscriber = it

            fetchFromDb()
                    .subscribe({
                        subscriber.onNext(it)

                        if(shouldFetchFromApi(it.get())) {
                            fetchFromApi()
                                    .observeOn(schedulerProvider.io())
                                    .map {
                                        saveToDb(it)
                                        it
                                    }
                                    .observeOn(schedulerProvider.ui())
                                    .flatMapObservable {
                                        fetchFromDb()
                                    }
                                    .subscribe({
                                        subscriber.onNext(it)
                                        subscriber.onComplete()
                                    })
                        }
                        else {
                            subscriber.onComplete()
                        }
                    })

        }
    }
}

【问题讨论】:

    标签: android rx-java kotlin rx-java2


    【解决方案1】:

    是的,这会导致问题。

    首先,像这样嵌套Observable 是不习惯的,Reactive 方法的优点之一是组合Observables,因此具有单个干净的流。用这种方式,你打破了链条,直接的结果是更难阅读的交织代码,以及更多用于连接通知事件的代码,基本上就像用Observable包装异步回调方法一样。
    在这里,因为您已经有了响应式组件,您可以简单地组合它们,而不是用回调方法处理它们。

    其次,由于链的中断,最直接和最直接的一个 - 取消订阅外部Observable 不会自动影响内部Observable。尝试添加subscribeOn() 也是如此,并且在背压很重要的不同场景中也适用。

    另一种作曲方式可能是这样的:

    fun fetch2(): Observable<Optional<DbType>> {
            return fetchFromDb()
                    .flatMap {
                        if (shouldFetchFromApi(it.get())) {
                            fetchFromApi()
                                    .observeOn(schedulerProvider.io())
                                    .doOnSuccess { saveToDb(it) }
                                    .observeOn(schedulerProvider.ui())
                                    .flatMapObservable {
                                        fetchFromDb()
                                    }
    
                        } else {
                            Observable.empty()
                        }
                    }
        }
    

    如果出于某种原因,您无论如何都希望单独发出第一个 fetchFromDb() 结果,您也可以使用带有选择器的 publish() 来实现:

     fun fetch2(): Observable<Optional<DbType>> {
        return fetchFromDb()
                .publish {
                    Observable.merge(it,
                            it.flatMap {
                                if (shouldFetchFromApi(it.get())) {
                                    fetchFromApi()
                                            .observeOn(schedulerProvider.io())
                                            .doOnSuccess { saveToDb(it) }
                                            .observeOn(schedulerProvider.ui())
                                            .flatMapObservable {
                                                fetchFromDb()
                                            }
    
                                } else {
                                    Observable.empty()
                                }
                            })
                }
    
    }
    

    【讨论】:

    • 太棒了,谢谢!您能否举例说明使用 publish() 的含义?
    • 示例添加了发布选择器,通过这种方式,我们将获得 fetchFromDb() 的原始结果和结果 flatMapped fetchFromDb
    • 非常感谢您的帮助。
    猜你喜欢
    • 1970-01-01
    • 2015-06-04
    • 2018-06-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-07-21
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多