【问题标题】:C# Task thread pool - Running 100 tasks across only 10 threads [duplicate]C# 任务线程池 - 仅跨 10 个线程运行 100 个任务 [重复]
【发布时间】:2016-06-14 12:41:08
【问题描述】:

我只是想知道是否有人可以为我指出关于 async/await 框架和线程池的正确方向?

基本上,我要做的是在单独的线程/异步中执行 x 次操作,但最多跨 y 个线程。

例如,假设我有 100 个数据库操作: await _repository.WriteData(someData);

我想做的是有一些方法一次运行 10 个这些操作(理想情况下,每个都在一个单独的线程中,所以 10 个线程),并且随着每个操作完成,下一个在然后变得可用的线程。然后我们等待所有操作完成,所有线程完成......

这是无需付出太多努力或增加大量复杂性即可轻松实现的目标吗?

【问题讨论】:

  • 为什么不直接使用 await 让框架为你处理线程呢?您是否进行过任何性能测试表明该框架不适合您的需求?
  • @gabriel 框架不知道最佳 IO 并行度。它怎么会知道?
  • @Eser,我同意它是重复的,但是给这个答案的答案已经质量更高了,所以这看起来很有希望
  • 等等,我很困惑。如果它们是 IO 操作,那么为什么会有 any 线程?您不会为收到的每封信雇用一名工人负责在邮箱中等待该信。为什么要雇佣一堆线程来坐等数据库?
  • 不,它没有,但在这种情况下,该示例用于数据库操作,数据库服务器将负责处理并行 IO 操作。这实际上取决于他没有指定的用例。

标签: c# multithreading async-await


【解决方案1】:

我认为您将注意力集中在线程上,尤其是对于不需要线程来执行的异步操作时,会忽略这一点。

.NET 有一个很棒的 ThreadPool 你可以使用。你不知道里面有多少线程,你不在乎。它只是工作(直到它不起作用并且您需要自己配置它,但这是非常先进的)。

ThreadPool 上运行任务非常简单。为每个操作创建一个任务并使用SemaphoreSlim 限制它们,或者使用现成的 TPL 数据流块。例如:

var block = new ActionBlock<SomeData>(
    _ => _repository.WriteDataAsync(_), // What to do on each item
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 }); // How many items at the same time

foreach (var item in items)
{
    block.Post(item); // Post all items to the block
}

block.Complete(); // Signal completion
await block.Completion; // Asynchronously wait for completion.

但是,如果您确实计划创建“专用”线程,则可以使用 Task.Factory.StartNewLongRunning 选项,在 ThreadPool 之外创建专用线程。但请记住,异步操作在整个操作过程中不会维护相同的线程,因为异步操作不需要线程。所以从一个专门的线程开始可能是没有意义的(更多关于我的博客:LongRunning Is Useless For Task.Run With async-await

【讨论】:

  • TPL 数据流非常适合上述问题。不知道为什么有人会对此投反对票。
  • TPL 数据流不是“内置”的。我从你的博客中了解到,哈哈
  • @i3arnon 如何在 10 个任务中的任何一个任务完成时将新任务添加到队列中。表示将新任务添加到队列中。在这里,我的目标是,例如,如果我们已经通过 SemaphoreSlim 或 MaxDegreeOfParallelism 设置了一次运行 10 个任务的限制,但我不想创建 100 个任务,然后通过 SemaphoreSlim 或 MaxDegreeOfParallelism 设置限制并控制它们在 a单次。 ,我只想在10个任务中的任何一个任务完成后创建一个新任务,这个过程将无限继续。
【解决方案2】:

@i3arnon 的回答是正确的。使用 TPL 数据流。

此答案的其余部分仅用于教育目的和/或特殊用例。

我最近在一个项目中遇到了类似的问题,我无法引入任何外部依赖项,所以我不得不推出自己的负载平衡实现,结果出奇地简单(直到你开始取消接线和有序的结果 - 但这超出了这个问题的范围)。

我忽略了“10 个专用线程”的要求,因为正如其他人已经解释的那样,在处理异步操作时它没有意义。相反,我将维护最多 N 并发 Task 实例来处理工作负载。

static async Task InvokeAsync(IEnumerable<Func<Task>> taskFactories, int maxDegreeOfParallelism)
{
    Queue<Func<Task>> queue = new Queue<Func<Task>>(taskFactories);

    if (queue.Count == 0) {
        return;
    }

    List<Task> tasksInFlight = new List<Task>(maxDegreeOfParallelism);

    do
    {
        while (tasksInFlight.Count < maxDegreeOfParallelism && queue.Count != 0)
        {
            Func<Task> taskFactory = queue.Dequeue();

            tasksInFlight.Add(taskFactory());
        }

        Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);

        // Propagate exceptions. In-flight tasks will be abandoned if this throws.
        await completedTask.ConfigureAwait(false);

        tasksInFlight.Remove(completedTask);
    }
    while (queue.Count != 0 || tasksInFlight.Count != 0);
}

用法:

Func<Task>[] taskFactories = {
    () => _repository.WriteData(someData1),
    () => _repository.WriteData(someData2),
    () => _repository.WriteData(someData3),
    () => _repository.WriteData(someData4)
};

await InvokeAsync(taskFactories, maxDegreeOfParallelism: 2);

... 或

IEnumerable<SomeData> someDataCollection = ... // Get data.

await ParallelTasks.InvokeAsync(
    someDataCollection.Select(someData => new Func<Task>(() => _repository.WriteData(someData))),
    maxDegreeOfParallelism: 10
);

此解决方案不会遇到负载平衡不佳的问题,这种问题在其他简单的实现中经常出现在任务具有不同持续时间并且输入已预先分区(例如this one)的情况下。

具有性能优化和参数验证的版本:Gist

【讨论】:

  • 不错的解决方案,但有两个建议: 1. 使用LinkedList&lt;Task&gt; 跟踪执行中的任务,因为它的插入/删除是 O(1),而不涉及任何内存移动。 2. 如果一个任务在循环之外抛出 catch 它,然后在最终的运行列表中 WaitAll 将所有异常收集到一个 AggregateException 中。这样您就可以考虑所有任务及其最终状态,而不是让它们可能比您的 InvokeAsync 寿命更长。
猜你喜欢
  • 2012-09-03
  • 2010-12-18
  • 2012-02-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-04-12
  • 2015-07-13
  • 1970-01-01
相关资源
最近更新 更多