我遇到了一个类似的问题,我想在调用 api 等时产生 5000 个结果。所以,我进行了一些速度测试。
Parallel.ForEach(products.Select(x => x.KeyValue).Distinct().Take(100), id =>
{
new ParallelOptions { MaxDegreeOfParallelism = 100 };
GetProductMetaData(productsMetaData, client, id).GetAwaiter().GetResult();
});
在 30 秒内产生 100 个结果。
Parallel.ForEach(products.Select(x => x.KeyValue).Distinct().Take(100), id =>
{
new ParallelOptions { MaxDegreeOfParallelism = 100 };
GetProductMetaData(productsMetaData, client, id);
});
将 GetAwaiter().GetResult() 移动到 GetProductMetaData 中的各个异步 api 调用需要 14.09 秒才能产生 100 个结果。
foreach (var id in ids.Take(100))
{
GetProductMetaData(productsMetaData, client, id);
}
在 api 调用中使用 GetAwaiter().GetResult() 完成非异步编程需要 13.417 秒。
var tasks = new List<Task>();
while (y < ids.Count())
{
foreach (var id in ids.Skip(y).Take(100))
{
tasks.Add(GetProductMetaData(productsMetaData, client, id));
}
y += 100;
Task.WhenAll(tasks).GetAwaiter().GetResult();
Console.WriteLine($"Finished {y}, {sw.Elapsed}");
}
形成一个任务列表,一次完成 100 个,速度为 7.36 秒。
using (SemaphoreSlim cons = new SemaphoreSlim(10))
{
var tasks = new List<Task>();
foreach (var id in ids.Take(100))
{
cons.Wait();
var t = Task.Factory.StartNew(() =>
{
try
{
GetProductMetaData(productsMetaData, client, id);
}
finally
{
cons.Release();
}
});
tasks.Add(t);
}
Task.WaitAll(tasks.ToArray());
}
使用 SemaphoreSlim 需要 13.369 秒,但也需要一段时间才能启动以开始使用它。
var throttler = new SemaphoreSlim(initialCount: take);
foreach (var id in ids)
{
throttler.WaitAsync().GetAwaiter().GetResult();
tasks.Add(Task.Run(async () =>
{
try
{
skip += 1;
await GetProductMetaData(productsMetaData, client, id);
if (skip % 100 == 0)
{
Console.WriteLine($"started {skip}/{count}, {sw.Elapsed}");
}
}
finally
{
throttler.Release();
}
}));
}
将 Semaphore Slim 与节流器一起用于我的异步任务耗时 6.12 秒。
在这个特定项目中,我的答案是使用带有 Semaphore Slim 的节流器。虽然 while foreach 任务列表有时确实击败了限制器,但限制器赢得了 1000 条记录的 4/6 倍。
我意识到我没有使用 OPs 代码,但我认为这很重要,并增加了这个讨论,因为有时不是应该问的唯一问题,而答案有时是“这取决于你是什么努力去做。”
现在回答具体问题:
- 如何在 c# 中限制最大并行任务数:我展示了如何限制一次完成的任务数。
- 我们能否猜测同时处理的最大消息数(假设是正常的四核处理器),或者我们能否限制当时要处理的最大消息数?除非我设置上限,否则我无法猜测一次将处理多少个,但我可以设置一个上限。显然,由于 CPU、RAM 等以及程序本身可以访问的线程和内核数量以及在同一台计算机上串联运行的其他程序,不同的计算机以不同的速度运行。
- 如何确保以与集合相同的顺序/顺序处理此消息?如果要按特定顺序处理所有内容,那就是同步编程。能够异步运行的关键是确保它们可以在没有命令的情况下完成所有事情。正如您从我的代码中看到的那样,除非您使用异步代码,否则 100 条记录中的时间差是最小的。如果您需要对正在做的事情进行排序,请在此之前使用异步编程,然后等待并从那里同步执行操作。例如,task1a.start、task2a.start,然后是 task1a.await、task2a.await……然后是 task1b.start task1b.await 和 task2b.start task 2b.await。