【问题标题】:SemaphoreSlim Await PrioritySemaphoreSlim 等待优先级
【发布时间】:2016-09-13 15:58:23
【问题描述】:

我想知道SemaphoreSlim 在调用 Await 时是否有类似优先级的东西。

我找不到任何东西,但也许有人以前做过类似的事情。

这个想法是,如果我需要,稍后可以在信号量上以更高的优先级调用等待,它会允许等待首先返回。

【问题讨论】:

  • 没有优先级。无论哪匹马先到达终点线,都将赢得比赛。在极不可能的情况下是平局(不太可能是因为您不使用 WaitAll),操作系统会故意让获胜者随机,这是针对锁定车队的对策。 joeduffyblog.com/2006/12/14/…
  • 谢谢。我想可能是这样的。我可能会尝试写一些东西来处理我想要的东西,但我认为这不是一个好主意。也许碰巧其他人已经做了一些事情,但我正在考虑重做一些代码。

标签: c# .net async-await semaphore


【解决方案1】:

不,SemaphoreSlim 中没有优先级,无论您使用的是同步锁定还是异步锁定。

很少需要异步锁的优先级。如果您退后一步,放眼大局,通常这类问题会有更优雅的解决方案。

【讨论】:

    【解决方案2】:

    这是一个可以优先获取的类PrioritySemaphore<TPriority>。在内部,它基于SortedSet 集合。

    public class PrioritySemaphore<TPriority>
    {
        private readonly PriorityQueue _priorityQueue;
        private readonly object _locker = new object();
        private readonly int _maxCount;
        private int _currentCount;
        private long _indexSeed = 0;
    
        public PrioritySemaphore(int initialCount, int maxCount,
            IComparer<TPriority> comparer = null)
        {
            if (initialCount < 0)
                throw new ArgumentOutOfRangeException(nameof(initialCount));
            if (maxCount <= 0) throw new ArgumentOutOfRangeException(nameof(maxCount));
    
            _priorityQueue = new PriorityQueue(comparer);
            _currentCount = initialCount;
            _maxCount = maxCount;
        }
        public PrioritySemaphore(int initialCount, IComparer<TPriority> comparer = null)
            : this(initialCount, Int32.MaxValue, comparer) { }
        public PrioritySemaphore(IComparer<TPriority> comparer = null)
            : this(0, Int32.MaxValue, comparer) { }
    
        public int CurrentCount { get { lock (_locker) return _currentCount; } }
    
        public async Task<bool> WaitAsync(TPriority priority, int millisecondsTimeout,
            CancellationToken cancellationToken = default)
        {
            if (millisecondsTimeout < -1)
                throw new ArgumentOutOfRangeException(nameof(millisecondsTimeout));
    
            cancellationToken.ThrowIfCancellationRequested();
            lock (_locker)
            {
                if (_currentCount > 0)
                {
                    _currentCount--;
                    return true;
                }
            }
            if (millisecondsTimeout == 0) return false;
            var tcs = new TaskCompletionSource<bool>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            long entryIndex = -1;
            bool taskCompleted = false;
    
            Timer timer = null;
            if (millisecondsTimeout > 0)
            {
                timer = new Timer(_ =>
                {
                    bool doComplete;
                    lock (_locker)
                    {
                        doComplete = entryIndex == -1
                            || _priorityQueue.Remove(priority, entryIndex);
                        if (doComplete) taskCompleted = true;
                    }
                    if (doComplete) tcs.TrySetResult(false);
                }, null, millisecondsTimeout, Timeout.Infinite);
            }
    
            CancellationTokenRegistration registration = default;
            if (cancellationToken.CanBeCanceled)
            {
                registration = cancellationToken.Register(() =>
                {
                    bool doComplete;
                    lock (_locker)
                    {
                        doComplete = entryIndex == -1
                            || _priorityQueue.Remove(priority, entryIndex);
                        if (doComplete) taskCompleted = true;
                    }
                    if (doComplete) tcs.TrySetCanceled(cancellationToken);
                });
            }
    
            bool disposeSubscriptions = false;
            lock (_locker)
            {
                if (!taskCompleted)
                {
                    entryIndex = _indexSeed++;
                    _priorityQueue.Enqueue(priority, entryIndex, tcs, timer, registration);
                }
                else
                {
                    disposeSubscriptions = true;
                }
            }
            if (disposeSubscriptions)
            {
                timer?.Dispose();
                registration.Dispose();
            }
            return await tcs.Task.ConfigureAwait(false);
        }
    
        public Task WaitAsync(TPriority priority,
            CancellationToken cancellationToken = default)
        {
            return WaitAsync(priority, Timeout.Infinite, cancellationToken);
        }
    
        public void Release()
        {
            TaskCompletionSource<bool> tcs;
            Timer timer;
            CancellationTokenRegistration registration;
            lock (_locker)
            {
                if (_priorityQueue.IsEmpty)
                {
                    if (_currentCount >= _maxCount) throw new SemaphoreFullException();
                    _currentCount++;
                    return;
                }
                (tcs, timer, registration) = _priorityQueue.Dequeue();
            }
            tcs.TrySetResult(true);
            timer?.Dispose();
            registration.Dispose();
        }
    
        private class PriorityQueue : IComparer<(TPriority Priority, long Index,
            TaskCompletionSource<bool>, Timer, CancellationTokenRegistration)>
        {
            private readonly SortedSet<(TPriority Priority, long Index,
                TaskCompletionSource<bool> TCS, Timer Timer,
                CancellationTokenRegistration Registration)> _sortedSet;
    
            private readonly IComparer<TPriority> _priorityComparer;
            private readonly Comparer<long> _indexComparer = Comparer<long>.Default;
    
            public PriorityQueue(IComparer<TPriority> comparer)
            {
                _priorityComparer = comparer ?? Comparer<TPriority>.Default;
                _sortedSet = new SortedSet<(TPriority Priority, long Index,
                TaskCompletionSource<bool> TCS, Timer Timer,
                CancellationTokenRegistration Registration)>(this);
            }
    
            public bool IsEmpty => _sortedSet.Count == 0;
    
            public void Enqueue(TPriority priority, long index,
                TaskCompletionSource<bool> tcs, Timer timer,
                CancellationTokenRegistration registration)
            {
                _sortedSet.Add((priority, index, tcs, timer, registration));
            }
    
            public (TaskCompletionSource<bool>, Timer, CancellationTokenRegistration)
                Dequeue()
            {
                Debug.Assert(_sortedSet.Count > 0);
                var entry = _sortedSet.Min;
                _sortedSet.Remove(entry);
                return (entry.TCS, entry.Timer, entry.Registration);
            }
    
            public bool Remove(TPriority priority, long index)
            {
                return _sortedSet.Remove((priority, index, default, default, default));
            }
    
            public int Compare((TPriority Priority, long Index,
                TaskCompletionSource<bool>, Timer, CancellationTokenRegistration) x,
                (TPriority Priority, long Index, TaskCompletionSource<bool>, Timer,
                CancellationTokenRegistration) y)
            {
                int result = _priorityComparer.Compare(x.Priority, y.Priority);
                if (result == 0) result = _indexComparer.Compare(x.Index, y.Index);
                return result;
            }
        }
    }
    

    使用示例:

    var semaphore = new PrioritySemaphore<int>();
    //...
    await semaphore.WaitAsync(priority: 1);
    //...
    await semaphore.WaitAsync(priority: 2);
    //...
    semaphore.Release();
    

    Release之后,信号量会被最高优先级的等待者获取。在上面的示例中,它将是优先级为1 的等待者。较小的值表示较高的优先级。如果有多个具有相同最高优先级的等待者,则信号量将由最先请求的等待者获取(保持先进先出顺序)。

    PrioritySemaphore&lt;TPriority&gt; 类只有异步 API。它支持超时等待和CancellationToken 等待,但这些功能尚未经过广泛测试。


    注意: .NET 6 引入了PriorityQueue&lt;TElement, TPriority&gt; 类,理论上可以用来简化上述实现。不幸的是,新类不支持从队列中删除特定元素。仅支持出队。为了实现PrioritySemaphore&lt;TPriority&gt; 类的取消和超时功能,需要从队列中移除特定元素。所以新的类不能用在上面的实现中。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-10-07
      • 2015-03-06
      • 1970-01-01
      • 1970-01-01
      • 2022-08-04
      • 2011-06-25
      相关资源
      最近更新 更多