【问题标题】:Limit number of Async methods executing in parallel [duplicate]限制并行执行的异步方法的数量[重复]
【发布时间】:2018-07-26 06:34:41
【问题描述】:

我对异步方法的并行执行有疑问。

我想限制将同时执行的异步方法的数量(以限制对另一个系统的 Web 请求的数量,这些请求实际上是在异步方法中发送的)。

那么最好的方法是什么? 我通过使用Parallel找到了一个解决方案,并设置了DegreeOfParallelism,但我不太喜欢这个解决方案,因为它会阻塞等于DOP(并行度)的线程数。

所以,这里是 Parallel 的代码:

var results = dataProvider.GetResults(someIdParam);
            var average = results.AsParallel().WithDegreeOfParallelism(5).Average(x =>
            {
                var t = GetSSIM(x);
                t.Wait();
                return t.Result;
            });

因此,这将起作用并限制同时请求的数量,但会阻塞 5 个线程。

我最终编写了自己的方法:

    public static async Task<IEnumerable<T2>> ProcessEachAsync<T1, T2>(IEnumerable<T1> src, Func<T1, Task<T2>> func, int dop)
    {
        var workers = new Task<T2>[Math.Min(dop, src.Count())]; //to make sure that we will not have nulls in workers collection
        var result = new List<T2>();

        int counter = 0;
        int index = 0;
        foreach(var element in src)
        {
            if (counter < dop)
                index = counter++;
            else
            {
                var ended = await Task.WhenAny(workers);
                index = Array.FindIndex(workers, x => x == ended);
                result.Add(ended.Result);
            }

            var t = func(element);
            t.Start();
            workers[index] = t;
        }

        Task.WaitAll(workers);
        result.AddRange(workers.Select(x => x.Result));
        return result;
    }

注意!!!!!!此代码尚未经过测试并且有错误!!!! 但它解释了主要思想

因此,此解决方案将仅阻塞 1 个线程。也许有更简单的方法来实现我想要的?

【问题讨论】:

标签: c# multithreading asynchronous async-await task-parallel-library


【解决方案1】:

感谢@evk 和帮助我解决这个问题的同事。 所以我用 SemaphoreSlim 实现了一个解决方案。缺点是把所有的数据都转换成Tasks,但是代码太美我就不说了:)

    public static async Task<IEnumerable<T2>> ProcessEachAsync<T1, T2>(IEnumerable<T1> src, Func<T1, Task<T2>> func, int dop)
    {
        using (var semSlim = new SemaphoreSlim(dop))
        {
            var result = new ConcurrentBag<T2>();
            Func<T1, Task> getTask = async (x) =>
            {
                try
                {
                    await semSlim.WaitAsync();
                    var res = await func(x);
                    result.Add(res);
                }
                finally
                {
                    semSlim.Release();
                }
            };

            await Task.WhenAll(src.Select(x => getTask(x)));
            return result;
        }
    }

注意!!!未测试!!!!

谢谢大家!

【讨论】:

  • 你忘记处理信号量了。也不需要“新任务” - lambdas 也可以是异步的。最后,也不需要并发包 - 您可以从 lambda ('return await func(x)') 返回结果
  • @Evk 我根据您的评论进行了更改。现在这个答案代表了我最终得到的解决方案。谢谢!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2014-04-02
  • 2021-09-25
  • 2020-10-25
  • 2018-10-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多