【问题标题】:Trouble Implementing a Sliding Window in Rx在 Rx 中实现滑动窗口的麻烦
【发布时间】:2020-02-12 15:05:14
【问题描述】:

我为响应式扩展创建了一个 SlidingWindow 运算符,因为我想轻松监控滚动平均值等内容。举个简单的例子,我想订阅鼠标事件,但每次有一个事件我想接收最后三个(而不是等待每三个事件接收最后三个)。这就是为什么我发现的 Window 重载似乎没有给我开箱即用的东西。

这就是我想出的。鉴于其频繁的 List 操作,我担心它可能不是最高效的解决方案:

public static IObservable<List<T>> SlidingWindow<T>(this IObservable<T> seq, int length)
{
    var seed = new List<T>();

    Func<List<T>, T, List<T>> accumulator = (list, arg2) =>
    {
        list.Add(arg2);

        if (list.Count > length)
            list.RemoveRange(0, (list.Count - length));

        return list;
    };

    return seq.Scan(seed, accumulator)
                .Where(list => list.Count == length);
}

可以这样调用:

var rollingSequence = Observable.Range(1, 5).SlidingWindow().ToEnumerable();

然而,令我惊讶的是,并没有收到预期的结果

1,2,3
2,3,4
3,4,5

我收到结果

2,3,4
3,4,5
3,4,5

任何见解将不胜感激!

