【问题标题】:RxJava: concatMap() with zip() gets stuckRxJava:带有 zip() 的 concatMap() 卡住
【发布时间】:2018-09-29 07:58:20
【问题描述】:

我有一个虚拟网络数据源:

    fun networkDataSource(): Single<List<Int>> {
        return Single.just((0 until 100).toList())
                .delay(150, TimeUnit.MILLISECONDS)
    }

这是一个无穷无尽的可观察性。它的主要用途是它的计算应该被“保护”,以便它只计算一次它的单个值(这里的值为 1。)

    val endless = Observable
            .just(1)
            .observeOn(Schedulers.io())
            .delay(500, TimeUnit.MILLISECONDS)
            // Counts as heavy operation, do not calculate this here once again
            .doOnNext { println("=> E: Calculated once") }
            .cache()
            //.doOnNext { println("=> E: From cache") }
            .repeat()

主流只是简单地发出值:

    val mainStream = Observable.range(0, 6)
            .doOnNext { println("=> M: Main stream $it") }

任务:

将 3 个 observables 压缩在一起,并优化网络使用,使其不会被不必要地调用。 (一旦满足数据的数量 - 在这种情况下为整数 - 。

方法:

    mainStream
            .concatMap {index ->
                Observables.zip(
                        Observable.just(index),
                        endless,
                        networkDataSource()
                                .toObservable()
                                .doOnNext { println("#> N: Network data fetch $index") }
                )
            }
            .doOnNext { println("=> After concatmap: ${it.first}") }
            .take(4)
            .doOnNext { println("=> After take: ${it.first}") }
            .subscribe(
                    { println("=> Last onnext") },
                    { it.printStackTrace() },
                    { synchronized(check) { check.notifyAll() } }
            )

完成锁定线程 - 仅用于测试:

synchronized(check) {
    check.wait()
}
println("Ending")

这是输出:

=> M: Main stream 0
=> M: Main stream 1
=> M: Main stream 2
=> M: Main stream 3
=> M: Main stream 4
=> M: Main stream 5
#> N: Network data fetch 0
=> E: Calculated once
=> After concatmap: 0
=> After take: 0
=> Last onnext
#> N: Network data fetch 1
=> After concatmap: 1
=> After take: 1
=> Last onnext

这是输出,在第二次拍摄后卡住了。 (不会在一分钟内进行)。我的问题是,为什么会这样?

作为旁注,如果我从 endless observable 中取消注释该行:

.doOnNext { println("=> E: From cache") }

它将用该行淹没控制台。为什么每次迭代都调用endless 这么多次?

flatMap() 不是这里的解决方案,因为它不考虑 take(4) 并继续完成所有网络调用。

那么我怎样才能让concatMap() 工作呢?

(我还添加了 RxJS 标签,因为这是一个反应性问题,绝对与 Kotlin 无关。如果 RxJava 库中存在这些功能,也欢迎 JS 解决方案。)

编辑:

我查看了代码,2 个输出可能是因为 prefetch 参数:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return concatMap(mapper, 2);
}

但我仍然不明白它是如何工作的。我只读到concatMap()flatmap(),但它等待每个结果。

【问题讨论】:

  • 整个设置可能会在第一项之后在同一个线程上运行,endless 中的repeat 永远不会放弃线程,从而阻止任何其他操作员继续进行。
  • @akarnokd 谢谢,你是对的,但我不会想到这一点。在endless observable 中的repeat() 之后添加了.subscribeOn(Schedulers.io()),它现在完成了。尽管在repeat()throttleFirst 之后它会发出太多的声音,但最后添加的技巧太多了。 ---您能否解释或提供一个链接 rx-way 将解决我的问题?我目前正在做的这个看起来很老套,而且远非专业,不想将其保留在 android 应用程序中。--- --- 写一个答案,我会接受它。
  • 重复缓存对我来说毫无意义,因为您只会使用其中的一项。
  • @akarnokd 哇,谢谢,没有意识到没有repeat() 也能工作。

标签: kotlin rxjs rx-java rx-java2 rx-kotlin


【解决方案1】:

来自cmets:

整个设置可能会在第一项之后在同一个线程上运行,endless 中的repeat 将永远不会放弃线程,从而阻止任何其他运算符继续进行。重复 cache 对我来说毫无意义,因为您只会使用其中的一项。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-10-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-04-25
    相关资源
    最近更新 更多