这是一个异步和同步的节流实现,它可以限制每个持续时间内对方法的调用次数。它基于当前时间与 DateTimeOffset 和 Task.Delay/Thread.Sleep 的简单比较。它应该适用于许多不需要高度时间分辨率的实现,并且应该在您想要限制的方法之前调用。
此解决方案允许用户指定每个持续时间允许的呼叫次数(默认为每个时间段 1 次呼叫)。这可以让您的节流阀尽可能地“突发”,但代价是无法控制调用者何时可以继续,或者调用可以尽可能均匀地间隔。
假设我们的目标是 300 次调用/分钟:您可以设置一个持续时间为 200 毫秒的常规节流阀,该节流阀将平均分布每个调用,其间至少间隔 200 毫秒,或者您可以创建一个节流阀将允许每秒进行 5 次调用,而不考虑它们的间隔(前 5 次调用获胜 - 可能一次全部调用!)。两者都将速率限制保持在 300calls/min 以下,但前者处于平均分离的极端,后者更“突发”。在循环中处理项目时,将事物均匀分布是很好的,但对于并行运行的事物(如 Web 请求)可能不是那么好,因为调用时间不可预测,不必要的延迟实际上可能会降低吞吐量。同样,您的用例和测试必须成为您最佳的指南。
这个类是线程安全的,你需要在某个地方保存对它的实例的引用,以便需要共享它的对象实例可以访问。对于作为应用程序实例上的字段的 ASP.NET Web 应用程序,可以是网页/控制器上的静态字段,从您选择的 DI 容器作为单例注入,或者您可以访问共享的任何其他方式特定场景中的实例。
编辑:更新以确保延迟永远不会超过持续时间。
public class Throttle
{
/// <summary>
/// How maximum time to delay access.
/// </summary>
private readonly TimeSpan _duration;
/// <summary>
/// The next time to run.
/// </summary>
private DateTimeOffset _next = DateTimeOffset.MinValue;
/// <summary>
/// Synchronize access to the throttle gate.
/// </summary>
private readonly SemaphoreSlim _mutex = new SemaphoreSlim(1, 1);
/// <summary>
/// Number of allowed callers per time window.
/// </summary>
private readonly int _numAllowed = 1;
/// <summary>
/// The number of calls in the current time window.
/// </summary>
private int _count;
/// <summary>
/// The amount of time per window.
/// </summary>
public TimeSpan Duration => _duration;
/// <summary>
/// The number of calls per time period.
/// </summary>
public int Size => _numAllowed;
/// <summary>
/// Crates a Throttle that will allow one caller per duration.
/// </summary>
/// <param name="duration">The amount of time that must pass between calls.</param>
public Throttle(TimeSpan duration)
{
if (duration.Ticks <= 0)
throw new ArgumentOutOfRangeException(nameof(duration));
_duration = duration;
}
/// <summary>
/// Creates a Throttle that will allow the given number of callers per time period.
/// </summary>
/// <param name="num">The number of calls to allow per time period.</param>
/// <param name="per">The duration of the time period.</param>
public Throttle(int num, TimeSpan per)
{
if (num <= 0 || per.Ticks <= 0)
throw new ArgumentOutOfRangeException();
_numAllowed = num;
_duration = per;
}
/// <summary>
/// Returns a task that will complete when the caller may continue.
/// </summary>
/// <remarks>This method can be used to synchronize access to a resource at regular intervals
/// with no more frequency than specified by the duration,
/// and should be called BEFORE accessing the resource.</remarks>
/// <param name="cancellationToken">A cancellation token that may be used to abort the stop operation.</param>
/// <returns>The number of actors that have been allowed within the current time window.</returns>
public async Task<int> WaitAsync(CancellationToken cancellationToken = default(CancellationToken))
{
await _mutex.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
var delay = _next - DateTimeOffset.UtcNow;
// ensure delay is never longer than the duration
if (delay > _duration)
delay = _duration;
// continue immediately based on count
if (_count < _numAllowed)
{
_count++;
if (delay.Ticks <= 0) // past time window, reset
{
_next = DateTimeOffset.UtcNow.Add(_duration);
_count = 1;
}
return _count;
}
// over the allowed count within the window
if (delay.Ticks > 0)
{
// delay until the next window
await Task.Delay(delay, cancellationToken)
.ConfigureAwait(false);
}
_next = DateTimeOffset.UtcNow.Add(_duration);
_count = 1;
return _count;
}
finally
{
_mutex.Release();
}
}
/// <summary>
/// Returns a task that will complete when the caller may continue.
/// </summary>
/// <remarks>This method can be used to synchronize access to a resource at regular intervals
/// with no more frequency than specified by the duration,
/// and should be called BEFORE accessing the resource.</remarks>
/// <param name="cancellationToken">A cancellation token that may be used to abort the stop operation.</param>
/// <returns>The number of actors that have been allowed within the current time window.</returns>
public int Wait(CancellationToken cancellationToken = default(CancellationToken))
{
_mutex.Wait(cancellationToken);
try
{
var delay = _next - DateTimeOffset.UtcNow;
// ensure delay is never larger than the duration.
if (delay > _duration)
delay = _duration;
// continue immediately based on count
if (_count < _numAllowed)
{
_count++;
if (delay.Ticks <= 0) // past time window, reset
{
_next = DateTimeOffset.UtcNow.Add(_duration);
_count = 1;
}
return _count;
}
// over the allowed count within the window
if (delay.Ticks > 0)
{
// delay until the next window
Thread.Sleep(delay);
}
_next = DateTimeOffset.UtcNow.Add(_duration);
_count = 1;
return _count;
}
finally
{
_mutex.Release();
}
}
}
此示例显示了如何在循环中同步使用节流阀,以及取消的行为方式。如果您将其想象为人们排队乘车,那么如果发出取消标记信号,就好像该人越界而其他人向前移动。
var t = new Throttle(5, per: TimeSpan.FromSeconds(1));
var c = new CancellationTokenSource(TimeSpan.FromSeconds(22));
foreach(var i in Enumerable.Range(1,300)) {
var ct = i > 250
? default(CancellationToken)
: c.Token;
try
{
var n = await t.WaitAsync(ct).ConfigureAwait(false);
WriteLine($"{i}: [{n}] {DateTime.Now}");
}
catch (OperationCanceledException)
{
WriteLine($"{i}: Operation Canceled");
}
}