【问题标题】:.NET Throttle algorithm.NET 节流算法
【发布时间】:2012-01-03 07:30:03
【问题描述】:

我想在 .net(C# 或 VB)中实现一个好的节流算法,但我不知道该怎么做。

案例是我的 asp.net 网站应该向另一个网站发布请求以获取结果。 每分钟最多应发送 300 个请求。

如果请求超过 300 个限制,则另一方 Api 不返回任何内容(我不想在我的代码中使用它作为检查)。

附:我见过 .net 以外的其他语言的解决方案,但我是新手,请保持友善,让您的答案简单到 123。

谢谢

【问题讨论】:

标签: c# .net vb.net throttling


【解决方案1】:

您可以有一个简单的应用程序(或会话)类并检查它的命中。这是非常粗略的,只是为了给你一个想法:

public class APIHits {
    public int hits { get; private set; }
    private DateTime minute = DateTime.Now();

    public bool AddHit()
    {
        if (hits < 300) {
            hits++;
            return true;
        }
        else
        {
            if (DateTime.Now() > minute.AddSeconds(60)) 
            {
                //60 seconds later
                minute = DateTime.Now();
                hits = 1;
                return true;
            }
            else
            {
                return false;
            }
        }
    }
}

【讨论】:

    【解决方案2】:

    最简单的方法是计算数据包之间的时间间隔,并且不允许它们以每 0.2 秒超过一个的速率发送。也就是记录你被调用的时间和下一次被调用的时间,检查至少200ms已经过去,否则什么都不返回。

    此方法可行,但仅适用于平滑的数据包流 - 如果您预计活动会爆发,那么您可能希望在任何 200 毫秒期间允许 5 条消息,只要 1 分钟内的平均值不超过 300 次调用.在这种情况下,您可以使用一个值数组来存储最近 300 个数据包的“时间戳”,然后每次 yoiu 接到电话时,您可以回顾“300 个电话前”以检查是否至少过去了 1 分钟.

    对于这两种方案,Environment.TickCount 返回的时间值足以满足您的需求(跨度不少于 200 毫秒),因为它精确到大约 15 毫秒。

    【讨论】:

      【解决方案3】:

      这是一个异步和同步的节流实现,它可以限制每个持续时间内对方法的调用次数。它基于当前时间与 DateTimeOffset 和 Task.Delay/Thread.Sleep 的简单比较。它应该适用于许多不需要高度时间分辨率的实现,并且应该在您想要限制的方法之前调用。

      此解决方案允许用户指定每个持续时间允许的呼叫次数(默认为每个时间段 1 次呼叫)。这可以让您的节流阀尽可能地“突发”,但代价是无法控制调用者何时可以继续,或者调用可以尽可能均匀地间隔。

      假设我们的目标是 300 次调用/分钟:您可以设置一个持续时间为 200 毫秒的常规节流阀,该节流阀将平均分布每个调用,其间至少间隔 200 毫秒,或者您可以创建一个节流阀将允许每秒进行 5 次调用,而不考虑它们的间隔(前 5 次调用获胜 - 可能一次全部调用!)。两者都将速率限制保持在 300calls/min 以下,但前者处于平均分离的极端,后者更“突发”。在循环中处理项目时,将事物均匀分布是很好的,但对于并行运行的事物(如 Web 请求)可能不是那么好,因为调用时间不可预测,不必要的延迟实际上可能会降低吞吐量。同样,您的用例和测试必须成为您最佳的指南。

      这个类是线程安全的,你需要在某个地方保存对它的实例的引用,以便需要共享它的对象实例可以访问。对于作为应用程序实例上的字段的 ASP.NET Web 应用程序,可以是网页/控制器上的静态字段,从您选择的 DI 容器作为单例注入,或者您可以访问共享的任何其他方式特定场景中的实例。

      编辑:更新以确保延迟永远不会超过持续时间。

          public class Throttle
          {
              /// <summary>
              /// How maximum time to delay access.
              /// </summary>
              private readonly TimeSpan _duration;
      
              /// <summary>
              /// The next time to run.
              /// </summary>
              private DateTimeOffset _next = DateTimeOffset.MinValue;
      
              /// <summary>
              /// Synchronize access to the throttle gate.
              /// </summary>
              private readonly SemaphoreSlim _mutex = new SemaphoreSlim(1, 1);
      
              /// <summary>
              /// Number of allowed callers per time window.
              /// </summary>
              private readonly int _numAllowed = 1;
      
              /// <summary>
              /// The number of calls in the current time window.
              /// </summary>
              private int _count;
      
              /// <summary>
              /// The amount of time per window.
              /// </summary>
              public TimeSpan Duration => _duration;
      
              /// <summary>
              /// The number of calls per time period.
              /// </summary>
              public int Size => _numAllowed;
      
              /// <summary>
              /// Crates a Throttle that will allow one caller per duration.
              /// </summary>
              /// <param name="duration">The amount of time that must pass between calls.</param>
              public Throttle(TimeSpan duration)
              {
                  if (duration.Ticks <= 0)
                      throw new ArgumentOutOfRangeException(nameof(duration));
      
                  _duration = duration;
              }
      
              /// <summary>
              /// Creates a Throttle that will allow the given number of callers per time period.
              /// </summary>
              /// <param name="num">The number of calls to allow per time period.</param>
              /// <param name="per">The duration of the time period.</param>
              public Throttle(int num, TimeSpan per)
              {
                  if (num <= 0 || per.Ticks <= 0)
                      throw new ArgumentOutOfRangeException();
      
                  _numAllowed = num;
                  _duration = per;
              }
      
              /// <summary>
              /// Returns a task that will complete when the caller may continue.
              /// </summary>
              /// <remarks>This method can be used to synchronize access to a resource at regular intervals
              /// with no more frequency than specified by the duration,
              /// and should be called BEFORE accessing the resource.</remarks>
              /// <param name="cancellationToken">A cancellation token that may be used to abort the stop operation.</param>
              /// <returns>The number of actors that have been allowed within the current time window.</returns>
              public async Task<int> WaitAsync(CancellationToken cancellationToken = default(CancellationToken))
              {
                  await _mutex.WaitAsync(cancellationToken)
                      .ConfigureAwait(false);
      
                  try
                  {
      
                      var delay = _next - DateTimeOffset.UtcNow;
      
                      // ensure delay is never longer than the duration
                      if (delay > _duration)
                          delay = _duration;
                      
                      // continue immediately based on count
                      if (_count < _numAllowed) 
                      {
                          _count++;
      
                          if (delay.Ticks <= 0) // past time window, reset
                          {
                              _next = DateTimeOffset.UtcNow.Add(_duration);
                              _count = 1;
                          }
      
                          return _count;
                      }
                      
                      // over the allowed count within the window
                      if (delay.Ticks > 0)
                      {
                          // delay until the next window
                          await Task.Delay(delay, cancellationToken)
                              .ConfigureAwait(false);
                      }
      
                      _next = DateTimeOffset.UtcNow.Add(_duration);
                      _count = 1;
      
                      return _count;
                  }
                  finally
                  {
                      _mutex.Release();
                  }
              }
      
              /// <summary>
              /// Returns a task that will complete when the caller may continue.
              /// </summary>
              /// <remarks>This method can be used to synchronize access to a resource at regular intervals
              /// with no more frequency than specified by the duration,
              /// and should be called BEFORE accessing the resource.</remarks>
              /// <param name="cancellationToken">A cancellation token that may be used to abort the stop operation.</param>
              /// <returns>The number of actors that have been allowed within the current time window.</returns>
              public int Wait(CancellationToken cancellationToken = default(CancellationToken))
              {
                  _mutex.Wait(cancellationToken);
      
                  try
                  {
      
                      var delay = _next - DateTimeOffset.UtcNow;
      
                      // ensure delay is never larger than the duration.
                      if (delay > _duration)
                          delay = _duration;
                      
                      // continue immediately based on count
                      if (_count < _numAllowed) 
                      {
                          _count++;
      
                          if (delay.Ticks <= 0) // past time window, reset
                          {
                              _next = DateTimeOffset.UtcNow.Add(_duration);
                              _count = 1;
                          }
      
                          return _count;
                      }
                      
                      // over the allowed count within the window
                      if (delay.Ticks > 0)
                      {
                          // delay until the next window
                          Thread.Sleep(delay);
                      }
      
                      _next = DateTimeOffset.UtcNow.Add(_duration);
                      _count = 1;
      
                      return _count;
                  }
                  finally
                  {
                      _mutex.Release();
                  }
              }
          }
      

      此示例显示了如何在循环中同步使用节流阀,以及取消的行为方式。如果您将其想象为人们排队乘车,那么如果发出取消标记信号,就好像该人越界而其他人向前移动。

       var t = new Throttle(5, per: TimeSpan.FromSeconds(1));
       var c = new CancellationTokenSource(TimeSpan.FromSeconds(22));
       foreach(var i in Enumerable.Range(1,300)) {
           var ct = i > 250
               ? default(CancellationToken)
               : c.Token;
           try
           {
               var n = await t.WaitAsync(ct).ConfigureAwait(false);
               WriteLine($"{i}: [{n}] {DateTime.Now}");
           }
           catch (OperationCanceledException)
           {
               WriteLine($"{i}: Operation Canceled");
           }
       }
      

      【讨论】:

      • 基于DateTimeOffset.UtcNow的调度意味着这个方案依赖于系统时钟。系统级时钟调整可能会以不可预知的方式中断调度。在await Task.Delay(delay, cancellationToken) 阶段取消WaitWaitAsync 时会发生什么情况?之前安排的操作是否会重新安排以利用已释放的插槽?
      • 关于系统时间,如果将时间推到窗外,则重新开始计数。如果时间显着缩短,在下一个窗口之前可能会有很长的延迟 - 一个窗口中允许的呼叫数量越多,这就越不可能成为问题。 DateTimeOffset 使用 UTC 进行比较,因此不必担心夏令时之类的问题。并非不可能,但总的来说,我认为服务器不会显着改变他们的时间。
      • 关于取消 - 这是基于“等待队列”的实现。如果取消令牌发出信号,则计数和时间窗口保持不变,并处理下一个调用者。我认为这是调用者所期望的。
      猜你喜欢
      • 2012-11-17
      • 1970-01-01
      • 1970-01-01
      • 2020-12-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-05-11
      • 1970-01-01
      相关资源
      最近更新 更多