【问题标题】:awaitable Task based queue等待基于任务的队列
【发布时间】:2011-12-13 09:30:30
【问题描述】:

我想知道是否存在ConcurrentQueue 的实现/包装器,类似于BlockingCollection,其中从集合中获取不会阻塞,而是异步的,并且会导致异步等待,直到将项目放入排队。

我提出了我自己的实现,但它似乎没有按预期执行。我想知道我是否正在重新发明已经存在的东西。

这是我的实现:

public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();

    object queueSyncLock = new object();

    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }

    public async Task<T> Dequeue()
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }

    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs=null;
        T firstItem=default(T);
        while (true)
        {
            bool ok;
            lock (queueSyncLock)
            {
                ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem);
                if (ok)
                {
                    waitingQueue.TryDequeue(out tcs);
                    queue.TryDequeue(out firstItem);
                }
            }
            if (!ok) break;
            tcs.SetResult(firstItem);
        }
    }
}

【问题讨论】:

  • 根据我们的 on-topic 指导,“有些问题仍然是题外话,即使它们属于上面列出的类别之一:...向我们提问的问题推荐或查找书籍、工具、软件库、教程或其他场外资源是题外话..."
  • 我最近也想到了一个可以等待的队列(这是我的问题:stackoverflow.com/questions/52775484/…)!我相信它会解决微服务架构中的许多问题!但在这种情况下,队列可能应该是一个持久队列,而不是内存中的东西。

标签: c# asynchronous queue async-await .net-4.5


【解决方案1】:

这对于您的用例来说可能是多余的(考虑到学习曲线),但Reactive Extentions 提供了您可能想要的异步组合的所有粘合剂。

您基本上订阅了更改,它们会在可用时推送给您,您可以让系统将更改推送到单独的线程上。

【讨论】:

  • 我至少部分精通 Reactive,但在生产中使用它有点深奥,因为其他人可能必须维护代码。我真的在挖掘 async/await 为以前非常复杂的服务器产品带来的简单性,并且我试图将所有异步技术保持在单一技术之下。
【解决方案2】:

您可以只使用BlockingCollection(使用默认的ConcurrentQueue)并将对Take 的调用包装在Task 中,这样您就可以await 它:

var bc = new BlockingCollection<T>();

T element = await Task.Run( () => bc.Take() );

【讨论】:

  • 好主意,但我对阻塞不满意。我将有几千个客户端,每个客户端都有自己的消息队列。任何阻塞都会使船沉没,因为它会束缚无所事事的线程。我想要一个可等待的非阻塞任务的原因是我可以将所有操作保留在线程池中而不会导致线程池饥饿。
【解决方案3】:

这是我目前正在使用的实现。

public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();
    object queueSyncLock = new object();
    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }

    public async Task<T> DequeueAsync(CancellationToken ct)
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        ct.Register(() =>
        {
            lock (queueSyncLock)
            {
                tcs.TrySetCanceled();
            }
        });
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }

    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs = null;
        T firstItem = default(T);
        lock (queueSyncLock)
        {
            while (true)
            {
                if (waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem))
                {
                    waitingQueue.TryDequeue(out tcs);
                    if (tcs.Task.IsCanceled)
                    {
                        continue;
                    }
                    queue.TryDequeue(out firstItem);
                }
                else
                {
                    break;
                }
                tcs.SetResult(firstItem);
            }
        }
    }
}

它工作得很好,但在queueSyncLock 上存在很多争论,因为我大量使用CancellationToken 来取消一些等待任务。当然,这会大大减少我使用BlockingCollection 时看到的阻塞,但是......

我想知道是否有一种更流畅、无锁的方法来达到同样的目的

