【问题标题】:c# .net 4.5 async / multithread?c#.net 4.5 异步/多线程?
【发布时间】:2014-07-23 08:54:12
【问题描述】:

我正在编写一个从网页抓取数据的 C# 控制台应用程序。

此应用程序将访问大约 8000 个网页并抓取数据(每个页面上的数据格式相同)。

我现在可以在没有异步方法和多线程的情况下使用它。

但是,我需要它更快。它只使用大约 3%-6% 的 CPU,我认为是因为它花费了等待下载 html 的时间。(WebClient.DownloadString(url))

这是我程序的基本流程

DataSet alldata;

foreach(var url in the8000urls)
{
    // ScrapeData downloads the html from the url with WebClient.DownloadString
    // and scrapes the data into several datatables which it returns as a dataset.
    DataSet dataForOnePage = ScrapeData(url);

    //merge each table in dataForOnePage into allData
}

// PushAllDataToSql(alldata);

我一直在尝试多线程,但不知道如何正确开始。我正在使用 .net 4.5,我的理解是异步的,并且在 4.5 中等待是为了让这更容易编程,但我还是有点迷茫。

我的想法是继续为这条线创建异步的新线程

DataSet dataForOnePage = ScrapeData(url);

然后在每个完成后,运行

//merge each table in dataForOnePage into allData

谁能指出我正确的方向,如何在 .net 4.5 c# 中使该行异步,然后让我的合并方法运行完成?

谢谢。

编辑:这是我的 ScrapeData 方法:

public static DataSet GetProperyData(CookieAwareWebClient webClient, string pageid)
{
    var dsPageData = new DataSet();

    // DOWNLOAD HTML FOR THE REO PAGE AND LOAD IT INTO AN HTMLDOCUMENT
    string url = @"https://domain.com?&id=" + pageid + @"restofurl";
    string html = webClient.DownloadString(url);
    var doc = new HtmlDocument();
    doc.LoadHtml(html );

    // A BUNCH OF PARSING WITH HTMLAGILITY AND STORING IN dsPageData 
    return dsPageData ;
}

【问题讨论】:

标签: c# multithreading .net-4.5


【解决方案1】:

如果您想使用 asyncawait 关键字(虽然您不必这样做,但在 .NET 4.5 中它们确实使事情变得更容易),您首先需要将您的 ScrapeData 方法更改为使用 async 关键字返回 Task<T> instance,如下所示:

async Task<DataSet> ScrapeDataAsync(Uri url)
{
    // Create the HttpClientHandler which will handle cookies.
    var handler = new HttpClientHandler();

    // Set cookies on handler.

    // Await on an async call to fetch here, convert to a data
    // set and return.
    var client = new HttpClient(handler);

    // Wait for the HttpResponseMessage.
    HttpResponseMessage response = await client.GetAsync(url);

    // Get the content, await on the string content.
    string content = await response.Content.ReadAsStringAsync();

    // Process content variable here into a data set and return.
    DataSet ds = ...;

    // Return the DataSet, it will return Task<DataSet>.
    return ds;
}

请注意,您可能希望远离 WebClient 类,因为它在其异步操作中固有地不支持 Task&lt;T&gt;。 .NET 4.5 中更好的选择是HttpClient class。我选择使用上面的HttpClient。另外,请查看HttpClientHandler class,特别是CookieContainer property,您将使用它为每个请求发送cookie。

但是,这意味着您很可能必须使用 await 关键字来等待 另一个 异步操作,在这种情况下,很可能是页面的下载.您必须定制下载数据的调用以使用异步版本和await

一旦完成,您通常会为此调用await,但在这种情况下您不能这样做,因为您会在变量上调用await。在这种情况下,您正在运行一个循环,因此每次迭代都会重置该变量。在这种情况下,最好将Task&lt;T&gt; 存储在一个数组中,如下所示:

DataSet alldata = ...;

var tasks = new List<Task<DataSet>>();

foreach(var url in the8000urls)
{
    // ScrapeData downloads the html from the url with 
    // WebClient.DownloadString
    // and scrapes the data into several datatables which 
    // it returns as a dataset.
    tasks.Add(ScrapeDataAsync(url));
}

有将数据合并到allData 的问题。为此,您希望在返回的Task&lt;T&gt; 实例上调用ContinueWith method 并执行将数据添加到allData 的任务:

DataSet alldata = ...;

var tasks = new List<Task<DataSet>>();

