【问题标题】:Parallel Mulit-threaded Downloads using async-await使用 async-await 并行多线程下载
【发布时间】:2022-01-06 02:42:13
【问题描述】:

我在我的 Windows 服务 - C# 中有 100 多个要从 Web 下载的大文件。要求是一次维护 - 最多 4 个并行 Web 文件下载。

我可以使用异步等待实现并发/并行下载,还是必须使用BackgroundWorker 进程或线程? async-await 是多线程的吗? 请参阅下面使用 async-await 的示例程序:

 static int i = 0;
 
 Timer_tick() {
   while (i < 4) {
     i++;
     model = GetNextModel();
     await Download(model);
   }
 }
 
 private async Download(XYZ model) {
   Task<FilesetResult> t = DoWork(model);
   result = await t;
   //Use Result
 }
 
 private async Task<FilesetResult> Work(XYZ model) {
   fileresult = await api.Download(model.path)
   i--;
   return filesetresult;
 }

【问题讨论】:

  • 是的,但您不必逐个await 下载,而是启动您想要的号码并使用Task.WaitAll,然后再继续下一批。但它不是多线程的。它只是允许 IO 并行发生,但所有 CPU 绑定都将由一个线程处理。如果您也有 CPU 密集型代码,那么您也需要一个并行解决方案。
  • 如果IO并行发生但只使用一个线程,使用多个线程会提高性能吗?
  • 完全看你除了IO还做了什么。

标签: c# multithreading async-await


【解决方案1】:

您可以使用SemaphoreSlim 类限制并行运行的异步任务的数量。比如:

List<DownloadRequest> requests = Enumerable.Range(0, 100).Select(x => new DownloadRequest()).ToList();
using (var throttler = new SemaphoreSlim(4))
{
    Task<DownloadResult>[] downloadTasks = requests.Select(request => Task.Run(async () =>
    {
        await throttler.WaitAsync();
        try
        {
            return await DownloadTaskAsync(request);
        }
        finally
        {
            throttler.Release();
        }
    })).ToArray();
    await Task.WhenAll(downloadTasks);
}

更新:感谢 cmets,已修复问题。

Update2:动态请求列表的示例解决方案

public class DownloadManager : IDisposable
{
    private readonly SemaphoreSlim _throttler = new SemaphoreSlim(4);

    public async Task<DownloadResult> DownloadAsync(DownloadRequest request)
    {
        await _throttler.WaitAsync();
        try
        {
            return await api.Download(request);
        }
        finally
        {
            _throttler.Release();
        }
    }

    public void Dispose()
    {
        _throttler?.Dispose();
    }
}

【讨论】:

  • 我的下载请求列表是动态的,它会随着时间的推移而增加或减少
  • Task.Run 返回的 Task 实例没有被等待,这意味着 finally 将立即被命中(很可能在 DownloadTaskAsync 仍在进行时)并且信号量将被释放。如果没有将返回的任务添加到列表中并在某个时候等待,这个解决方案是纯粹的开销 - 它没有做任何有意义的事情
  • @Gags 添加了动态请求的解决方案,当多个客户端可能调用下载方法时,但有些会等到其他客户端完成。
  • @Gags 是的,第一个任务释放信号量后第 5 个将继续
  • @gags 使用 .Wait() 有效地意味着阻塞调用者线程。如果你想并行使用,你应该有 async/await 调用者,或者你需要启动多个任务并一次等待它们(第一个代码示例)
【解决方案2】:

手工操作似乎非常复杂。

var files = new List<Uri>();

Parallel.ForEach(files, 
                 new ParallelOptions { MaxDegreeOfParallelism = 4 },
                 this.Download);

现在你只需要一个单一的、正常的、同步的方法private void Download(Uri file),你就可以开始了。

如果您需要生产者/消费者模式,最简单的版本可能是BlockingCollection

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp11
{
    internal class Program
    {
        internal static void Main()
        {
            using (var queue = new BlockingCollection<Uri>())
            {
                // starting the producer task:
                Task.Factory.StartNew(() =>
                {
                    for (int i = 0; i < 100; i++)
                    {
                        // faking read from message queue... we get a new Uri every 100 ms
                        queue.Add(new Uri("http://www.example.com/" + i));

                        Thread.Sleep(100);
                    }

                    // just to end this program... you don't need to end this, just listen to your message queue
                    queue.CompleteAdding();
                });

                // run the consumers:
                Parallel.ForEach(queue.GetConsumingEnumerable(), new ParallelOptions { MaxDegreeOfParallelism = 4 }, Download);
            }
        }

        internal static void Download(Uri uri)
        {
            // download your file here

            Console.WriteLine($"Downloading {uri} [..        ]");
            Thread.Sleep(1000);
            Console.WriteLine($"Downloading {uri} [.....     ]");
            Thread.Sleep(1000);
            Console.WriteLine($"Downloading {uri} [.......   ]");
            Thread.Sleep(1000);
            Console.WriteLine($"Downloading {uri} [......... ]");
            Thread.Sleep(1000);
            Console.WriteLine($"Downloading {uri} [..........]");
            Thread.Sleep(1000);
            Console.WriteLine($"Downloading {uri} OK");
        }
    }
}

【讨论】:

  • 我的列表 在消息队列中不断增加。说当 Parallel.ForEach 开始时它的大小是 3。当下载运行时,又有 3 个请求排队/添加到列表
  • 你可以用BlockingCollection&lt;&gt;代替List&lt;&gt;
猜你喜欢
  • 2012-09-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多