@i3arnon 的回答是正确的。使用 TPL 数据流。
此答案的其余部分仅用于教育目的和/或特殊用例。
我最近在一个项目中遇到了类似的问题,我无法引入任何外部依赖项,所以我不得不推出自己的负载平衡实现,结果出奇地简单(直到你开始取消接线和有序的结果 - 但这超出了这个问题的范围)。
我忽略了“10 个专用线程”的要求,因为正如其他人已经解释的那样,在处理异步操作时它没有意义。相反,我将维护最多 N 并发 Task 实例来处理工作负载。
static async Task InvokeAsync(IEnumerable<Func<Task>> taskFactories, int maxDegreeOfParallelism)
{
Queue<Func<Task>> queue = new Queue<Func<Task>>(taskFactories);
if (queue.Count == 0) {
return;
}
List<Task> tasksInFlight = new List<Task>(maxDegreeOfParallelism);
do
{
while (tasksInFlight.Count < maxDegreeOfParallelism && queue.Count != 0)
{
Func<Task> taskFactory = queue.Dequeue();
tasksInFlight.Add(taskFactory());
}
Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);
// Propagate exceptions. In-flight tasks will be abandoned if this throws.
await completedTask.ConfigureAwait(false);
tasksInFlight.Remove(completedTask);
}
while (queue.Count != 0 || tasksInFlight.Count != 0);
}
用法:
Func<Task>[] taskFactories = {
() => _repository.WriteData(someData1),
() => _repository.WriteData(someData2),
() => _repository.WriteData(someData3),
() => _repository.WriteData(someData4)
};
await InvokeAsync(taskFactories, maxDegreeOfParallelism: 2);
... 或
IEnumerable<SomeData> someDataCollection = ... // Get data.
await ParallelTasks.InvokeAsync(
someDataCollection.Select(someData => new Func<Task>(() => _repository.WriteData(someData))),
maxDegreeOfParallelism: 10
);
此解决方案不会遇到负载平衡不佳的问题,这种问题在其他简单的实现中经常出现在任务具有不同持续时间并且输入已预先分区(例如this one)的情况下。
具有性能优化和参数验证的版本:Gist。