【问题标题】:How to do parallel.for async methods如何做 parallel.for 异步方法
【发布时间】:2017-05-09 05:33:44
【问题描述】:

在 c# 中使用一些异步方法进行并行处理的最佳方法是什么。 让我用一些简单的代码解释一下

示例场景:我们有一个人和 1000 个来自他们的文本文件。我们要检查他的文本文件不包含敏感关键字,如果他的文本文件之一包含敏感关键字,我们将他标记为不可信。检查这一点的方法是异步方法,只要我们发现其中一个敏感关键字不需要进一步处理,就必须为那个人打破检查循环。

为了获得最佳性能并使其如此快速,我们必须使用并行处理 简单的伪代码:

boolean sesitivedetected=false;
Parallel.ForEach(textfilecollection,async (textfile,parallelloopstate)=>
{
    if (await hassensitiveasync(textfile))
    {
         sensitivedetected=true;
         parallelloopstate.break()
    }
}
‌if (sensitivedetected)
    markuntrusted(person)

问题是Parallel.ForEach 不等到异步任务完成,所以语句‌if (sensitivedetected) 在创建任务完成后立即运行。 我阅读了其他问题,例如 write parallel.for with asyncasync/await and Parallel.For 以及许多其他页面。 当您需要稍后收集和使用异步方法的结果时,此主题很有用,但在我的场景中,循环的执行应尽快结束。


更新:示例代码:

        Boolean detected=false;
        Parallel.ForEach(UrlList,  async (url, pls) =>
          {
              using (HttpClient hc = new HttpClient())
              {
                  var result = await hc.GetAsync(url);

                  if ((await result.Content.ReadAsStringAsync()).Contains("sensitive"))
                  {
                      detected = true;
                      pls.Break();
                  }
              }
          });
        if (detected)
            Console.WriteLine("WARNING");

【问题讨论】:

  • 简短版:不要将Parallel.For()async 方法混用。如您所述,您的问题确实太宽泛了;你没有包含一个好的minimal reproducible example,并且有很多方法可以根据它的每个部分在真实代码中的作用来解释你的伪代码。但是您可以只启动任务而不是使用Parallel,如果需要中断这些任务,当然可以使用CancellationTokenSource 与这些任务通信,当然也可以与创建它们的循环通信。如果您需要更具体的建议,请改进问题。
  • 如果你真的必须使用Parallel.For(),并且你真的必须使用一个async方法来确定循环是否应该继续,并且该方法是你Parallel.For()委托中唯一的等待表达式,那么你可以同步使用它,即if (hassensitiveasync(textfile).Result)。从实现细节来看,这是一种糟糕的做法,但如果你把自己画到角落里,有时你不得不留下脚印。
  • @XX 你使用了错误的类来做错误的事情。 Parallel.Foreach 用于data 并行,即处理大量数据。它用于异步操作或响应事件。它也不用于等待来自已经异步事件的响应,这就是Task.WhenAll 的用途。您可以启动 100 个 GetAsync 调用,将它们放入一个数组中并等待所有这些调用。
  • @XX 这就是您要使用流处理的原因。流处理有两个库,Reactive Extensions 和 TPL Dataflow。我在下面有一个示例,说明如何使用 Reactive Extensions 实现这一目标。

标签: c# multithreading asynchronous parallel-processing parallel.for


【解决方案1】:

实现你需要的最简单的方法(而不是你想要的,因为线程是邪恶的)。是使用 ReactiveExtensions。

var firstSensitive = await UrlList
                     .Select(async url => {
                         using(var http = new HttpClient()
                         {
                             var result = await hc.GetAsync(url);
                             return await result.Content.ReadAsStringAsync();
                         }
                     })
                     .SelectMany(downloadTask => downloadTask.ToObservable())
                     .Where(result => result.Contains("sensitive"))
                     .FirstOrDefaultAsync();

if(firstSensitive != null)
    Console.WriteLine("WARNING");

限制并发 HTTP 查询的数量:

int const concurrentRequestLimit = 4;
var semaphore = new SemaphoreSlim(concurrentRequestLimit);
var firstSensitive = await UrlList
                     .Select(async url => {
                         await semaphore.WaitAsync()
                         try
                         using(var http = new HttpClient()
                         {
                             var result = await hc.GetAsync(url);
                             return await result.Content.ReadAsStringAsync();
                         }
                         finally
                             semaphore.Release();
                     })
                     .SelectMany(downloadTask => downloadTask.ToObservable())
                     .Where(result => result.Contains("sensitive"))
                     .FirstOrDefaultAsync();

if(firstSensitive != null)
    Console.WriteLine("WARNING");

【讨论】:

    猜你喜欢
    • 2013-10-17
    • 1970-01-01
    • 1970-01-01
    • 2017-03-06
    • 2018-01-13
    • 1970-01-01
    • 2013-05-07
    • 2014-04-19
    • 1970-01-01
    相关资源
    最近更新 更多