【问题标题】:How to limit the Maximum number of parallel tasks in c#c#中如何限制最大并行任务数
【发布时间】:2016-04-12 05:50:26
【问题描述】:

我有 1000 条输入消息要处理。我正在循环输入集合并开始处理每条消息的新任务。

//Assume this messages collection contains 1000 items
var messages = new List<string>();

foreach (var msg in messages)
{
   Task.Factory.StartNew(() =>
   {
    Process(msg);
   });
 }

我们能否猜测当时同时处理的最大消息数(假设是普通的四核处理器),或者我们能否限制当时要处理的最大消息数?

如何确保此消息以与 Collection 相同的顺序/顺序得到处理?

【问题讨论】:

标签: c# .net asynchronous


【解决方案1】:

您可以使用Parallel.Foreach 并改用MaxDegreeOfParallelism

Parallel.ForEach(messages, new ParallelOptions {MaxDegreeOfParallelism = 10},
msg =>
{
     // logic
     Process(msg);
});

【讨论】:

  • 这正是 Parallel.ForEach 的处理方式。
  • 而且由于任务并行库是在ThreadPool 上构建的,我们可以假设如果我们没有明确指定它,它只会运行与系统核心一样多的任务。
  • 这会确保消息按照它们在列表中出现的顺序进行处理吗?
  • 我认为 bit 不是在谈论完成,而是在谈论处理顺序。在我的程序中,似乎在使用 Parallel.Foreach 时,列表被分成 n 个集合(MaxDegreeOfParallelism)。所有集合都是并行处理的,并且在每个集合中,执行顺序。
  • 我只是想提醒大家不要将 Parallel.ForEach 用于异步 I/O 绑定任务。它并不是真正为异步操作创建的。它只会启动 X 个线程池线程并在 I/O 等待期间阻塞它们。请改用SemaphoreSlim
【解决方案2】:

在这种情况下,SemaphoreSlim 是一个非常好的解决方案,我强烈建议 OP 尝试一下,但 @Manoj 的答案存在 cmets 中提到的缺陷。在生成这样的任务之前应该等待信号量。

更新答案: 正如@Vasyl 指出的,信号量可能在任务完成之前被释放,并且在调用Release() 方法时会引发异常,因此在退出 using 块之前必须等待所有任务完成创建任务。

int maxConcurrency=10;
var messages = new List<string>();
using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
{
    List<Task> tasks = new List<Task>();
    foreach(var msg in messages)
    {
        concurrencySemaphore.Wait();

        var t = Task.Factory.StartNew(() =>
        {
            try
            {
                 Process(msg);
            }
            finally
            {
                concurrencySemaphore.Release();
            }
        });

        tasks.Add(t);
    }

    Task.WaitAll(tasks.ToArray());
}

回复评论 对于那些想看看如何在没有Task.WaitAll 的情况下处理信号量的人 在控制台应用程序中运行以下代码,将引发此异常。

System.ObjectDisposedException: '信号量已被释放。'

static void Main(string[] args)
{
    int maxConcurrency = 5;
    List<string> messages =  Enumerable.Range(1, 15).Select(e => e.ToString()).ToList();

    using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
    {
        List<Task> tasks = new List<Task>();
        foreach (var msg in messages)
        {
            concurrencySemaphore.Wait();

            var t = Task.Factory.StartNew(() =>
            {
                try
                {
                    Process(msg);
                }
                finally
                {
                    concurrencySemaphore.Release();
                }
            });

            tasks.Add(t);
        }

       // Task.WaitAll(tasks.ToArray());
    }
    Console.WriteLine("Exited using block");
    Console.ReadKey();
}

private static void Process(string msg)
{            
    Thread.Sleep(2000);
    Console.WriteLine(msg);
}

【讨论】:

  • 如果Process方法运行很长时间会怎样? concurrencySemaphore.Release() 可能在 concurrencySemaphore 已经被释放时被调用。结果 - ObjectDisposedException.
  • @VasylZvarydchuk 你是对的。我已经更新了答案
  • 信号量怎么会在所有任务完成之前就被释放?
  • @VasylZvarydchuk - 即使进程运行很长时间,信号量在释放之前如何处理?
  • 不会在当前的 Answer.comment 中处理掉Task.WaitAll 看看自己
【解决方案3】:

