【发布时间】:2016-02-18 08:35:37
【问题描述】:
如何使用多线程在Observable 上运行filter、map 和flatMap:
def withDelay[T](delay: Duration)(t: => T) = {
Thread.sleep(delay.toMillis)
t
}
Observable
.interval(500 millisecond)
.filter(x => {
withDelay(1 second) { x % 2 == 0 }
})
.map(x => {
withDelay(1 second) { x * x }
}).subscribe(println(_))
目标是使用多个线程同时运行过滤和转换操作。
【问题讨论】:
-
@david.mihola,是的,我检查了它们,并且能够在多个线程中执行
subscribe块,但是对于 map、flatMap 和 filter,我无法做到这一点。我假设我可能会在过滤或转换时调用其他 API 或从数据库中获取其他数据,因此我想确保此代码将同时执行。
标签: scala concurrency functional-programming rx-java rx-scala