【问题标题】:Best practice for task/await in a foreach loopforeach 循环中任务/等待的最佳实践
【发布时间】:2015-07-30 03:14:16
【问题描述】:

我在使用任务/等待的 foreach 中有一些耗时的代码。 它包括从数据库中提取数据、生成 html、将其发布到 API 以及将回复保存到数据库。

模型看起来像这样

List<label> labels = db.labels.ToList();
foreach (var x in list) 
{
    var myLabels = labels.Where(q => !db.filter.Where(y => x.userid ==y.userid))
                         .Select(y => y.ID)
                         .Contains(q.id))

    //Render the HTML
    //do some fast stuff with objects

    List<response> res = await api.sendMessage(object);  //POST

    //put all the responses in the db
    foreach (var r in res) 
    {
        db.responses.add(r);
    }

    db.SaveChanges();
}

在时间方面,生成 Html 并将其发布到 API 似乎需要花费大部分时间。

理想情况下,如果我可以为下一个项目生成 HTML,然后等待帖子完成,然后再发布下一个项目,那就太好了。

也欢迎其他想法。 怎么办?

我首先想到在foreach 上方添加一个Task 并等待它完成后再进行下一个 POST,但是我如何处理最后一个循环...感觉很乱...

【问题讨论】:

  • 问:帖子真的必须按顺序发送吗? (整个有序部分正在序列化执行,否则有人可能会建议 Task.WaitAll 或类似 Parallel.For 的东西,这将允许并行执行。)
  • @user1778606 我不确定我是否要同时向 API 发送 1000 个 POST?顺序无所谓,但每次POST成功后,我需要尽快将其回复保存到数据库中。
  • @Stefanvds 您是否可以同时发出Post 请求并保存到数据库?
  • @YuvalItzchakov 我需要尽快将 POST 的响应保存在数据库中。我将如何同时做这两个? :)
  • 关于异步任务的并行方法的相当好的讨论stackoverflow.com/questions/11564506/…

标签: c# asp.net-mvc optimization async-await


【解决方案1】:

您可以并行执行,但每个任务需要不同的上下文。

实体框架不是线程安全的,所以如果你不能在并行任务中使用一个上下文。

var tasks = myLabels.Select( async label=>{
    using(var db = new MyDbContext ()){
        // do processing...
        var response = await api.getresponse();
        db.Responses.Add(response);
        await db.SaveChangesAsync();
    } 
});

await Tasks.WhenAll(tasks);

在这种情况下,所有任务看起来都是并行运行的,每个任务都有自己的上下文。

如果你不为每个任务创建新的上下文,你会得到这个问题上提到的错误Does Entity Framework support parallel async queries?

