【问题标题】:Is there a C# class like Queue that implements IAsyncEnumerable?是否有像 Queue 这样实现 IAsyncEnumerable 的 C# 类?
【发布时间】:2020-02-17 11:08:14
【问题描述】:
Queue 和 ConcurrentQueue 都实现了 IEnumerable 但不是 IAsyncEnumerable。 NuGet 上是否有实现IAsyncEnumerable 的标准类或类,这样,如果队列为空,则MoveNextAsync 的结果在下一个添加到队列之前不会完成?
【问题讨论】:
标签:
c#
asynchronous
queue
iasyncenumerable
【解决方案1】:
如果您使用的是 .NET Core 平台,则至少有两个内置选项:
-
System.Threading.Tasks.Dataflow.BufferBlock<T> 类,TPL Dataflow 库的一部分。它本身没有实现IAsyncEnumerable<T>,但它公开了可等待的OutputAvailableAsync() 方法,实现ToAsyncEnumerable 扩展方法很简单。
-
System.Threading.Channels.Channel<T> 类,Channels 库的核心组件。它通过其公开了一个IAsyncEnumerable<T> 实现
Reader.ReadAllAsync()¹ 方法。
通过安装 nuget 包(每个类不同),这两个类也可用于 .NET Framework。
为BufferBlock<T> 实现IAsyncEnumerable<T>:
public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
this IReceivableSourceBlock<T> source,
[EnumeratorCancellation]CancellationToken cancellationToken = default)
{
while (await source.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
{
while (source.TryReceive(out T item))
{
yield return item;
cancellationToken.ThrowIfCancellationRequested();
}
}
await source.Completion.ConfigureAwait(false); // Propagate possible exception
}
¹(不适用于 .NET Framework,但很容易在 similar way 中实现)