【问题标题】:How to filter an Observable based on the value of another?如何根据另一个值过滤 Observable?
【发布时间】:2013-12-26 21:30:28
【问题描述】:

我正在尝试找到某种方法来根据另一个可观察对象的值过滤可观察对象。例如,假设我们只想接收时间 x 和 y 之间的事件。可以根据计时器的值过滤 observable 吗?

【问题讨论】:

    标签: c# filter system.reactive observable reactive-extensions-js


    【解决方案1】:

    几种方法。没有一些代码,很难知道哪个是最好的。

    通过 CombineLatest(一直收听并根据最新值进行过滤):

    var astream = ...;
    var bstream = ...;
    var filtered = Observable.CombineLatest(astream, bstream, (a, b) => new { a, b })
              .Where(v => v.b >= x && v.b <= y)
              .Select(v => v.a); // Alas sometimes you will get duplicate a's.
    

    通过选择和切换(仅当bstream 满足某些条件时才收听astream):

    var astream = ...;
    var bstream = ...;
    var filtered = bstream
        .Select(b => (b >= x && b <= y) ? astream : Observable.Empty<T>())
        .Switch();
    

    【讨论】:

    • 我非常喜欢Select + Switch 的想法!
    • 非常感谢,真的很喜欢 Select + Switch 的想法!会投票,但声望不够。
    • 所以我尝试了 Select+Switch 的想法,但偶然发现了一个问题,我认为 switch 方法会忽略最新值,只是等到下一个值。解释一下:` var fil = input.map(function(x) { return (x % 3) == 0; }); fil.map(function(x) { if(x) { return input; } else { return Rx.Observable.empty(); } }).switch().subscribe(function(x) { print(x); } ) ' 给定递增整数的输入,此代码不打印任何内容??
    • 对于你刚才描述的场景,你为什么不直接使用filterinput.filter(function (i) { return (i % 3) == 0; })
    【解决方案2】:

    正如Brandon'提到的,有很多方法可以组合事件流。

    Observable.Join 的反应式连接是一种非常通用的工具,但很大一部分 Rx 内置运算符库以支持基于另一个过滤的方式组合流。

    我真的很喜欢 Brandon 的 Select + Switch 技术(+1 来自我);我已将其归档以备将来使用!

    这是一种直接解决将源流过滤到开始和结束时间的问题的方法。它比Select + Switch 有一些优势,包括:

    • 它避免检查源流的每个事件的过滤条件。
    • 它避免了需要“空”流。
    • 它只订阅一次源。
    • 它会在到达结束时间后立即发送OnCompleted(),而不是像源流那样持续。

    这实际上归结为 Observable.Window 运算符的特定重载,但我将逐步解释它。

    基本思想是通过应用在开始时间打开并在结束时间关闭的Window 来过滤源流。

    首先让我们创建一个一秒脉冲的示例源流 (xs),以及开始时间和结束时间:

    var xs = Observable.Interval(TimeSpan.FromSeconds(1));
    
    var startTime = DateTime.Now + TimeSpan.FromSeconds(5);    
    var endTime = DateTime.Now + TimeSpan.FromSeconds(8);
    

    为了简洁,我没有检查startTime 是否在endTime 之前。现在我们创建一个流来打开窗口,以及一个流来关闭窗口:

    var start = Observable.Timer(startTime);
    var end = Observable.Timer(endTime);
    

    最后使用Observable.Window 过滤源流。此运算符的输出是一个流 (IObservable&lt;IObservable&lt;T&gt;&gt;) - 每个子流都是一个新窗口。

    我们将使用的重载接受一个流,其事件标记一个新窗口的打开,以及一个工厂函数,在给定触发窗口打开的事件的情况下提供关闭流。

    使用我们的 Timer 流,我们知道将在开始时间创建一个窗口,并在结束时间关闭。

    我们使用Observable.Merge 对流进行扁平化处理:

    var filtered = xs.Window(start, _ => end).Merge();
    

    如果我们这样订阅:

    filtered.Subscribe(Console.WriteLine);
    

    正如预期的那样,我们得到以下输出:

    4
    5
    6
    

    同样,有很多很多方法可以解决这个问题,而不仅仅是使用Window。例如,您还可以轻松扩展此解决方案以支持多个窗口(通过使用打开时间流和关闭时间流工厂)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-01-23
      • 1970-01-01
      • 2021-08-19
      • 2014-08-23
      • 2020-05-22
      相关资源
      最近更新 更多