【讨论】:

    【解决方案2】:

    这里更像是架构问题而不是代码问题,imo。

    你可以把你的工作分成两个独立的部分:

    1. 从数据库中获取数据并生成 HTML
    2. 发送 API 请求并将响应保存到数据库

    您可以并行运行它们,并使用队列来协调:每当您的 HTML 准备好时,它就会被添加到队列中,另一个工作人员从那里继续,获取该 HTML 并发送到 API。

    这两个部分也可以以多线程方式完成,例如您可以通过让一组工作人员在队列中查找要处理的项目来同时处理队列中的多个项目。

    【讨论】:

      【解决方案3】:

      这为生产者/消费者模式尖叫:一个生产者以不同于消费者消费数据的速度生产数据。一旦生产者不再生产任何东西,它就会通知消费者不再需要任何数据。

      MSDN 有一个很好的例子,其中几个数据流块链接在一起:一个块的输出是另一个块的输入。

      Walkthrough: Creating a Dataflow Pipeline

      思路如下:

      • 创建一个将生成 HTML 的类。
      • 这个类有一个System.Threading.Tasks.Dataflow.BufferBlock&lt;T类的对象>
      • 异步过程创建所有 HTML 输出并等待 SendAsync 将数据发送到 bufferBlock
      • 缓冲块实现接口 ISourceBlock&lt;T>。该类将其公开为一个 get 属性:

      代码:

      class MyProducer<T>
      {
          private System.Threading.Tasks.Dataflow.BufferBlock<T> bufferBlock = new BufferBlock<T>();
      
          public ISourceBlock<T> Output {get {return this.bufferBlock;}
      
          public async ProcessAsync()
          {
              while (somethingToProduce)
              {
                  T producedData = ProduceOutput(...)
                  await this.bufferBlock.SendAsync(producedData);
              }
              // no date to send anymore. Mark the output complete:
              this.bufferBlock.Complete()
          }
      }
      
      • 第二个类采用此 ISourceBlock。它将在此源块处等待,直到数据到达并对其进行处理。
      • 在异步函数中执行此操作
      • 当没有更多数据可用时停止

      代码:

      public class MyConsumer<T>
      {
          ISourceBlock<T> Source {get; set;}
          public async Task ProcessAsync()
          {
              while (await this.Source.OutputAvailableAsync())
              {   // there is input of type T, read it:
                  var input = await this.Source.ReceiveAsync();
                  // process input
              }
              // if here, no more input expected. finish.
          }
      }
      

      现在把它放在一起:

      private async Task ProduceOutput<T>()
      {
          var producer = new MyProducer<T>();
          var consumer = new MyConsumer<T>() {Source = producer.Output};
          var producerTask = Task.Run( () => producer.ProcessAsync());
          var consumerTask = Task.Run( () => consumer.ProcessAsync());
          // while both tasks are working you can do other things.
          // wait until both tasks are finished:
          await Task.WhenAll(new Task[] {producerTask, consumerTask});
      }
      

      为简单起见,我省略了异常处理和取消。 StackOverFlow 有关于异常处理和任务取消的文章:

      【讨论】:

      • 感谢您的意见。这听起来不错,但我猜这对于这种情况来说有点矫枉过正,因为 10k html 的生成只需要大约 5 秒。这意味着整个过程只会节省那么多秒。自从与 API 交谈后,我喜欢这样一个事实,即我可以轻松地选择可以创建多少个并行 POST(在我的解决方案中)
      • 不客气。对我来说,创建这样的东西是一个很好的练习,特别是当我的生产者将下载的文件发送给必须解释它们的消费者,并将转换后的数据发送给另一个消费者,同时将原始文件发送给另一个消费者等时。这很好以 Windows 形式查看所有进程都在按照自己的节奏进行合作,而无需我进行任何安排。
      • 我没有使用更大的数据集,我正在考虑使用这种方法。在您的情况下,我假设您的任务创建者是两者中较慢的?对我来说,处理/消费者将是两者中较慢的。这里的问题是,对于 1 个任务的“创建者”,我必须有 5 个并行消费者。有什么指导方针吗?
      【解决方案4】:

      这就是我最终使用的:(https://stackoverflow.com/a/25877042/275990)

      List<ToSend> sendToAPI = new List<ToSend>();
      List<label> labels = db.labels.ToList();
      foreach (var x in list) {
          var myLabels = labels.Where(q => !db.filter.Where(y => x.userid ==y.userid))
                               .Select(y => y.ID)
                               .Contains(q.id))
      
          //Render the HTML
          //do some fast stuff with objects
          sendToAPI.add(the object with HTML);
      }
      
      int maxParallelPOSTs=5;
      await TaskHelper.ForEachAsync(sendToAPI, maxParallelPOSTs, async i => {
          using (NasContext db2 = new NasContext()) {
              List<response> res = await api.sendMessage(i.object);  //POST
      
              //put all the responses in the db
              foreach (var r in res) 
              {
                  db2.responses.add(r);
              }
      
              db2.SaveChanges();
          }
      });
      
      
      
      
      
          public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) {
              return Task.WhenAll(
                  from partition in Partitioner.Create(source).GetPartitions(dop)
                  select Task.Run(async delegate {
                      using (partition)
                          while (partition.MoveNext()) {
                              await body(partition.Current).ContinueWith(t => {
                                  if (t.Exception != null) {
                                      string problem = t.Exception.ToString();
                                  }
                                  //observe exceptions
                              });
      
                          }
                  }));
          }
      

      基本上让我生成 HTML 同步,这很好,因为生成 1000 只需要几秒钟,但让我发布并保存到数据库异步,线程数与我预定义的一样多。在这种情况下,我将发布到 Mandrill API,并行发布没有问题。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2013-02-02
        • 1970-01-01
        • 1970-01-01
        • 2020-05-09
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多