【问题标题】:Multi-threading in a foreach loopforeach 循环中的多线程
【发布时间】:2020-04-30 10:08:44
【问题描述】:

我已经阅读了一些关于 foreach 循环中的多线程的 stackoverflow 线程,但我不确定我是否理解并正确使用它。
我已经尝试了多种方案,但我没有看到性能有太大的提高。

这是我认为运行异步任务,但使用单线程在循环中同步运行:

Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();

foreach (IExchangeAPI selectedApi in selectedApis)
{
    if (exchangeSymbols.TryGetValue(selectedApi.Name, out symbol))
    {
        ticker = await selectedApi.GetTickerAsync(symbol);
    }               
}    
stopWatch.Stop();

这是我希望异步运行(仍然使用单线程) - 我已经预料到速度会有所提高:

List<Task<ExchangeTicker>> exchTkrs = new List<Task<ExchangeTicker>>();
stopWatch.Start();

foreach (IExchangeAPI selectedApi in selectedApis)
{
    if (exchangeSymbols.TryGetValue(selectedApi.Name, out symbol))
    {
        exchTkrs.Add(selectedApi.GetTickerAsync(symbol));
    }
}

ExchangeTicker[] retTickers = await Task.WhenAll(exchTkrs);
stopWatch.Stop();

这是我希望在多线程中异步运行

stopWatch.Start();

Parallel.ForEach(selectedApis, async (IExchangeAPI selectedApi) =>
{
    if (exchangeSymbols.TryGetValue(selectedApi.Name, out symbol))
    {
        ticker = await selectedApi.GetTickerAsync(symbol);
    }
});
stopWatch.Stop();

秒表结果解释如下:

Console.WriteLine("Time elapsed (ns): {0}", stopWatch.Elapsed.TotalMilliseconds * 1000000);

控制台输出:

Time elapsed (ns): 4183308100
Time elapsed (ns): 4183946299.9999995
Time elapsed (ns): 4188032599.9999995

现在,速度提升看起来微不足道。我做错了什么还是或多或少是我应该期待的?我想写入文件会更好地检查。
您是否介意确认我正确解释了不同的用例?

最后,使用 foreach 循环从多个平台并行获取代码可能不是最好的方法。欢迎就如何改进这一点提出建议。

编辑

请注意,我使用的是 ExchangeSharp 代码库,您可以找到 here

GerTickerAsync() 方法如下所示:

public virtual async Task<ExchangeTicker> GetTickerAsync(string marketSymbol)
{
    marketSymbol = NormalizeMarketSymbol(marketSymbol);
    return await Cache.CacheMethod(MethodCachePolicy, async () => await OnGetTickerAsync(marketSymbol), nameof(GetTickerAsync), nameof(marketSymbol), marketSymbol);
}

对于 Kraken API,您有:

protected override async Task<ExchangeTicker> OnGetTickerAsync(string marketSymbol)
{
    JToken apiTickers = await MakeJsonRequestAsync<JToken>("/0/public/Ticker", null, new Dictionary<string, object> { { "pair", NormalizeMarketSymbol(marketSymbol) } });
    JToken ticker = apiTickers[marketSymbol];
    return await ConvertToExchangeTickerAsync(marketSymbol, ticker);
}

以及缓存方法:

public static async Task<T> CacheMethod<T>(this ICache cache, Dictionary<string, TimeSpan> methodCachePolicy, Func<Task<T>> method, params object?[] arguments) where T : class
{
    await new SynchronizationContextRemover();
    methodCachePolicy.ThrowIfNull(nameof(methodCachePolicy));
    if (arguments.Length % 2 == 0)
    {
        throw new ArgumentException("Must pass function name and then name and value of each argument");
    }
    string methodName = (arguments[0] ?? string.Empty).ToStringInvariant();
    string cacheKey = methodName;
    for (int i = 1; i < arguments.Length;)
    {
        cacheKey += "|" + (arguments[i++] ?? string.Empty).ToStringInvariant() + "=" + (arguments[i++] ?? string.Empty).ToStringInvariant("(null)");
    }
    if (methodCachePolicy.TryGetValue(methodName, out TimeSpan cacheTime))
    {
        return (await cache.Get<T>(cacheKey, async () =>
        {
            T innerResult = await method();
            return new CachedItem<T>(innerResult, CryptoUtility.UtcNow.Add(cacheTime));
        })).Value;
    }
    else
    {
        return await method();
    }
}

【问题讨论】:

  • 这是一个很常见的误解,但 C# 中的异步方法调用并不是并行处理的神奇门票。在上面的所有示例中,one and only one thread is ever being used:您从中调用方法的主线程。
  • @PatrickTucci,你是说“Parallel.ForEach”不会并行运行我的任务吗?
  • 对不起,我没有看到那个例子。那是我的错误。 Parallel.ForEach 和其他 PLINQ 调用可以自动在多个线程池线程上运行查询。传统的异步方法调用不会自动创建并行性,大多数新接触 TPL 和 async/await 关键字的程序员都相信他们会这样做。这就是我最初发表评论的原因。但你是对的,Parallel.ForEach 确实使用了多个线程。
  • @PatrickTucci,很好,所以我的 3 种方法和对它们的解释没有错吗?
  • 您的解释似乎对所有三种方法都是正确的。我假设,就像 TomTom 提到的那样,您没有看到执行时间减少,因为 API 限制了您的调用。