【讨论】:

    【解决方案4】:

    我不知道无锁解决方案,但你可以看看新的Dataflow library,它是Async CTP 的一部分。一个简单的BufferBlock&lt;T&gt; 就足够了,例如:

    BufferBlock<int> buffer = new BufferBlock<int>();
    

    生产和消费最容易通过数据流块类型的扩展方法完成。

    制作很简单:

    buffer.Post(13);
    

    并且消费是异步就绪的:

    int item = await buffer.ReceiveAsync();
    

    如果可能,我建议您使用 Dataflow;制作这样一个既有效又正确的缓冲区比最初看起来要困难得多。

    【讨论】:

    • 这看起来很有希望......明天会检查一下。谢谢。它看起来很像 CCR 端口。
    • 睡前偷看一眼!看起来 Dataflow 非常适合我的需求。它似乎弥合了 TPL 提供的内容和 CCR 提供的内容之间的差距(我曾经非常成功)。 CCR 的出色工作没有被浪费,这让我感到很积极。这是正确的答案(还有一些闪亮的新东西让我咬牙切齿!)谢谢@StephenCleary。
    • Stephen Cleary 自己的 Nito.AsyncEx 库也有 AsyncProducerConsumerQueue<T>,这是 BufferBlock&lt;T&gt; 的替代品。
    • @Fanblade:没错,但这些天我将人们指向System.Threading.Channels。渠道是一种非常高效且非常现代的解决方案。
    【解决方案5】:

    我的尝试(它在创建“承诺”时引发了一个事件,外部生产者可以使用它来了解何时生产更多项目):

    public class AsyncQueue<T>
    {
        private ConcurrentQueue<T> _bufferQueue;
        private ConcurrentQueue<TaskCompletionSource<T>> _promisesQueue;
        private object _syncRoot = new object();
    
        public AsyncQueue()
        {
            _bufferQueue = new ConcurrentQueue<T>();
            _promisesQueue = new ConcurrentQueue<TaskCompletionSource<T>>();
        }
    
        /// <summary>
        /// Enqueues the specified item.
        /// </summary>
        /// <param name="item">The item.</param>
        public void Enqueue(T item)
        {
            TaskCompletionSource<T> promise;
            do
            {
                if (_promisesQueue.TryDequeue(out promise) &&
                    !promise.Task.IsCanceled &&
                    promise.TrySetResult(item))
                {
                    return;                                       
                }
            }
            while (promise != null);
    
            lock (_syncRoot)
            {
                if (_promisesQueue.TryDequeue(out promise) &&
                    !promise.Task.IsCanceled &&
                    promise.TrySetResult(item))
                {
                    return;
                }
    
                _bufferQueue.Enqueue(item);
            }            
        }
    
        /// <summary>
        /// Dequeues the asynchronous.
        /// </summary>
        /// <param name="cancellationToken">The cancellation token.</param>
        /// <returns></returns>
        public Task<T> DequeueAsync(CancellationToken cancellationToken)
        {
            T item;
    
            if (!_bufferQueue.TryDequeue(out item))
            {
                lock (_syncRoot)
                {
                    if (!_bufferQueue.TryDequeue(out item))
                    {
                        var promise = new TaskCompletionSource<T>();
                        cancellationToken.Register(() => promise.TrySetCanceled());
    
                        _promisesQueue.Enqueue(promise);
                        this.PromiseAdded.RaiseEvent(this, EventArgs.Empty);
    
                        return promise.Task;
                    }
                }
            }
    
            return Task.FromResult(item);
        }
    
        /// <summary>
        /// Gets a value indicating whether this instance has promises.
        /// </summary>
        /// <value>
        /// <c>true</c> if this instance has promises; otherwise, <c>false</c>.
        /// </value>
        public bool HasPromises
        {
            get { return _promisesQueue.Where(p => !p.Task.IsCanceled).Count() > 0; }
        }
    
        /// <summary>
        /// Occurs when a new promise
        /// is generated by the queue
        /// </summary>
        public event EventHandler PromiseAdded;
    }
    

    【讨论】:

    • 我认为这是最好的解决方案。我已经实现了这一点并对其进行了广泛的测试。一些注意事项:对 !promise.Task.IsCanceled 的调用是不必要的。我添加了一个 ManualResetEventSlim 来跟踪 bufferQueue 何时为空,以便调用者可以阻塞等待队列为空。
    • should be disposing CancellationTokenRegistration 你是从 cancellationToken.Register 电话中得到的。
    【解决方案6】:

    使用 C# 8.0 IAsyncEnumerableDataflow library 的简单方法

    // Instatiate an async queue
    var queue = new AsyncQueue<int>();
    
    // Then, loop through the elements of queue.
    // This loop won't stop until it is canceled or broken out of
    // (for that, use queue.WithCancellation(..) or break;)
    await foreach(int i in queue) {
        // Writes a line as soon as some other Task calls queue.Enqueue(..)
        Console.WriteLine(i);
    }
    

    AsyncQueue 的实现如下:

    public class AsyncQueue<T> : IAsyncEnumerable<T>
    {
        private readonly SemaphoreSlim _enumerationSemaphore = new SemaphoreSlim(1);
        private readonly BufferBlock<T> _bufferBlock = new BufferBlock<T>();
    
        public void Enqueue(T item) =>
            _bufferBlock.Post(item);
    
        public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken token = default)
        {
            // We lock this so we only ever enumerate once at a time.
            // That way we ensure all items are returned in a continuous
            // fashion with no 'holes' in the data when two foreach compete.
            await _enumerationSemaphore.WaitAsync();
            try {
                // Return new elements until cancellationToken is triggered.
                while (true) {
                    // Make sure to throw on cancellation so the Task will transfer into a canceled state
                    token.ThrowIfCancellationRequested();
                    yield return await _bufferBlock.ReceiveAsync(token);
                }
            } finally {
                _enumerationSemaphore.Release();
            }
    
        }
    }
    

    【讨论】:

    • 我喜欢旧问题得到现代更新。投赞成票。我还没有检查过IAsyncEnumerable,但我非常熟悉javascript的Symbol.asyncIterator,它看起来或多或少是相同的概念。
    • 谢谢@spender!我想是的,它基本上是一个IEnumerable,但你可以异步等待新项目,所以它是一个非阻塞操作。
    • 我想知道,使用SemaphoreSlim(1) 而不是lock 有什么具体原因吗?
    • @valori 在lock 中不能有await
    • 而add-on只是为了获取队列的当前进度,在AsyncQueue.cs中添加一个属性 class public int Count { get { return _bufferBlock.Count; } } 使用这个计数,我们可以在 foreach 循环中检查队列是否为空: await foreach(int i in queue) { if(queue.Count > 1) { // 队列不为空 } else { // queue 为空 } // 其他任务调用 queue.Enqueue(..) Console.WriteLine(i); }
    【解决方案7】:

    查看https://github.com/somdoron/AsyncCollection,您既可以异步出列,也可以使用 C# 8.0 IAsyncEnumerable。

    API 与 BlockingCollection 非常相似。

    AsyncCollection<int> collection = new AsyncCollection<int>();
    
    var t = Task.Run(async () =>
    {
        while (!collection.IsCompleted)
        {
            var item = await collection.TakeAsync();
    
            // process
        }
    });
    
    for (int i = 0; i < 1000; i++)
    {
        collection.Add(i);
    }
    
    collection.CompleteAdding();
    
    t.Wait();
    

    使用 IAsyncEnumeable:

    AsyncCollection<int> collection = new AsyncCollection<int>();
    
    var t = Task.Run(async () =>
    {
        await foreach (var item in collection)
        {
            // process
        }
    });
    
    for (int i = 0; i < 1000; i++)
    {
        collection.Add(i);
    }
    
    collection.CompleteAdding();
    
    t.Wait();
    

    【讨论】:

    • 您的示例var item = await collection.TakeAsync() 似乎仅适用于单个消费者。如果有多个消费者,您可能会收到InvalidOperationExceptions。我认为您应该使用TryTakeAsync 而不是TakeAsync,以使其也能与多个消费者一起正常工作。
    【解决方案8】:

    实现这一点的一种简单易行的方法是使用SemaphoreSlim

    public class AwaitableQueue<T>
    {
        private SemaphoreSlim semaphore = new SemaphoreSlim(0);
        private readonly object queueLock = new object();
        private Queue<T> queue = new Queue<T>();
    
        public void Enqueue(T item)
        {
            lock (queueLock)
            {
                queue.Enqueue(item);
                semaphore.Release();
            }
        }
    
        public T WaitAndDequeue(TimeSpan timeSpan, CancellationToken cancellationToken)
        {
            semaphore.Wait(timeSpan, cancellationToken);
            lock (queueLock)
            {
                return queue.Dequeue();
            }
        }
    
        public async Task<T> WhenDequeue(TimeSpan timeSpan, CancellationToken cancellationToken)
        {
            await semaphore.WaitAsync(timeSpan, cancellationToken);
            lock (queueLock)
            {
                return queue.Dequeue();
            }
        }
    }
    

    其美妙之处在于SemaphoreSlim 处理了实现Wait()WaitAsync() 功能的所有复杂性。缺点是队列长度由信号量队列本身跟踪,它们都神奇地保持同步。

    【讨论】:

    • 很好,如果性能不是最重要的,则不会出现入队或出队的爆发,并且处理每个项目的时间很重要。它使用锁定,这意味着该集合一次只能被一个线程访问,所有其他线程在入队或出队时都将等待阻塞。
    • 应该考虑semaphore.WaitAsync()的结果,如果超时返回null,默认值或者抛出异常。
    • @GuillermoPrandi semaphore.WaitAsync 任务不返回值。如果超时,它会抛出一个TaskCanceledException,它会冒泡。
    • @Ryan docs.microsoft.com/en-us/dotnet/api/… "如果当前线程成功进入 SemaphoreSlim 则返回 true 的任务,否则返回 false。"跨度>
    • 你如何使用它来排队等待返回的工作列表?
    【解决方案9】:

    8 年后,我遇到了这个问题,即将实现在 nuget 包/命名空间中找到的 MS AsyncQueue&lt;T&gt; 类:Microsoft.VisualStudio.Threading

    感谢@Theodor Zoulias 提到此 api 可能已过时,DataFlow 库将是一个不错的选择。

    所以我编辑了我的 AsyncQueue 实现以使用 BufferBlock。几乎相同,但效果更好。

    我在 AspNet Core 后台线程中使用它,它完全异步运行。

    protected async Task MyRun()
    {
        BufferBlock<MyObj> queue = new BufferBlock<MyObj>();
        Task enqueueTask = StartDataIteration(queue);
    
        while (await queue.OutputAvailableAsync())
        {
            var myObj = queue.Receive();
            // do something with myObj
        }
    
    }
    
    public async Task StartDataIteration(BufferBlock<MyObj> queue)
    {
        var cursor = await RunQuery();
        while(await cursor.Next()) { 
            queue.Post(cursor.Current);
        }
        queue.Complete(); // <<< signals the consumer when queue.Count reaches 0
    }
    

    我发现使用 queue.OutputAvailableAsync() 解决了我在使用 AsyncQueue 时遇到的问题——试图确定队列何时完成,而不必检查出队任务。

    【讨论】:

    • Task.WhenAny 等待queue.DequeueAsync()queue.Completion 是一个聪明的技巧,但它感觉就像是克服糟糕API 设计缺点的技巧。替代类(Dataflow BufferBlock&lt;T&gt;Channel&lt;T&gt;)提供方法(分别为 OutputAvailableAsyncWaitToReadAsync),允许等待更多元素,而无需将异常作为反馈机制处理。您的技巧的问题在于,您最终可能会遇到未观察到异常的错误任务,在这种情况下会触发 TaskScheduler.UnobservedTaskException 事件。
    • 类中还有其他通知方式 - 但 MS 在文档中没有示例。就我而言,我有多个任务要等待,所以无论如何我不得不使用WhenAny。 - 如果一个任务抛出它可以作为一个 AggregateException 被捕获。
    • 我的观点是 Microsoft.VisualStudio.Threading.AsyncQueue&lt;T&gt; 类不应该用于新项目,因为今天有更好的替代品。尤其是 Channel&lt;T&gt; 类,它不仅提供了更好的 API,而且还具有出色的性能特征。
    • 好的,你说得对,AsyncQueue 基于 TPL 库,看起来是为在 Visual Studio 扩展中工作而设计的。我会用我的实现来编辑我的答案。谢谢你的评论,你可能让我头疼了一大堆。
    • 在您的新实现(基于BufferBlock 的实现)中存在潜在的竞争条件,如果您有多个消费者,可能会出现这种情况。 Receive 方法可以在另一个消费者从队列中取出最后一项之后被一个消费者调用。出于这个原因,最好使用TryReceive 方法作为ifwhile 块中的条件,这样您就不必在以后更新架构时查看使用代码.以here 为例。
    【解决方案10】:

    现在有一种官方方法可以做到这一点:System.Threading.Channels。它内置于 .NET Core 3.0 及更高版本(包括 .NET 5.0 和 6.0)的核心运行时中,但也可作为 .NET Standard 2.0 和 2.1 上的 NuGet 包使用。您可以阅读文档here

    var channel = System.Threading.Channels.Channel.CreateUnbounded<int>();
    

    排队工作:

    // This will succeed and finish synchronously if the channel is unbounded.
    channel.Writer.TryWrite(42);
    

    完成频道:

    channel.Writer.TryComplete();
    

    从频道读取:

    var i = await channel.Reader.ReadAsync();
    

    或者,如果您拥有 .NET Core 3.0 或更高版本:

    await foreach (int i in channel.Reader.ReadAllAsync())
    {
        // whatever processing on i...
    }
    

    【讨论】:

    • 下次我有机会使用这种结构时,我会检查一下...如果成功,我会给你一个绿色的勾。 ...Writer.WaitToWriteAsync() 在有界队列上看起来也超级方便。很好的发现...感谢您添加此内容。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2014-01-21
    • 1970-01-01
    • 2013-07-11
    • 2023-03-19
    • 1970-01-01
    • 1970-01-01
    • 2012-09-01
    相关资源
    最近更新 更多