【问题标题】:Rx: How to buffer events (ring buffer) and only flush them when a special event occurs?Rx:如何缓冲事件(环形缓冲区)并仅在发生特殊事件时刷新它们?
【发布时间】:2013-03-12 17:13:02
【问题描述】:

我在 C# 中使用 Reactive Extensions (Rx),并希望通过以下方式过滤事件。想象一下,我有以下发起者序列:

A B C D E F X G H I X J X X K L M N O X P

我想产生以下输出:

E F X H I X J X X N O X

基本上,我会缓冲(节流?)具有最大界限(在示例中此界限为 2)的事件,当我得到某个事件(在本例中为事件 X)时,我想将该缓冲区刷新到输出并再次开始缓冲,直到我再次看到特殊事件。

我正在尝试几种方法,但没有任何运气,我想应该有一种简单的方法来完成它,我错过了。

编辑:一个限制是,我希望获得大量被丢弃的事件,并且只有少数 X 实例,因此在内存中保留一个包含数千个事件的缓冲区以仅读取最后 2 个(或 20 个)是不是一个真正的选择。

【问题讨论】:

    标签: c# events system.reactive circular-buffer


    【解决方案1】:

    为方便起见,我们需要以下两个扩展函数:

    public static class Extensions
    {
        public static IObservable<IList<TSource>> BufferUntil<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
        {
            var published = source.Publish().RefCount();
            return published.Buffer(() => published.Where(predicate));
        }
    
        public static IEnumerable<TSource> TakeLast<TSource>(this IEnumerable<TSource> source, int count)
        {
            return source.Reverse().Take(count).Reverse();
        }
    }
    

    然后我们这样解决问题:

    source.BufferUntil(c => c == 'X')
        .SelectMany(list => list.TakeLast(3))
    

    输出:

    E F X H I X J X X N O X
    

    【讨论】:

    • 谢谢,它确实有效,但在这里使用缓冲区意味着你可能会缓冲数千个事件,直到你得到一个 X,只是为了获取最新的事件。如果不需要的话,我想要一些甚至不引用过去事件的东西。
    【解决方案2】:

    我会捎带我在这里发布的另一个答案: Trouble Implementing a Sliding Window in Rx

    重要的是这个扩展方法:

    public static class Ext
    {
        public static IObservable<IList<T>> SlidingWindow<T>(
            this IObservable<T> src, 
            int windowSize)
        {
            var feed = src.Publish().RefCount();    
            // (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list  
            return Observable.Zip(
            Enumerable.Range(0, windowSize)
                .Select(skip => feed.Skip(skip))
                .ToArray());
        }
    }
    

    你可以这样使用:

    void Main()
    {
        // A faked up source
        var source = new Subject<char>();
    
        var bufferSize = 2;
        Func<char, bool> eventTrigger = c => c == 'X';
    
        var query = source
            .Publish()
            .RefCount()
            // Want one extra slot to detect the "event"
            .SlidingWindow(bufferSize + 1)
            .Where(window => eventTrigger(window.Last()))
            .Select(buffer => buffer.ToObservable())
            .Switch();
    
        using(query.Subscribe(Console.WriteLine))
        {
            source.OnNext('A');
            source.OnNext('B');
            source.OnNext('C');
            source.OnNext('D');
            source.OnNext('E');
            source.OnNext('F');
            source.OnNext('X');
            source.OnNext('G');
            source.OnNext('H');
            source.OnNext('I');
            source.OnNext('X');
            Console.ReadLine();
        }    
    }
    

    输出:

    E
    F
    X
    H
    I
    X
    

    【讨论】:

      【解决方案3】:

      这是一个回答我自己问题的刺,如果您发现任何问题,请告诉我。

      public static class ObservableHelper
      {
          /// <summary>
          /// Buffers entries that do no satisfy the <paramref name="shouldFlush"/> condition, using a circular buffer with a max
          /// capacity. When an entry that satisfies the condition ocurrs, then it flushes the circular buffer and the new entry,
          /// and starts buffering again.
          /// </summary>
          /// <typeparam name="T">The type of entry.</typeparam>
          /// <param name="stream">The original stream of events.</param>
          /// <param name="shouldFlush">The condition that defines whether the item and the buffered entries are flushed.</param>
          /// <param name="bufferSize">The buffer size for accumulated entries.</param>
          /// <returns>An observable that has this filtering capability.</returns>
          public static IObservable<T> FlushOnTrigger<T>(this IObservable<T> stream, Func<T, bool> shouldFlush, int bufferSize)
          {
              if (stream == null) throw new ArgumentNullException("stream");
              if (shouldFlush == null) throw new ArgumentNullException("shouldFlush");
              if (bufferSize < 1) throw new ArgumentOutOfRangeException("bufferSize");
      
              return System.Reactive.Linq.Observable.Create<T>(observer =>
              {
                  var buffer = new CircularBuffer<T>(bufferSize);
                  var subscription = stream.Subscribe(
                      newItem =>
                          {
                              bool result;
                              try
                              {
                                  result = shouldFlush(newItem);
                              }
                              catch (Exception ex)
                              {
                                  return;
                              }
      
                              if (result)
                              {
                                  foreach (var buffered in buffer.TakeAll())
                                  {
                                      observer.OnNext(buffered);
                                  }
      
                                  observer.OnNext(newItem);
                              }
                              else
                              {
                                  buffer.Add(newItem);
                              }
                          },
                      observer.OnError,
                      observer.OnCompleted);
      
                  return subscription;
              });
          }
      }
      

      顺便说一句,CircularBuffer 不是开箱即用的,但实现很简单。

      那我就打个电话:

              data
                  .FlushOnTrigger(item => item == 'X', bufferSize: 2)
                  .Subscribe(Console.WriteLine);
      

      【讨论】:

      • 除了参数检查之外,我只看到一个主要的疏忽。您没有传递 OnError 或 OnCompleted 通知。 (将observer.OnErrorobserver.OnCompleted 传递给您进行的Subscribe 调用应该在这里工作。)我怀疑Synchronize 调用将始终需要并且通常会留给调用者在需要时插入。与问题的 RX 部分无关,但我希望 CircularBuffer 直接实现 IEnumerable 而不需要 ReadAll 方法。
      • 这里的Synchronize() 运算符是什么?
      • 感谢您的反馈,我删除了同步调用。使用 TakeAll 方法的原因是我想明确(并强制执行)我读取整个缓冲区的那一刻,然后它会自动清空(而不是一方面枚举项目然后清除缓冲区)跨度>
      • 这种方法现在正在一个快速入门示例应用程序中使用,该应用程序使用语义日志记录块来缓冲内存中的 versbose 事件,并且仅在发生错误时将这些事件刷新到日志中。您可以在go.microsoft.com/fwlink/p/?LinkID=290898 找到代码(EnterpriseLibrary6-QuickStarts-source.exe 是相关文件)
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-08-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多