【问题讨论】:

    标签: c# system.reactive rx.net


    【解决方案1】:

    使用您的原始测试,将参数 3 用于计数,这会产生所需的结果:

    public static IObservable<IList<T>> SlidingWindow<T>(
        this IObservable<T> source, int count)
    {
        return source.Buffer(count, 1)
                     .Where(list => list.Count == count);
    }
    

    像这样测试:

    var source = Observable.Range(1, 5);
    var query = source.SlidingWindow(3);
    using (query.Subscribe(i => Console.WriteLine(string.Join(",", i))))
    {
    
    }
    

    输出:

    1,2,3
    2,3,4
    3,4,5
    

    【讨论】:

    • 我知道这是一个很老的答案,但需要.Where(list =&gt; list.Count == count) 吗?我试过没有它,只是创建一个Buffer(count, 1),它似乎也能正常工作。
    • 它将因序列中的最后一个 count - 1 事件而失败,因为这些事件的输出将少于 count 项目 - OP 专门要求在窗口中提供 count 项目。在我的答案中尝试使用测试用例,看看会发生什么。即,您将获得另外两个包含“4,5”和仅“5”的事件。
    • 你是对的。在我之前的测试中,我使用了 ISubject 而不是 IObservable:查看文档,ISubject 扩展了 IObservable,所以在这种情况下可能有不同的策略,因此即使 @ 也不会返回最后的“4、5”和“5” 987654329@ 不见了。我是响应式新手,我还需要学习
    【解决方案2】:

    只需source.Window(count, 1) - 或source.Buffer(count, 1) 它是一个“计数”项目的窗口/缓冲区,滑动一个。

    【讨论】:

      【解决方案3】:

      这里的滑动窗口实现不足以满足我对滑动窗口的想法。最接近的是使用Buffer(N, 1),但这是一个问题,因为它在发出第一个结果之前等待前 N 个项目,然后滑出序列的末尾。我想要一次最多发出 N 个项目。

      我最终完成了这个实现:

      public static IObservable<IList<T>> SlidingWindow<T>(this IObservable<T> obs, int windowSize) =>
          Observable.Create<IList<T>>(observer =>
          {
              var buffer = new CircularBuffer<T>(windowSize);
              return obs.Subscribe(
                  value =>
                  {
                      buffer.Add(value);
                      observer.OnNext(buffer.ToList());
                  },
                  ex => observer.OnError(ex),
                  () => observer.OnCompleted()
              );
          });
      

      我最初使用队列作为缓冲区,但想使用更轻量级的东西。

      public class CircularBuffer<T> : IReadOnlyList<T>
      {
          private readonly T[] buffer;
          private int offset;
          private int count;
          public CircularBuffer(int bufferSize) => this.buffer = new T[bufferSize];
          public int Capacity => buffer.Length;
          public int Count => count;
          public T this[int index] => index < 0 || index >= count
              ? throw new ArgumentOutOfRangeException(nameof(index))
              : buffer[(offset + index) % buffer.Length];
          public void Add(T value)
          {
              buffer[(offset + count) % buffer.Length] = value;
              if (count < buffer.Length) count++;
              else offset = (offset + 1) % buffer.Length;
          }
          public IEnumerator<T> GetEnumerator()
          {
              for (var i = 0; i < count; i++)
                  yield return buffer[(offset + i) % buffer.Length];
          }
          IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
      }
      

      它将产生Observable.Range(0, 10).SlidingWindow(3)的序列:

       0,1,2,3,4,5,6,7,8,9
      [0]
      [0,1]
      [0,1,2]
        [1,2,3]
          [2,3,4]
            [3,4,5]
              [4,5,6]
                [5,6,7]
                  [6,7,8]
                    [7,8,9]
      

      【讨论】:

      • 我的特定用例,限制 linqpad 中任务的日志输出最多输出 N 项,以防止达到输出窗口的限制。 logs.SlidingWindow(1000).DumpLatest()
      【解决方案4】:

      试试这个 - 我必须坐下来考虑一下它的相对性能,但它至少 可能一样好,而且更容易阅读:

      public static IObservable<IList<T>> SlidingWindow<T>(
             this IObservable<T> src, 
             int windowSize)
      {
          var feed = src.Publish().RefCount();    
          // (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list  
          return Observable.Zip(
             Enumerable.Range(0, windowSize)
                 .Select(skip => feed.Skip(skip))
                 .ToArray());
      }
      

      测试台:

      var source = Observable.Range(0, 10);
      var query = source.SlidingWindow(3);
      using(query.Subscribe(Console.WriteLine))
      {               
          Console.ReadLine();
      }
      

      输出:

      ListOf(0,1,2)
      ListOf(1,2,3)
      ListOf(2,3,4)
      ListOf(3,4,5)
      ListOf(4,5,6)
      ...
      

      编辑:顺便说一句,我发现自己强迫性地.Publish().RefCount()ing 自从因为不这样做而被烧伤......我认为这里没有严格要求。

      为 yzorg 编辑:

      如果你像这样扩充方法,你会更清楚地看到运行时行为:

      public static IObservable<IList<T>> SlidingWindow<T>(
          this IObservable<T> src, 
          int windowSize)
      {
          var feed = src.Publish().RefCount();    
          // (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list  
          return Observable.Zip(
          Enumerable.Range(0, windowSize)
              .Select(skip => 
              {
                  Console.WriteLine("Skipping {0} els", skip);
                  return feed.Skip(skip);
              })
              .ToArray());
      }
      

      【讨论】:

      • @blaster 没问题 - 事实上,感谢您“让”我把它写出来,因为自从回答这个问题后,我自己已经用过几次了。 ;)
      • 我不认为这很好。 .Publish()、.Range(0,x) 和 .Skip() - 当这些组合起来时,它看起来性能很差,特别是 O n^2,因为 Skip 将一遍又一遍地迭代整个流。例如,您需要迭代 30,000 个整数才能得到 (10000, 10001, 10002)。因此,您实际上并没有在内存中保留源流的滑动缓冲区,而是必须将整个源流(从一开始)保留在内存中,这是我认为我们正在避免的。
      • 另外,这适用于“合理”大小的窗口...我不会将其用于 10k 窗口。
      • James World's 或 Luke's 应该是公认的答案。
      猜你喜欢
      • 2017-09-04
      • 1970-01-01
      • 2015-06-16
      • 2022-11-27
      • 2012-02-04
      • 1970-01-01
      • 2020-07-10
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多