【问题标题】:RX filter based on downstream condition基于下游条件的 RX 过滤器
【发布时间】:2018-01-15 09:40:23
【问题描述】:

我正在尝试根据下游条件过滤掉更多上游项目。 mapProcess 本质上是启动一个进程(脚本或 exe)。该过程可能需要一些时间才能完成,我想在完成之前忽略任何进一步的上游项目。 createProcess 还返回一个 StdOut 的 Observable。我们切换到由 createProcess 创建的 IObservable 并将 arg 映射到 StdOut。

示例:

let mapProcess obs =
  obs
  |> Observable.map (fun arg -> createProcess arg)
  |> Observable.switch

我尝试过的:这可行,但对这里的可变对象并不完全满意。

let mapProcess obs =
  let mutable processNotRunning = true
  obs
  |> Observable.filter (fun _ -> processNotRunning)
  |> Observable.map (fun arg -> processNotRunning <- false
                                createProcess arg)  
  |> Observable.switch
  |> Observable.iter (fun _ -> processNotRunning <- true)
  |> Observable.finallyDo (fun _ -> processNotRunning <- true)

我认为我可能需要某种“switchIfSeen”Observable 函数,该函数仅在当前订阅的 observable 已生成项目或完成时才会切换。仅仅结合一些现有的 RX 功能,我是否可能错过了一种更简单的方法?

【问题讨论】:

  • 我不确定我是否正确理解了这些要求,但您可能正在寻找flatMap - reactivex.io/documentation/operators/flatmap.html。与switch 不同,除非当前“内部”可观察对象完成,否则它不会开始从后续可观察对象生成值。所以你的完整代码可能看起来像obs |&gt; Observable.flatMap createProcess
  • 问题是您是否想处理obs 中的所有事件,或者如果前一个进程仍在运行,您是否真的想忽略它们 - 在后一种情况下,我认为flatMap 不会帮助你
  • @HonzaBrestan 我想忽略它们。 flatMap 是一个很好的建议,而且非常接近。

标签: f# reactive-programming system.reactive


【解决方案1】:

[...] 仅在当前订阅的 observable 产生了一个 项目或已完成

忽略值的一种方法是将热 observable 转换为冷 observable - 这样它就可以推出项目而不管谁在听。然后,您只在需要时才听。

  var map = argn.Select(CreateProcess).Publish().RefCount();

        map.SelectMany(o => o) //flatmap
           .Take(1)
           .Repeat()
           .Subscribe(d => Console.WriteLine($"Did task which took {d * 100}msecs" ));

测试:

CreateProcess 只是一个计时器,它会产生 x100 毫秒的固定延迟。)

    private static void Main(string[] args)
    {
        var argn = Observable.Interval(TimeSpan.FromMilliseconds(100)).Publish().RefCount();
        argn.Subscribe(Console.WriteLine);

        var map = argn.Select(CreateProcess).Publish().RefCount();

        map.SelectMany(o => o)
           .Take(1)
           .Repeat()
           .Subscribe(d => Console.WriteLine($"Did task which took {d * 100}msecs" ));

        Console.ReadKey();
    }

    static IObservable<long> CreateProcess(long i) => Observable.Timer(TimeSpan.FromMilliseconds(i * 100)).Select(_ => i);

输出:

0
Did task which took 0msecs
1
2
Did task which took 100msecs
3
4
5
Did task which took 300msecs
6
7
8
9
10
11
Did task which took 600msecs

【讨论】:

    猜你喜欢
    • 2018-06-12
    • 1970-01-01
    • 2010-10-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-08-26
    • 2016-12-10
    • 2014-02-12
    相关资源
    最近更新 更多