【问题标题】:.net Core Parallel.ForEach issues.net Core Parallel.ForEach 问题
【发布时间】:2016-09-30 14:59:14
【问题描述】:

我已经为一些项目切换到 .net Core,但现在遇到了 Parallel.ForEach 的问题。过去,我经常有一个 id 值列表,然后我会使用它来发出 Web 请求以获取完整数据。它看起来像这样:

Parallel.ForEach(myList, l =>
{
    // make web request using l.id 
    // process the data somehow
});

嗯,在 .net Core 中,Web 请求必须全部标记为 await,这意味着 Parallel.ForEach 操作必须标记为 async。但是,将 Parallel.ForEach 操作标记为 async 意味着我们有一个导致问题的 void async 方法。在我的特殊情况下,这意味着响应返回到应用程序并行循环中的所有 Web 请求都完成之前,这既尴尬又会导致错误。

问题:在此处使用 Parallel.ForEach 的替代方法是什么?

我发现的一个可能的解决方案是将并行循环包装在任务中并等待任务:

await Task.Run(() => Parallel.ForEach(myList, l =>
{
    // stuff here
}));

(在这里找到:Parallel.ForEach vs Task.Run and Task.WhenAll

但是,这对我不起作用。当我使用它时,我仍然会在循环完成之前返回应用程序。

另一种选择:

var tasks = new List<Task>();
foreach (var l in myList)
{
    tasks.Add(Task.Run(async () =>
    {
         // stuff here
    }));
}
await Task.WhenAll(tasks);

这似乎可行,但这是唯一的选择吗?似乎新的 .net Core 使 Parallel.ForEach 几乎无用(至少在嵌套 Web 调用方面)。

感谢任何帮助/建议。

【问题讨论】:

  • async/await 设计用于长时间阻塞I/O 操作,而Parallel 设计用于长时间阻塞CPU 操作。如果您发现自己试图在 Parallel 函数体中编写异步代码,那么您做错了什么。考虑改用Task.WhenAll
  • 除了上面的评论,当你做Task.Run(async() => ...),你也几乎总是做错事。
  • 您应该查看TPL Dataflow。让你的生活轻松很多。它不是 .NET Framework 的一部分,但您可以使用 nuget 来获取它,
  • @MatiasCicero 感谢您解释预期用途之间的差异,这非常有启发性。
  • @Evk 如果那是错误的方式,你能指出正确的方向吗?

标签: c# parallel-processing


【解决方案1】:

为什么Parallel.ForEach 不适合此任务在 cmets 中进行了解释:它专为 CPU 密集型(CPU 密集型)任务而设计。如果您将其用于 IO 绑定操作(例如发出 Web 请求) - 您将浪费线程池线程在等待响应时被阻塞,这没有任何好处。仍然可以使用它,但不适合这种情况。

您需要的是使用异步 Web 请求方法(如 HttpWebRequest.GetResponseAsync),但还有另一个问题 - 您不想一次执行所有 Web 请求(如另一个答案所示)。您的列表中可能有数千个 url (id)。所以你可以使用为此设计的线程同步结构,例如SemaphoreSemaphore 就像队列 - 它允许 X 线程通过,其余的应该等到一个繁忙的线程完成它的工作(有点简化的描述)。这是一个例子:

static async Task ProcessUrls(string[] urls) {
    var tasks = new List<Task>();
    // semaphore, allow to run 10 tasks in parallel
    using (var semaphore = new SemaphoreSlim(10)) {
        foreach (var url in urls) {
            // await here until there is a room for this task
            await semaphore.WaitAsync();
            tasks.Add(MakeRequest(semaphore, url));
        }
        // await for the rest of tasks to complete
        await Task.WhenAll(tasks);
    }
}

private static async Task MakeRequest(SemaphoreSlim semaphore, string url) {
    try {
        var request = (HttpWebRequest) WebRequest.Create(url);

        using (var response = await request.GetResponseAsync().ConfigureAwait(false)) {
            // do something with response    
        }
    }
    catch (Exception ex) {
        // do something
    }
    finally {
        // don't forget to release
        semaphore.Release();
    }
}

【讨论】:

  • 谢谢你。我不认为这是我们现在要走的路,但这是未来要记住的事情。
【解决方案2】:

这三个方法都不好。

在这种情况下,您不应使用 Parallel 类或 Task.Run

改为使用async 处理程序方法:

private async Task HandleResponse(Task<HttpResponseMessage> gettingResponse)
{
     HttpResponseMessage response = await gettingResponse;
     // Process the data
}

然后使用Task.WhenAll:

Task[] requests = myList.Select(l => SendWebRequest(l.Id))
                        .Select(r => HandleResponse(r))
                        .ToArray();

await Task.WhenAll(requests);

【讨论】:

  • 谢谢你。我能够在我的项目中成功实现它。
  • 您能解释一下为什么这比 OP 发布的示例更好吗?
【解决方案3】:

您应该使用 ref 关键字调用方法来完成工作,并且应该以最小的努力做到这一点。在类似的情况下,这种方法对我来说效果很好。

Parallel.ForEach(myList, l =>
{

    // make web request using ref l.id 
    string id=l.id;
    WebRequest webRequest= MakeRequest(ref id);
    // process the data somehow
});

private WebRequest MakeRequest(ref string id)
{
  //make and return web request
}

【讨论】:

  • 但是如果你有数千个 id,你最终会收到数千个请求,所以你可以通过指定 MaxDegreeOfParallelism 来限制
  • 注意Parallel.ForEach 不会尝试对列表中的所有项目执行操作,它会为您管理批处理大小,MaxDegreeOfParallelism 允许您指定最大并发线程数采用。在大多数情况下,您不需要指定最大值,但是在运行时观察您的代码后,您可能会发现您希望限制并发性以允许其他(可能是并行或异步)进程以更高的优先级执行。
【解决方案4】:

我认为这段代码可以工作:

for (int i = 0; i < myList.Length; i++)
{
    var item = myList[i];

    var msg = await SendAsync(item.Id);
    //Post Process
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-03-11
    • 1970-01-01
    • 2020-07-30
    • 2021-01-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多