这是一个可以优先获取的类PrioritySemaphore<TPriority>。在内部,它基于SortedSet 集合。
public class PrioritySemaphore<TPriority>
{
private readonly PriorityQueue _priorityQueue;
private readonly object _locker = new object();
private readonly int _maxCount;
private int _currentCount;
private long _indexSeed = 0;
public PrioritySemaphore(int initialCount, int maxCount,
IComparer<TPriority> comparer = null)
{
if (initialCount < 0)
throw new ArgumentOutOfRangeException(nameof(initialCount));
if (maxCount <= 0) throw new ArgumentOutOfRangeException(nameof(maxCount));
_priorityQueue = new PriorityQueue(comparer);
_currentCount = initialCount;
_maxCount = maxCount;
}
public PrioritySemaphore(int initialCount, IComparer<TPriority> comparer = null)
: this(initialCount, Int32.MaxValue, comparer) { }
public PrioritySemaphore(IComparer<TPriority> comparer = null)
: this(0, Int32.MaxValue, comparer) { }
public int CurrentCount { get { lock (_locker) return _currentCount; } }
public async Task<bool> WaitAsync(TPriority priority, int millisecondsTimeout,
CancellationToken cancellationToken = default)
{
if (millisecondsTimeout < -1)
throw new ArgumentOutOfRangeException(nameof(millisecondsTimeout));
cancellationToken.ThrowIfCancellationRequested();
lock (_locker)
{
if (_currentCount > 0)
{
_currentCount--;
return true;
}
}
if (millisecondsTimeout == 0) return false;
var tcs = new TaskCompletionSource<bool>(
TaskCreationOptions.RunContinuationsAsynchronously);
long entryIndex = -1;
bool taskCompleted = false;
Timer timer = null;
if (millisecondsTimeout > 0)
{
timer = new Timer(_ =>
{
bool doComplete;
lock (_locker)
{
doComplete = entryIndex == -1
|| _priorityQueue.Remove(priority, entryIndex);
if (doComplete) taskCompleted = true;
}
if (doComplete) tcs.TrySetResult(false);
}, null, millisecondsTimeout, Timeout.Infinite);
}
CancellationTokenRegistration registration = default;
if (cancellationToken.CanBeCanceled)
{
registration = cancellationToken.Register(() =>
{
bool doComplete;
lock (_locker)
{
doComplete = entryIndex == -1
|| _priorityQueue.Remove(priority, entryIndex);
if (doComplete) taskCompleted = true;
}
if (doComplete) tcs.TrySetCanceled(cancellationToken);
});
}
bool disposeSubscriptions = false;
lock (_locker)
{
if (!taskCompleted)
{
entryIndex = _indexSeed++;
_priorityQueue.Enqueue(priority, entryIndex, tcs, timer, registration);
}
else
{
disposeSubscriptions = true;
}
}
if (disposeSubscriptions)
{
timer?.Dispose();
registration.Dispose();
}
return await tcs.Task.ConfigureAwait(false);
}
public Task WaitAsync(TPriority priority,
CancellationToken cancellationToken = default)
{
return WaitAsync(priority, Timeout.Infinite, cancellationToken);
}
public void Release()
{
TaskCompletionSource<bool> tcs;
Timer timer;
CancellationTokenRegistration registration;
lock (_locker)
{
if (_priorityQueue.IsEmpty)
{
if (_currentCount >= _maxCount) throw new SemaphoreFullException();
_currentCount++;
return;
}
(tcs, timer, registration) = _priorityQueue.Dequeue();
}
tcs.TrySetResult(true);
timer?.Dispose();
registration.Dispose();
}
private class PriorityQueue : IComparer<(TPriority Priority, long Index,
TaskCompletionSource<bool>, Timer, CancellationTokenRegistration)>
{
private readonly SortedSet<(TPriority Priority, long Index,
TaskCompletionSource<bool> TCS, Timer Timer,
CancellationTokenRegistration Registration)> _sortedSet;
private readonly IComparer<TPriority> _priorityComparer;
private readonly Comparer<long> _indexComparer = Comparer<long>.Default;
public PriorityQueue(IComparer<TPriority> comparer)
{
_priorityComparer = comparer ?? Comparer<TPriority>.Default;
_sortedSet = new SortedSet<(TPriority Priority, long Index,
TaskCompletionSource<bool> TCS, Timer Timer,
CancellationTokenRegistration Registration)>(this);
}
public bool IsEmpty => _sortedSet.Count == 0;
public void Enqueue(TPriority priority, long index,
TaskCompletionSource<bool> tcs, Timer timer,
CancellationTokenRegistration registration)
{
_sortedSet.Add((priority, index, tcs, timer, registration));
}
public (TaskCompletionSource<bool>, Timer, CancellationTokenRegistration)
Dequeue()
{
Debug.Assert(_sortedSet.Count > 0);
var entry = _sortedSet.Min;
_sortedSet.Remove(entry);
return (entry.TCS, entry.Timer, entry.Registration);
}
public bool Remove(TPriority priority, long index)
{
return _sortedSet.Remove((priority, index, default, default, default));
}
public int Compare((TPriority Priority, long Index,
TaskCompletionSource<bool>, Timer, CancellationTokenRegistration) x,
(TPriority Priority, long Index, TaskCompletionSource<bool>, Timer,
CancellationTokenRegistration) y)
{
int result = _priorityComparer.Compare(x.Priority, y.Priority);
if (result == 0) result = _indexComparer.Compare(x.Index, y.Index);
return result;
}
}
}
使用示例:
var semaphore = new PrioritySemaphore<int>();
//...
await semaphore.WaitAsync(priority: 1);
//...
await semaphore.WaitAsync(priority: 2);
//...
semaphore.Release();
在Release之后,信号量会被最高优先级的等待者获取。在上面的示例中,它将是优先级为1 的等待者。较小的值表示较高的优先级。如果有多个具有相同最高优先级的等待者,则信号量将由最先请求的等待者获取(保持先进先出顺序)。
PrioritySemaphore<TPriority> 类只有异步 API。它支持超时等待和CancellationToken 等待,但这些功能尚未经过广泛测试。
注意: .NET 6 引入了PriorityQueue<TElement, TPriority> 类,理论上可以用来简化上述实现。不幸的是,新类不支持从队列中删除特定元素。仅支持出队。为了实现PrioritySemaphore<TPriority> 类的取消和超时功能,需要从队列中移除特定元素。所以新的类不能用在上面的实现中。