【问题标题】:How to throttle event stream using RX?如何使用 RX 限制事件流?
【发布时间】:2011-03-13 18:33:11
【问题描述】:

我想有效地限制事件流,以便在收到第一个事件时调用我的委托,但如果收到后续事件,则不会在 1 秒内调用我的委托。在超时(1 秒)到期后,如果收到后续事件,我希望调用我的委托。

有没有使用响应式扩展的简单方法?

示例代码:

static void Main(string[] args)
{
    Console.WriteLine("Running...");

    var generator = Observable
        .GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)
        .Timestamp();

    var builder = new StringBuilder();

    generator
        .Sample(TimeSpan.FromSeconds(1))
        .Finally(() => Console.WriteLine(builder.ToString()))
        .Subscribe(feed =>
                   builder.AppendLine(string.Format("Observed {0:000}, generated at {1}, observed at {2}",
                                                    feed.Value,
                                                    feed.Timestamp.ToString("mm:ss.fff"),
                                                    DateTime.Now.ToString("mm:ss.fff"))));

    Console.ReadKey();
}

当前输出:

Running...
Observed 064, generated at 41:43.602, observed at 41:43.602
Observed 100, generated at 41:44.165, observed at 41:44.602

但我想观察(时间戳显然会改变)

Running...
Observed 001, generated at 41:43.602, observed at 41:43.602
....
Observed 100, generated at 41:44.165, observed at 41:44.602

【问题讨论】:

  • 这只是一个很酷的 lambda 语句x =&gt; x &lt;= 100 ;)

标签: c# system.reactive


【解决方案1】:

受 Bluelings 回答的启发,我在这里提供了一个与 Reactive Extensions 2.2.5 一起编译的版本。

此特定版本会计算样本数并提供最后的采样值。为此,使用了以下类:

class Sample<T> {

  public Sample(T lastValue, Int32 count) {
    LastValue = lastValue;
    Count = count;
  }

  public T LastValue { get; private set; }

  public Int32 Count { get; private set; }

}

这里是运算符:

public static IObservable<Sample<T>> SampleResponsive<T>(this IObservable<T> source, TimeSpan interval, IScheduler scheduler = null) {
  if (source == null)
    throw new ArgumentNullException(nameof(source));
  return Observable.Create<Sample<T>>(
    observer => {
      var gate = new Object();
      var lastSampleValue = default(T);
      var lastSampleTime = default(DateTime);
      var sampleCount = 0;
      var scheduledTask = new SerialDisposable();
      return new CompositeDisposable(
        source.Subscribe(
          value => {
            lock (gate) {
              var now = DateTime.UtcNow;
              var elapsed = now - lastSampleTime;
              if (elapsed >= interval) {
                observer.OnNext(new Sample<T>(value, 1));
                lastSampleValue = value;
                lastSampleTime = now;
                sampleCount = 0;
              }
              else {
                if (scheduledTask.Disposable == null) {
                  scheduledTask.Disposable = (scheduler ?? Scheduler.Default).Schedule(
                    interval - elapsed,
                    () => {
                      lock (gate) {
                        if (sampleCount > 0) {
                          lastSampleTime = DateTime.UtcNow;
                          observer.OnNext(new Sample<T>(lastSampleValue, sampleCount));
                          sampleCount = 0;
                        }
                        scheduledTask.Disposable = null;
                      }
                    }
                  );
                }
                lastSampleValue = value;
                sampleCount += 1;
              }
            }
          },
          error => {
            if (sampleCount > 0)
              observer.OnNext(new Sample<T>(lastSampleValue, sampleCount));
            observer.OnError(error);
          },
          () => {
            if (sampleCount > 0)
              observer.OnNext(new Sample<T>(lastSampleValue, sampleCount));
            observer.OnCompleted();
          }
        ),
        scheduledTask
      );
    }
  );
}

