【发布时间】:2018-01-15 09:40:23
【问题描述】:
我正在尝试根据下游条件过滤掉更多上游项目。 mapProcess 本质上是启动一个进程(脚本或 exe)。该过程可能需要一些时间才能完成,我想在完成之前忽略任何进一步的上游项目。 createProcess 还返回一个 StdOut 的 Observable。我们切换到由 createProcess 创建的 IObservable 并将 arg 映射到 StdOut。
示例:
let mapProcess obs =
obs
|> Observable.map (fun arg -> createProcess arg)
|> Observable.switch
我尝试过的:这可行,但对这里的可变对象并不完全满意。
let mapProcess obs =
let mutable processNotRunning = true
obs
|> Observable.filter (fun _ -> processNotRunning)
|> Observable.map (fun arg -> processNotRunning <- false
createProcess arg)
|> Observable.switch
|> Observable.iter (fun _ -> processNotRunning <- true)
|> Observable.finallyDo (fun _ -> processNotRunning <- true)
我认为我可能需要某种“switchIfSeen”Observable 函数,该函数仅在当前订阅的 observable 已生成项目或完成时才会切换。仅仅结合一些现有的 RX 功能,我是否可能错过了一种更简单的方法?
【问题讨论】:
-
我不确定我是否正确理解了这些要求,但您可能正在寻找
flatMap- reactivex.io/documentation/operators/flatmap.html。与switch不同,除非当前“内部”可观察对象完成,否则它不会开始从后续可观察对象生成值。所以你的完整代码可能看起来像obs |> Observable.flatMap createProcess -
问题是您是否想处理
obs中的所有事件,或者如果前一个进程仍在运行,您是否真的想忽略它们 - 在后一种情况下,我认为flatMap不会帮助你 -
@HonzaBrestan 我想忽略它们。 flatMap 是一个很好的建议,而且非常接近。
标签: f# reactive-programming system.reactive