foreach(var url in the8000urls)
{
    // ScrapeData downloads the html from the url with 
    // WebClient.DownloadString
    // and scrapes the data into several datatables which 
    // it returns as a dataset.
    tasks.Add(ScrapeDataAsync(url).ContinueWith(t => {
        // Lock access to the data set, since this is
        // async now.
        lock (allData)
        {
             // Add the data.
        }
    });
}

然后,您可以使用Task class 上的WhenAll methodawait 来等待所有任务:

// After your loop.
await Task.WhenAll(tasks);

// Process allData

但是,请注意,您有一个foreach,而WhenAll 采用了一个IEnumerable&lt;T&gt; 实现。这是一个很好的指标,表明它适合使用 LINQ,它是:

DataSet alldata;

var tasks = 
    from url in the8000Urls
    select ScrapeDataAsync(url).ContinueWith(t => {
        // Lock access to the data set, since this is
        // async now.
        lock (allData)
        {
             // Add the data.
        }
    });

await Task.WhenAll(tasks);

// Process allData

如果你愿意,你也可以选择不使用查询语法,在这种情况下没关系。

请注意,如果包含方法未标记为async(因为您在控制台应用程序中并且必须在应用程序终止之前等待结果),那么您可以简单地在Task 上调用Wait method调用WhenAll时返回:

// This will block, waiting for all tasks to complete, all
// tasks will run asynchronously and when all are done, then the
// code will continue to execute.
Task.WhenAll(tasks).Wait();

// Process allData.

也就是说,重点是,您要将Task 实例收集到一个序列中,然后在处理allData 之前等待整个序列。

但是,如果可以的话,我建议在将数据合并到 allData 之前尝试对其进行处理;除非数据处理需要整个 DataSet,否则通过处理返回的尽可能多的数据,您将获得更多的性能提升返回时,而不是等待它全部回来。

【讨论】:

  • 正在输入一个不错的长答案,然后你去发布了这个:) 好帖子,点赞。
  • 感谢您的帮助。这对我的一半问题有很大帮助(等待它们全部完成然后合并),但我仍然对如何更改我的 ScrapeData 方法感到困惑,因为我不确定在哪里或如何使用 await。我正在使用返回字符串的 webclient.DownloadString 下载 html。有一个名为 webclient.DownloadStringAsync 的异步方法返回一个 void,编译器告诉我不能在 void 上使用 await。
  • @casperOne 谢谢你的例子。我刚刚发布了我之前使用的内容。我会研究 HttpClient 而不是 WebClient,也许这就是解决这个问题的方法..
  • 我尝试使用 HttpWebRequest 而不是 HttpClient 编写它,因为我找不到将 cookie 与 HttpClient 一起使用的方法,我必须登录。我尝试运行该程序,我可以得到它会在await Task.WhenAll(tasks); 上中断,但之后它会退出程序而不是处理之后的行。
  • 那是因为it's a console program。尝试使用AsyncContext.RunTask
【解决方案2】:

您也可以使用TPL Dataflow,它非常适合此类问题。

在这种情况下,您构建一个“数据流网格”,然后您的数据流经它。

这个实际上更像是一个管道而不是一个“网格”。我分三个步骤: 从 URL 下载(字符串)数据;将(字符串)数据解析为 HTML,然后解析为DataSet;并将DataSet 合并到主DataSet

首先,我们创建将进入网格的块:

DataSet allData;
var downloadData = new TransformBlock<string, string>(
  async pageid =>
  {
    System.Net.WebClient webClient = null;
    var url = "https://domain.com?&id=" + pageid + "restofurl";
    return await webClient.DownloadStringTaskAsync(url);
  },
  new ExecutionDataflowBlockOptions
  {
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
  });
var parseHtml = new TransformBlock<string, DataSet>(
  html =>
  {
    var dsPageData = new DataSet();
    var doc = new HtmlDocument();
    doc.LoadHtml(html);

    // HTML Agility parsing

    return dsPageData;
  },
  new ExecutionDataflowBlockOptions
  {
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
  });
var merge = new ActionBlock<DataSet>(
  dataForOnePage =>
  {
    // merge dataForOnePage into allData
  });

然后我们将三个块链接在一起创建网格:

downloadData.LinkTo(parseHtml);
parseHtml.LinkTo(merge);

接下来,我们开始将数据注入网格:

foreach (var pageid in the8000urls)
  downloadData.Post(pageid);

最后,我们等待网格中的每个步骤完成(这也将干净地传播任何错误):

downloadData.Complete();
await downloadData.Completion;
parseHtml.Complete();
await parseHtml.Completion;
merge.Complete();
await merge.Completion;

TPL Dataflow 的好处是您可以轻松控制并行度每个部分。目前,我将下载和解析块都设置为Unbounded,但您可能希望限制它们。合并块使用默认的最大并行度 1,因此合并时不需要锁。