【讨论】:

    【解决方案2】:

    好的,

    这里有 3 个场景:

    1) 我想每秒获取一个事件流的值。 意思是:如果它每秒产生更多的事件,你会得到一个更大的缓冲区。

    observableStream.Throttle(timeSpan)
    

    2) 我想获取最新的事件,它是在第二个事件发生之前产生的 意思是:其他事件被丢弃。

    observableStream.Sample(TimeSpan.FromSeconds(1))
    

    3) 您想获取所有事件,这些事件发生在最后一秒。并且每一秒

    observableStream.BufferWithTime(timeSpan)
    

    4) 你想用所有值选择第二个之间发生的事情,直到第二个过去,然后返回你的结果

    observableStream.CombineLatest(Observable.Interval(1000), selectorOnEachEvent)
    

    【讨论】:

    • 当,场景2正是我要找的,我也找不到方法:(
    • 如果有人需要它:stream.Sample(TimeSpan.FromSeconds(1))
    【解决方案3】:

    昨晚我在同样的问题上苦苦挣扎,我相信我找到了一个更优雅(或至少更短)的解决方案:

    var delay = Observable.Empty<T>().Delay(TimeSpan.FromSeconds(1));
    var throttledSource = source.Take(1).Concat(delay).Repeat();
    

    【讨论】:

      【解决方案4】:

      这是我在 RX 论坛的帮助下得到的:

      这个想法是为原始序列发出一系列“票”。这些“票”会因超时而延迟,不包括第一个,它会立即预先添加到票序列中。当一个事件进来并且有票在等待时,事件立即触发,否则它等到票然后触发。当它触发时,发出下一张票,依此类推......

      要组合门票和原创活动,我们需要一个组合器。不幸的是,“标准” .CombineLatest 不能在此处使用,因为它会触发之前使用的票证和事件。所以我必须创建我自己的组合器,它基本上是一个过滤的.CombineLatest,只有当组合中的两个元素都是“新鲜的”时才会触发——以前从未返回过。我称之为 .CombineVeryLatest 又名 .BrokenZip ;)

      使用.CombineVeryLatest,上述思路可以这样实现:

          public static IObservable<T> SampleResponsive<T>(
              this IObservable<T> source, TimeSpan delay)
          {
              return source.Publish(src =>
              {
                  var fire = new Subject<T>();
      
                  var whenCanFire = fire
                      .Select(u => new Unit())
                      .Delay(delay)
                      .StartWith(new Unit());
      
                  var subscription = src
                      .CombineVeryLatest(whenCanFire, (x, flag) => x)
                      .Subscribe(fire);
      
                  return fire.Finally(subscription.Dispose);
              });
          }
      
          public static IObservable<TResult> CombineVeryLatest
              <TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,
              IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector)
          {
              var ls = leftSource.Select(x => new Used<TLeft>(x));
              var rs = rightSource.Select(x => new Used<TRight>(x));
              var cmb = ls.CombineLatest(rs, (x, y) => new { x, y });
              var fltCmb = cmb
                  .Where(a => !(a.x.IsUsed || a.y.IsUsed))
                  .Do(a => { a.x.IsUsed = true; a.y.IsUsed = true; });
              return fltCmb.Select(a => selector(a.x.Value, a.y.Value));
          }
      
          private class Used<T>
          {
              internal T Value { get; private set; }
              internal bool IsUsed { get; set; }
      
              internal Used(T value)
              {
                  Value = value;
              }
          }
      

      编辑:这是 Andreas Köpf 在论坛上提出的另一个更紧凑的 CombineVeryLatest 变体:

      public static IObservable<TResult> CombineVeryLatest
        <TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,
        IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector)
      {
          return Observable.Defer(() =>
          {
              int l = -1, r = -1;
              return Observable.CombineLatest(
                  leftSource.Select(Tuple.Create<TLeft, int>),
                  rightSource.Select(Tuple.Create<TRight, int>),
                      (x, y) => new { x, y })
                  .Where(t => t.x.Item2 != l && t.y.Item2 != r)
                  .Do(t => { l = t.x.Item2; r = t.y.Item2; })
                  .Select(t => selector(t.x.Item1, t.y.Item1));
          });
      }
      

      【讨论】:

        【解决方案5】:

        这是我在Rx forum 中发布的作为该问题答案的内容:

        更新: 这是一个新版本,当事件发生时差超过一秒时,不再延迟事件转发:

        public static IObservable<T> ThrottleResponsive3<T>(this IObservable<T> source, TimeSpan minInterval)
        {
            return Observable.CreateWithDisposable<T>(o =>
            {
                object gate = new Object();
                Notification<T> last = null, lastNonTerminal = null;
                DateTime referenceTime = DateTime.UtcNow - minInterval;
                var delayedReplay = new MutableDisposable();
                return new CompositeDisposable(source.Materialize().Subscribe(x =>
                {
                    lock (gate)
                    {
                        var elapsed = DateTime.UtcNow - referenceTime;
                        if (elapsed >= minInterval && delayedReplay.Disposable == null)
                        {
                            referenceTime = DateTime.UtcNow;
                            x.Accept(o);
                        }
                        else
                        {
                            if (x.Kind == NotificationKind.OnNext)
                                lastNonTerminal = x;
                            last = x;
                            if (delayedReplay.Disposable == null)
                            {
                                delayedReplay.Disposable = Scheduler.ThreadPool.Schedule(() =>
                                {
                                    lock (gate)
                                    {
                                        referenceTime = DateTime.UtcNow;
                                        if (lastNonTerminal != null && lastNonTerminal != last)
                                            lastNonTerminal.Accept(o);
                                        last.Accept(o);
                                        last = lastNonTerminal = null;
                                        delayedReplay.Disposable = null;
                                    }
                                }, minInterval - elapsed);
                            }
                        }
                    }
                }), delayedReplay);
            });
        }
        

        这是我之前的尝试:

        var source = Observable.GenerateWithTime(1, 
            x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)
            .Timestamp();
        
        source.Publish(o =>
            o.Take(1).Merge(o.Skip(1).Sample(TimeSpan.FromSeconds(1)))
        ).Run(x => Console.WriteLine(x));
        

        【讨论】:

        • 刚刚在 Microsoft Todo UWP 应用积分中找到了指向此答案的链接 :)
        • @CalvT 是的!我期待看到一个开源项目列表,但这里是这个 stackflow 答案!可能与他们从 AWS 到 Azure 的过渡有关:theverge.com/2018/3/21/17146308/…
        【解决方案6】:

        好的,这是一个解决方案。我不喜欢它,特别是,但是……哦,好吧。

        向 Jon 指出 SkipWhile 的帽子提示,以及 BufferWithTime 的 cRichter。谢谢大家。

        static void Main(string[] args)
        {
            Console.WriteLine("Running...");
        
            var generator = Observable
                .GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)
                .Timestamp();
        
            var bufferedAtOneSec = generator.BufferWithTime(TimeSpan.FromSeconds(1));
        
            var action = new Action<Timestamped<int>>(
                feed => Console.WriteLine("Observed {0:000}, generated at {1}, observed at {2}",
                                          feed.Value,
                                          feed.Timestamp.ToString("mm:ss.fff"),
                                          DateTime.Now.ToString("mm:ss.fff")));
        
            var reactImmediately = true;
            bufferedAtOneSec.Subscribe(list =>
                                           {
                                               if (list.Count == 0)
                                               {
                                                   reactImmediately = true;
                                               }
                                               else
                                               {
                                                   action(list.Last());
                                               }
                                           });
            generator
                .SkipWhile(item => reactImmediately == false)
                .Subscribe(feed =>
                               {
                                   if(reactImmediately)
                                   {
                                       reactImmediately = false;
                                       action(feed);
                                   }
                               });
        
            Console.ReadKey();
        }
        

        【讨论】:

          【解决方案7】:

          您要搜索的是 CombineLatest。

          public static IObservable<TResult> CombineLatest<TLeft, TRight, TResult>(
              IObservable<TLeft> leftSource,
              IObservable<TRight> rightSource,
              Func<TLeft, TRight, TResult> selector
          )
          

          合并2个obeServables,并在选择器(时间)有值时返回所有值。

          编辑:约翰是对的,这可能不是首选解决方案

          【讨论】:

          • 我不明白他在追求什么——你认为这里的两个可观察对象是什么?
          • 一个是他生成的Events,selector是一个Observable.Interval(TimeSpan.FromSeconds(1))
          • 不是每次任何一个产生一个值时都会产生一个事件吗?
          • 这样做... var interval = Observable.Interval(TimeSpan.FromSeconds(1)).Start(); generator.CombineLatest(interval, (value, gate) => value ) ... 没有观察到。我不知道如何使用 CombineLatest 来实现这一点。
          • 是的,但您可以选择接下来应该发生的事情。例如如果它来自stream1,则缓存它并返回null,如果它来自stream 2,则返回缓存。之后你唯一要做的就是限制结果,不应该返回空值。 (基本上,这就是 BufferWithTime 的实现方式 ;-)
          【解决方案8】:

          你试过Throttle扩展方法吗?

          来自文档:

          忽略一个可观察序列中的值,该序列后跟在到期时间之前的另一个值

          我不太清楚这是否会做你想要的 - 因为你想忽略以下值而不是第一个值......但我会期望它成为你想成为的人。试试看:)

          编辑:嗯...不,毕竟我不认为Throttle 正确的事情。我相信我看到了您想要做的事情,但我在框架中看不到任何东西可以做到这一点。不过,我很可能错过了一些东西。你在 Rx 论坛上问过吗?很可能如果它现在不存在,他们会很乐意添加它:)

          怀疑你可以用SkipUntilSelectMany以某种方式巧妙地做到这一点......但我认为它应该有自己的方法。

          【讨论】:

          • 谢谢乔恩。我试了一下,但它不是我想要的。在示例中,使用 Throttle 会导致除了最后一个事件之外的所有事件都被忽略。我需要对第一个事件做出反应(以提供响应系统),然后以 1 秒的采样率延迟后续事件。
          • Jon,您建议在 Rx 论坛中提问。实际上有一个open issue #395 - Implement a real throttle... 要求这种确切的行为。
          • @Magnus:现在有 - 在这个答案发布 7 年后提交!
          • ? - 哎呀,我猜我没有注意到日期之后的 '10。无论如何,问题的链接在这里供其他人找到。
          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2014-05-27
          相关资源
          最近更新 更多