我认为使用并行 LINQ 会更好

  Parallel.ForEach(messages ,
     new ParallelOptions{MaxDegreeOfParallelism = 4},
            x => Process(x);
        );

其中 x 是 MaxDegreeOfParallelism

【讨论】:

    【解决方案4】:

    引入了 .NET 5.0Core 3.0 channels
    这种生产者/消费者并发模式的主要好处是您还可以限制输入数据处理以减少资源影响。
    这在处理数百万条数据记录时特别有用。
    您现在可以只连续查询数据块并等待工作人员处理它,然后再查询更多数据,而不是一次将整个数据集读入内存。

    队列容量为 50 条消息和 5 个消费者线程的代码示例:

    /// <exception cref="System.AggregateException">Thrown on Consumer Task exceptions.</exception>
    public static async Task ProcessMessages(List<string> messages)
    {
        const int producerCapacity = 10, consumerTaskLimit = 3;
        var channel = Channel.CreateBounded<string>(producerCapacity);
    
        _ = Task.Run(async () =>
        {
            foreach (var msg in messages)
            {
                await channel.Writer.WriteAsync(msg);
                // blocking when channel is full
                // waiting for the consumer tasks to pop messages from the queue
            }
    
            channel.Writer.Complete();
            // signaling the end of queue so that 
            // WaitToReadAsync will return false to stop the consumer tasks
        });
    
        var tokenSource = new CancellationTokenSource();
        CancellationToken ct = tokenSource.Token;
    
        var consumerTasks = Enumerable
        .Range(1, consumerTaskLimit)
        .Select(_ => Task.Run(async () =>
        {
            try
            {
                while (await channel.Reader.WaitToReadAsync(ct))
                {
                    ct.ThrowIfCancellationRequested();
                    while (channel.Reader.TryRead(out var message))
                    {
                        await Task.Delay(500);
                        Console.WriteLine(message);
                    }
                }
            }
            catch (OperationCanceledException) { }
            catch
            {
                tokenSource.Cancel();
                throw;
            }
        }))
        .ToArray();
    
        Task waitForConsumers = Task.WhenAll(consumerTasks);
        try { await waitForConsumers; }
        catch
        {
            foreach (var e in waitForConsumers.Exception.Flatten().InnerExceptions)
                Console.WriteLine(e.ToString());
    
            throw waitForConsumers.Exception.Flatten();
        }
    }
    

    正如Theodor Zoulias 指出的那样: 在多个消费者异常情况下,剩余的任务将继续运行并且必须承担被杀死任务的负载。为了避免这种情况,我实现了一个 CancellationToken 来停止所有剩余的任务并处理 waitForConsumers.ExceptionAggregateException 中组合的异常。

    旁注:
    Task Parallel Library (TPL) 可能擅长根据您的本地资源自动限制任务。但是当您通过 RPC 远程处理数据时,需要手动限制您的 RPC 调用以避免填满网络/处理堆栈!

    【讨论】:

    • 这是对ActionBlock&lt;T&gt; 的重新发明尝试。它看起来不错,但它有一个问题。如果消费者失败,其余消费者将继续工作,processMessages 将继续以降低的并行度运行。如果除了一个之外的所有消费者都失败了,最后一个消费者将慢慢地单独处理所有剩余的消息,直到异常最终浮出水面。
    • 感谢您指出这一点!我添加了一个 try catch 块和注释
    • 不,我不认为吞下异常是解决这个问题的正确方法。正确的方法是一旦观察到异常就快速失败。当一个消费者失败时,所有其他消费者应该停止消费通道,并且应该在处理他们当前的消息后立即break。这就是ActionBlock&lt;T&gt;Parallel 类和 PLINQ 库的行为方式。您需要在这方面做更多的工作,以使其成为ActionBlock&lt;T&gt; 的无特色替代品,它至少在几乎无能为力的情况下具有正确的行为。
    • 我想你已经解决了。在这种情况下,恕我直言,CancellationToken 正是需要的。
    • 是的,现在好多了。 ? 此时您可能已经意识到使用Channels 和Tasks 实现这种功能既具有挑战性又费力。实际上这样做并没有多大意义,除了作为一种学习体验,当这个功能已经在 .NET 5 中以ActionBlock&lt;T&gt; 类的形式(更不用说TransformBlock&lt;TInput,TOutput&gt; 和其他TPL Dataflow 库的强大模块)。您可以看到ActionBlock&lt;T&gt; 正在运行here
    【解决方案5】:

    如果您的 Process 方法是异步的,则不能使用 Task.Factory.StartNew,因为它不能很好地与异步委托配合使用。使用它时还有一些其他细微差别(例如,参见this)。

    在这种情况下,正确的做法是使用Task.Run。这是针对异步 Process 方法修改的 @ClearLogic 答案。

    static void Main(string[] args)
    {
        int maxConcurrency = 5;
        List<string> messages =  Enumerable.Range(1, 15).Select(e => e.ToString()).ToList();
    
        using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
        {
            List<Task> tasks = new List<Task>();
            foreach (var msg in messages)
            {
                concurrencySemaphore.Wait();
    
                var t = Task.Run(async () =>
                {
                    try
                    {
                        await Process(msg);
                    }
                    finally
                    {
                        concurrencySemaphore.Release();
                    }
                });
    
                tasks.Add(t);
            }
    
           Task.WaitAll(tasks.ToArray());
        }
        Console.WriteLine("Exited using block");
        Console.ReadKey();
    }
    
    private static async Task Process(string msg)
    {            
        await Task.Delay(2000);
        Console.WriteLine(msg);
    }
    

    【讨论】:

    • 这个解决方案不必要地阻塞了调用线程(.Wait().WaitAll()),所以我认为它不是最理想的。
    • @TheodorZoulias 接受的答案使用相同的方法。当 Process 方法是异步的时,这只是一个轻微的修改。如果您不想阻塞调用线程,那么只需使您的调用线程(在本例中为 Main)异步并将 Task.WaitAll 替换为 await Task.WhenAll。这是调用线程为静态 void main 的简化情况。但是,如果它是一个带有异步处理的网络请求,那么它就可以正常工作而不会阻塞任何东西。
    • 很公平。我也对接受的答案投了反对票。当异步解决方案可用时,我不喜欢阻塞解决方案。 ?
    • 优秀的解决方案。我建议您进行上述更改:将Task.WaitAll() 更改为await Task.WhenAll()。将Main 函数更改为异步函数可能会有所帮助,以表明整个方法可以是它自己的等待函数。
    【解决方案6】:

    您可以创建自己的 TaskScheduler 并在那里覆盖 QueueTask。

    protected virtual void QueueTask(Task task)
    

    然后你可以做任何你喜欢的事情。

    这里有一个例子:

    Limited concurrency level task scheduler (with task priority) handling wrapped tasks

    【讨论】:

      【解决方案7】:

      您可以像这样简单地设置最大并发度:

      int maxConcurrency=10;
      var messages = new List<1000>();
      using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
      {
          foreach(var msg in messages)
          {
              Task.Factory.StartNew(() =>
              {
                  concurrencySemaphore.Wait();
                  try
                  {
                       Process(msg);
                  }
                  finally
                  {
                      concurrencySemaphore.Release();
                  }
              });
          }
      }
      

      【讨论】:

      • 如果线程池的线程数超过您的最大并发数,这会不必要地阻塞线程。
      【解决方案8】:

      如果您需要按顺序排队(处理可能以任何顺序完成),则不需要信号量。老式的 if 语句可以正常工作:

              const int maxConcurrency = 5;
              List<Task> tasks = new List<Task>();
              foreach (var arg in args)
              {
                  var t = Task.Run(() => { Process(arg); } );
      
                  tasks.Add(t);
      
                  if(tasks.Count >= maxConcurrency)
                      Task.WaitAny(tasks.ToArray());
              }
      
              Task.WaitAll(tasks.ToArray());
      

      【讨论】:

      • 它是否应该在循环中等待并且只有在接近结束时才点击最后一个WaitAll?因为根据我的经验,它只是在循环中尖叫并几乎立即击中WaitAll
      • 是的,这是因为完成的任务并没有从任务列表中删除,所以下次WaitAny 被点击时,它会找到第一个完成的任务并继续前进。
      【解决方案9】:
       public static void RunTasks(List<NamedTask> importTaskList)
          {
              List<NamedTask> runningTasks = new List<NamedTask>();
      
              try
              {
                  foreach (NamedTask currentTask in importTaskList)
                  {
                      currentTask.Start();
                      runningTasks.Add(currentTask);
      
                      if (runningTasks.Where(x => x.Status == TaskStatus.Running).Count() >= MaxCountImportThread)
                      {
                          Task.WaitAny(runningTasks.ToArray());
                      }
                  }
      
                  Task.WaitAll(runningTasks.ToArray());
              }
              catch (Exception ex)
              {
                  Log.Fatal("ERROR!", ex);
              }
          }
      

      【讨论】:

        【解决方案10】:

        您可以使用BlockingCollection,如果已达到消费收集限制,则生产将停止生产,直到消费过程完成。我发现这种模式比SemaphoreSlim 更容易理解和实现。

        int TasksLimit = 10;
        BlockingCollection<Task> tasks = new BlockingCollection<Task>(new ConcurrentBag<Task>(), TasksLimit);
        
        void ProduceAndConsume()
        {
            var producer = Task.Factory.StartNew(RunProducer);
            var consumer = Task.Factory.StartNew(RunConsumer);
        
            try
            {
                Task.WaitAll(new[] { producer, consumer });
            }
            catch (AggregateException ae) { }
        }
        
        void RunConsumer()
        {
            foreach (var task in tasks.GetConsumingEnumerable())
            {
                task.Start();
            }
        }
        
        void RunProducer()
        {
            for (int i = 0; i < 1000; i++)
            {
                tasks.Add(new Task(() => Thread.Sleep(1000), TaskCreationOptions.AttachedToParent));
            }
        }
        

        请注意,RunProducerRunConsumer 产生了两个独立的任务。

        【讨论】:

        • 我猜OP也想知道他们的任务什么时候完成。此解决方案缺少此功能。
        • 嘿@TheodorZoulias,感谢您的评论,不确定我是否理解,您将知道Task.WaitAll完成后所有任务何时完成
        • Task.WaitAll 之后,任务producerconsumer 将完成,但在BlockingCollection 中添加的1000 个任务中的一些仍将运行。
        • 我已经更新了我的答案,我相信有时候你不介意在所有任务完成后不被通知。
        • 好的。我只是注意到另一个问题。消耗BlockingCollection 的循环只是启动任务,而不是等待它们完成。启动任务不是 CPU 密集型工作,它几乎是立即发生的。所以我认为所有1000个任务都会立即启动,限制并行度的目标不会实现。
        【解决方案11】:

        我遇到了一个类似的问题,我想在调用 api 等时产生 5000 个结果。所以,我进行了一些速度测试。

        Parallel.ForEach(products.Select(x => x.KeyValue).Distinct().Take(100), id =>
        {
            new ParallelOptions { MaxDegreeOfParallelism = 100 };
            GetProductMetaData(productsMetaData, client, id).GetAwaiter().GetResult();
        });
        

        在 30 秒内产生 100 个结果。

        Parallel.ForEach(products.Select(x => x.KeyValue).Distinct().Take(100), id =>
        {
            new ParallelOptions { MaxDegreeOfParallelism = 100 };
            GetProductMetaData(productsMetaData, client, id);
        });
        

        将 GetAwaiter().GetResult() 移动到 GetProductMetaData 中的各个异步 api 调用需要 14.09 秒才能产生 100 个结果。

        foreach (var id in ids.Take(100))
        {
            GetProductMetaData(productsMetaData, client, id);
        }
        

        在 api 调用中使用 GetAwaiter().GetResult() 完成非异步编程需要 13.417 秒。

        var tasks = new List<Task>();
        while (y < ids.Count())
        {
            foreach (var id in ids.Skip(y).Take(100))
            {
                tasks.Add(GetProductMetaData(productsMetaData, client, id));
            }
        
            y += 100;
            Task.WhenAll(tasks).GetAwaiter().GetResult();
            Console.WriteLine($"Finished {y}, {sw.Elapsed}");
        }
        

        形成一个任务列表,一次完成 100 个,速度为 7.36 秒。

                    using (SemaphoreSlim cons = new SemaphoreSlim(10))
                    {
                        var tasks = new List<Task>();
                        foreach (var id in ids.Take(100))
                        {
                            cons.Wait();
                            var t = Task.Factory.StartNew(() =>
                            {
                                try
                                {
                                    GetProductMetaData(productsMetaData, client, id);
                                }
                                finally
                                {
                                    cons.Release();
                                }
                            });
        
                            tasks.Add(t);
                        }
        
                        Task.WaitAll(tasks.ToArray());
                    }
        

        使用 SemaphoreSlim 需要 13.369 秒,但也需要一段时间才能启动以开始使用它。

        var throttler = new SemaphoreSlim(initialCount: take);
        foreach (var id in ids)
        {
            throttler.WaitAsync().GetAwaiter().GetResult();
            tasks.Add(Task.Run(async () =>
            {
                try
                {
                    skip += 1;
                    await GetProductMetaData(productsMetaData, client, id);
        
                    if (skip % 100 == 0)
                    {
                        Console.WriteLine($"started {skip}/{count}, {sw.Elapsed}");
                    }
                }
                finally
                {
                    throttler.Release();
                }
            }));
        }
        

        将 Semaphore Slim 与节流器一起用于我的异步任务耗时 6.12 秒。

        在这个特定项目中,我的答案是使用带有 Semaphore Slim 的节流器。虽然 while foreach 任务列表有时确实击败了限制器,但限制器赢得了 1000 条记录的 4/6 倍。

        我意识到我没有使用 OPs 代码,但我认为这很重要,并增加了这个讨论,因为有时不是应该问的唯一问题,而答案有时是“这取决于你是什么努力去做。”

        现在回答具体问题:

        1. 如何在 c# 中限制最大并行任务数:我展示了如何限制一次完成的任务数。
        2. 我们能否猜测同时处理的最大消息数(假设是正常的四核处理器),或者我们能否限制当时要处理的最大消息数?除非我设置上限,否则我无法猜测一次将处理多少个,但我可以设置一个上限。显然,由于 CPU、RAM 等以及程序本身可以访问的线程和内核数量以及在同一台计算机上串联运行的其他程序,不同的计算机以不同的速度运行。
        3. 如何确保以与集合相同的顺序/顺序处理此消息?如果要按特定顺序处理所有内容,那就是同步编程。能够异步运行的关键是确保它们可以在没有命令的情况下完成所有事情。正如您从我的代码中看到的那样,除非您使用异步代码,否则 100 条记录中的时间差是最小的。如果您需要对正在做的事情进行排序,请在此之前使用异步编程,然后等待并从那里同步执行操作。例如,task1a.start、task2a.start,然后是 task1a.await、task2a.await……然后是 task1b.start task1b.await 和 task2b.start task 2b.await。

        【讨论】:

        • 嗨帕特里克。 new ParallelOptions { MaxDegreeOfParallelism = 100 };Parallel.ForEach 的内部做什么?您作为示例使用的 GetProductMetaData 方法的签名是什么?如果这个方法是异步的,那么它与并行同步工作相关的问题有什么关系呢?对于限制异步操作​​的并发(不是并行性),还有其他更相关的问题,例如thisthis
        • stackoverflow.com/users/11178549/theodor-zoulias,感谢您回答我被否决的原因,我可能会删除我的答案,即使它非常有用,我不应该试图在这里给人们建议。我一直在寻找限制我一次运行多少异步线程的方法。我用的方法本来是同步的,现在是异步的。但我也测试了它的同步。 MaxDegreeOfParallelism 可以在 ForEach 循环内。我测试了这里推荐的每个方法,任务列表是迄今为止最快的……虽然它不是同步的。
        • stackoverflow.com/users/11178549/theodor-zoulias 显然您对此是正确的。尽管我缺乏专业知识和使用正确术语的能力,但我仍然完成了所有速度测试这些事情的工作,并提出了一个不同的智能答案。正如您所说,同一个线程正在完成所有工作,显然我们可以在异步之上添加线程。多线程和异步有什么意义,但要担心速度/性能?这就是为什么我的回答是中肯的。我的错误不在答案中,而是在我的评论中。
        • 一旦我再得到一个负面消息,我将删除它作为同伴压力徽章。你们都不需要我的研究,对吧?有接受者吗?
        猜你喜欢
        • 2022-10-15
        • 2016-04-22
        • 1970-01-01
        • 1970-01-01
        • 2023-03-29
        • 2021-07-09
        • 1970-01-01
        • 2014-05-31
        • 2021-06-09
        相关资源
        最近更新 更多