【问题标题】:How to implement buffering with timeout in RX如何在 RX 中实现超时缓冲
【发布时间】:2014-04-05 06:06:14
【问题描述】:

我需要实现一个事件处理,即在一段时间内没有新事件到达时延迟完成。 (当文本缓冲区发生变化时我必须排队解析任务,但我不想在用户仍在输入时开始解析。)

我是 RX 的新手,但据我所知,我需要 BufferWithTime 和 Timeout 方法的组合。我想这是这样工作的:它缓冲事件,直到在后续事件之间的指定时间段内定期收到它们。如果事件流中存在间隙(长于时间跨度),它应该返回传播到目前为止缓冲的事件。

看看Buffer和Timeout是怎么实现的,我大概可以实现我的BufferWithTimeout方法(如果大家有的话,请分享给我),但不知道是不是结合现有的方法就可以实现。有什么想法吗?

【问题讨论】:

    标签: .net system.reactive


    【解决方案1】:

    这是一个相当老的问题,但我相信以下答案值得一提,因为所有其他解决方案都迫使用户手动订阅、跟踪更改等。

    我提供以下作为“Rx-y”解决方案。

    var buffers = source
        .GroupByUntil(
            // yes. yes. all items belong to the same group.
            x => true,
            g => Observable.Amb<int>(
                   // close the group after 5 seconds of inactivity
                   g.Throttle(TimeSpan.FromSeconds(5)),
                   // close the group after 10 items
                   g.Skip(9)
                 ))
        // Turn those groups into buffers
        .SelectMany(x => x.ToArray());
    

    基本上,源是窗口化的,直到根据最新窗口定义了一些可观察对象。创建了一个新窗口(分组的 observable),我们使用该窗口来确定窗口何时应该关闭。在这种情况下,我会在 5 秒不活动或最大长度为 10 (9+1) 后关闭窗口。

    【讨论】:

      【解决方案2】:

      我认为BufferWithTime 是你所追求的。

      没有内置任何东西,但是这样的东西应该可以工作:

      注意:如果源发生错误,则不会刷新缓冲区。这与BufferWith* 的当前(或我上次检查时的当前)功能相匹配

      public static IObservable<TSource[]> BufferWithTimeout<TSource>(
          this IObservable<TSource> source, TimeSpan timeout)
      {
          return source.BufferWithTimeout(timeout, Scheduler.TaskPool);
      }
      
      public static IObservable<TSource[]> BufferWithTimeout<TSource>(
          this IObservable<TSource> source, TimeSpan timeout, IScheduler scheduler)
      {
          return Observable.CreateWithDisposable<TSource[]>(observer =>
          {
              object lockObject = new object();
              List<TSource> buffer = new List<TSource>();
      
              MutableDisposable timeoutDisposable = new MutableDisposable();
      
              Action flushBuffer = () =>
              {
                  TSource[] values;
      
                  lock(lockObject)
                  {
                      values = buffer.ToArray();
                      buffer.Clear();
                  }
      
                  observer.OnNext(values);
              };
      
              var sourceSubscription = source.Subscribe(
                  value =>
                  {
                      lock(lockObject)
                      {
                          buffer.Add(value);
                      }
      
                      timeoutDisposable.Disposable = 
                          scheduler.Schedule(flushBuffer, timeout);
                  },
                  observer.OnError,
                  () =>
                  {
                      flushBuffer();
                      observer.OnCompleted();
                  });
      
              return new CompositeDisposable(sourceSubscription, timeoutDisposable);
          });
      }
      

      【讨论】:

      • BufferWithTime 定期触发一段时间,但我想在一定的空闲时间后触发。
      • 那不是说如果没有间隙就永远不会收到输出吗?
      • 是的,如果从来没有间隙,我不想接收任何事件。但由于事件是用户按键产生的,超时时间为100ms,迟早会有差距:)
      • 更新了编译后修复和测试。
      • 酷!谢谢!我去详细看看。
      【解决方案3】:

      除了 Richard Szalay 的回答之外,我刚刚研究了最新 rx 版本中的新 Window 运算符。它“有点”解决了您的问题,因为您可以“缓冲超时”,即在持续到超时的时间窗口内获取输出,但不是将结果作为 IEnumerable 接收,而是实际得到它们作为 IObservable。

      这里有一个简单的例子来说明我的意思:

      private void SetupStream()
      {
          var inputStream = Observable.FromEvent<MouseButtonEventHandler, MouseButtonEventArgs>(
              h => new MouseButtonEventHandler(h), 
              h => MouseDown += h,
              h => MouseDown -= h);
      
          var timeout = inputStream.Select(evt => Observable.Timer(TimeSpan.FromSeconds(10), Scheduler.Dispatcher))
              .Switch();
      
          inputStream.Window(() => timeout)
              .Subscribe(OnWindowOpen);
      }
      
      
      private void OnWindowOpen(IObservable<IEvent<MouseButtonEventArgs>> window)
      {
          Trace.WriteLine(string.Format("Window open"));
      
          var buffer = new List<IEvent<MouseButtonEventArgs>>();
      
          window.Subscribe(click =>
          {
      
              Trace.WriteLine(string.Format("Click"));
      
              buffer.Add(click);
      
          }, () => ProcessEvents(buffer));
      }
      
      private void ProcessEvents(IEnumerable<IEvent<MouseButtonEventArgs>> clicks)
      {
          Trace.WriteLine(string.Format("Window closed"));
      
          //...
      }
      

      每次窗口打开时,您都会收到所有事件,当它们进入时,将它们存储在缓冲区中,并在窗口完成时处理(实际上发生在下一个窗口打开时)。

      不确定 Richard 是否会更改他的示例以使用 Window 现在它可用,但认为它可能值得提出作为替代方案。

      【讨论】:

      • 有趣。我一直想看看最新版本,现在我有借口了!
      【解决方案4】:

      如果您只需要在用户停止输入一定时间后运行一个操作,而不一定需要中间事件,那么Throttle 就是您所追求的操作。查看here 以了解其在该场景中的使用示例。

      【讨论】:

      • 在我的情况下,我不仅需要更改最终结果(这是一个大文本块,我只重新解析更改的部分),但你是对的,如果个别更改是没意思,Throttle 是个不错的选择。
      • 在我的应用程序中,当用户停止在 3D 视图中移动相机时,我想做一些处理。油门是一个不错的选择。
      • 链接失效了,可以把例子加到帖子里吗?
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-10-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多