标签: c# multithreading async-await task


【解决方案1】:

首先应该指出,您要实现的是性能,而不是异步。您正试图通过运行多个操作concurrently, not in parallel 来实现它。为了使解释简单,我将使用您的代码的简化版本,并假设每个操作都是直接的 Web 请求,没有中间缓存层,并且不依赖于字典中存在的值。

foreach (var symbol in selectedSymbols)
{
    var ticker = await selectedApi.GetTickerAsync(symbol);
}

以上代码按顺序运行操作。每个操作在前一个操作完成后开始。

var tasks = new List<Task<ExchangeTicker>>();
foreach (var symbol in selectedSymbols)
{
    tasks.Add(selectedApi.GetTickerAsync(symbol));
}
var tickers = await Task.WhenAll(tasks);

以上代码同时运行这些操作。所有操作立即开始。总持续时间预计是最长运行操作的持续时间。

Parallel.ForEach(selectedSymbols, async symbol =>
{
    var ticker = await selectedApi.GetTickerAsync(symbol);
});

上面的代码同时运行这些操作,就像之前的版本Task.WhenAll一样。它没有提供任何优势,同时有一个巨大的劣势,即您无法再通过await 完成操作。 Parallel.ForEach 方法将在启动操作后立即返回,因为 Paralleldoesn't understand async delegates(它不接受 Func&lt;Task&gt; lambdas)。本质上,其中有一堆 async void lambda,它们正在失控,如果出现异常,它们会导致进程停止。

所以并发运行操作的正确方法是第二种方法,使用任务列表和Task.WhenAll。由于您已经测量了这种方法并且没有观察到任何性能改进,我假设还有其他东西可以序列化并发操作。它可能类似于隐藏在代码中某处的SemaphoreSlim,或者是服务器端限制您的请求的某种机制。您必须进一步调查以找出限制发生的位置和原因。

【讨论】:

  • 哇哈,感谢您的精彩解释。我确实看到那里确实使用了 SemaphoreSlim。我对此一无所知,但会检查一下。
  • @stackMeUp 是的,Parallel.ForEach 没有 await async void 代表完成。它只是启动它们然后返回。
  • @stackMeUp 是的。您可以将秒表放在 lambda 中。但这是浪费时间。 Parallel 类不是异步友好的。
  • 好的,我会听从你的建议,不会理会那个!再次感谢您的宝贵帮助:-)
  • @stackMeUp 不,任务几乎总是被创建hot,这意味着它们已经开始了。 Task.WhenAll 只是用来等待它们全部完成。 await 对任务本身没有影响。它影响调用者,而不是被调用者。
【解决方案2】:

一般来说,当您没有看到多线程增加时,这是因为您的任务不受 CPU 限制或大到足以抵消开销。

在您的示例中,即:

selectedApi.GetTickerAsync(symbol);

这可能有两个原因:

1:查询代码非常快,一开始不应该是异步的。 IE。当您在字典中查找时。

2:这是通过 http 连接运行的,其中运行时限制并发调用的数量。无论你打开多少个任务,它同时使用的任务不会超过 4 个。

哦,还有 3:您认为 async 正在使用线程。它不是。在这样的代码中尤其如此:

等待 selectedApi.GetTickerAsync(symbol);

您基本上立即等待结果。这里根本不涉及多线程。

foreach (IExchangeAPI selectedApi in selectedApis) { if (exchangeSymbols.TryGetValue(selectedApi.Name, out symbol)) { 股票代码 = 等待 selectedApi.GetTickerAsync(symbol); } }

这是线性非线程代码,使用异步接口在(可能很昂贵的 IO)操作到位时不阻塞当前线程。它开始一个,然后等待结果。不会有 2 个查询同时开始。

如果您想要一种可能(仅作为示例)更具可扩展性的方式:

  • 在 foreach 中,不要等待,而是将任务添加到任务列表中。
  • 然后在所有任务开始后开始等待。类似于第二个循环。

方式并不完美,但至少运行时有机会同时进行多个查找。您的 await 确保您基本上运行单线程代码,除了异步,因此您的线程返回到池中(并且不等待结果),增加您的可伸缩性 - 在这种情况下可能不相关的项目,并且绝对不会在您的测试。

【讨论】:

  • 谢谢。您似乎证实了我在问题中所说的话。不过,您没有评论 Parallel.ForEach,这是关键点,我相信正在执行并行任务:-)
  • 其实我做到了。 Parallel.Foreach 会以极快的速度运行 #2 的问题 - 您启动了很多并行操作,但在许多情况下(http)处理的并行请求数量是有限的。所以,他们基本上都在排队。
猜你喜欢
  • 2021-06-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-05-28
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多