【讨论】:

  • 如果今天被问到这个问题,我会用基于 TPL 的解决方案而不是 the one I gave 来回答;将所有东西连接起来肯定更容易,而且更清洁。
  • TPL 在这里会不会有点矫枉过正? TPL 不是主要为 CPU 密集型并行程序开发的吗?
  • TPL 数据流是一个基于Task 的异步网格。它实际上并不是 .NET 中存在的 TPL 的一部分,而是由同一团队开发的附加库(该团队还开发了 async 支持类型)。
  • @iNfinity 那是不正确的。它实际上非常接近它的名字。它不必受 CPU 限制,您可以轻松地将 I/O 限制操作作为数据流的一部分。这是关于将操作分解为块,然后将所有块链接在一起,并能够控制所有块如何处理并行性、缓冲等问题。IMO,一旦你得到它,块就不会过大非常容易组合在一起,您可以在这些逻辑单元中看到非常适合 TPL 的东西。
  • 太棒了。现在我必须弄清楚 TPL 数据流到底是什么——在这里我想我终于赶上了所有最新的东西! XD
【解决方案3】:

我建议阅读我的reasonably-complete introduction to async/await

首先,让一切异步,从较低级别的东西开始:

public static async Task<DataSet> ScrapeDataAsync(string pageid)
{
  CookieAwareWebClient webClient = ...;
  var dsPageData = new DataSet();

  // DOWNLOAD HTML FOR THE REO PAGE AND LOAD IT INTO AN HTMLDOCUMENT
  string url = @"https://domain.com?&id=" + pageid + @"restofurl";
  string html = await webClient.DownloadStringTaskAsync(url).ConfigureAwait(false);
  var doc = new HtmlDocument();
  doc.LoadHtml(html);

  // A BUNCH OF PARSING WITH HTMLAGILITY AND STORING IN dsPageData 
  return dsPageData;
}

然后您可以按如下方式使用它(使用async 和 LINQ):

DataSet alldata;
var tasks = the8000urls.Select(async url =>
{
  var dataForOnePage = await ScrapeDataAsync(url);

  //merge each table in dataForOnePage into allData

});
await Task.WhenAll(tasks);
PushAllDataToSql(alldata);

并使用我的 AsyncEx 库中的 AsyncContext,因为这是 a console app

class Program
{
  static int Main(string[] args)
  {
    try
    {
      return AsyncContext.Run(() => MainAsync(args));
    }
    catch (Exception ex)
    {
      Console.Error.WriteLine(ex);
      return -1;
    }
  }

  static async Task<int> MainAsync(string[] args)
  {
    ...
  }
}

就是这样。不需要锁定或延续或任何其他。

【讨论】:

    【解决方案4】:

    我相信你在这里不需要asyncawait 的东西。它们可以在需要将工作转移到非 GUI 线程的桌面应用程序中提供帮助。在我看来,在你的情况下使用Parallel.ForEach 方法会更好。像这样的:

        DataSet alldata;
        var bag = new ConcurrentBag<DataSet>();
    
        Parallel.ForEach(the8000urls, url =>
        {
            // ScrapeData downloads the html from the url with WebClient.DownloadString 
            // and scrapes the data into several datatables which it returns as a dataset. 
            DataSet dataForOnePage = ScrapeData(url);
            // Add data for one page to temp bag
            bag.Add(dataForOnePage);
        });
    
        //merge each table in dataForOnePage into allData from bag
    
        PushAllDataToSql(alldata); 
    

    【讨论】:

    • 这是暴力破解。您可以这样做,但同时您浪费了等待固有异步操作的线程(Parallel 将产生线程来处理the8000urls 的分区,然后这些线程将在获取 url 时阻塞)。你不需要需要async/await,但它肯定更优雅,可以更好地利用你拥有的资源。
    • 就是这样。它是控制台应用程序,应该更快。使用async/await,您仍然会一次下载一个网址,这是不可接受的。使用Parallel.ForEach 可以一次下载更多的网址,从而提高整体应用程序性能。而这正是 user1308743 所需要的。
    • 这不是真的。对于async/await,它们不会一次加载一个,它们是异步启动的,最后都在等待。您对 async/await 所做的事情的解释是不正确的。
    • 嗯,第一次阅读您的帖子时,我似乎没有注意到List&lt;Task&gt;。返回任务并在方法体中结合async/await等待,绝对是最佳选择。
    猜你喜欢
    • 2012-05-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-11-22
    相关资源
    最近更新 更多