【问题标题】:Parallel.ForEach Loop with Retry Logic in C#C# 中带有重试逻辑的 Parallel.ForEach 循环
【发布时间】:2020-04-06 07:40:22
【问题描述】:

我正在使用 Parallel.ForEach 将 C# 中的多个文件从谷歌存储桶下载到文件夹位置。我正在使用重试逻辑,因此它可以在下载过程中文件下载失败的情况下重试下载文件。如何为Parallel.ForEach 循环中的每个文件或每个线程应用重试逻辑。

Parallel.ForEach(listFiles, objectName =>            
{
    retryCount = 0;                        
    countOfFiles++;
    downloadSuccess = false;
    bucketFileName = Path.GetFileName(objectName.Name);
    guidFolderPath = tempFolderLocation + "\\" + bucketFileName;

    while (retryCount < retryCountInput && downloadSuccess == false)
    {
        try
        {
            FileStream fs = new FileStream(guidFolderPath, FileMode.Create, FileAccess.Write, FileShare.Write);
            using (fs)
            {                                               
                storage.DownloadObjectAsync(bucketName, objectName.Name, fs, option, cancellationToken, progress).Wait();
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine("Exception occured while downloading file: " + ex.ToString());                   
            Thread.Sleep(RetryInterval(retryCount, minBackoffTimeSpan, maxBackoffTimeSpan, deltaBackoffTimeSpan));
            retryCount++;

        }
    }
}

【问题讨论】:

  • 看来您已经创建了自己的重试机制。不行吗?
  • 如果您在下载后将 downloadSuccess 设置为 true,您的代码可能会工作
  • 您不应该将Parallel.ForEach() 用于IO 绑定工作。创建一个任务列表并await Task.WhenAll(list)他们。 Thread.Sleep( 做到了这一切。您正在阻塞线程池线程。
  • 跨线程共享bucketFileName 感觉就像是灾难的秘诀。

标签: c# concurrency parallel.foreach retry-logic


【解决方案1】:

我会将其更改为任务并使用异步。这样你的 Thread.Sleep 就不会阻塞线程池线程。 Parallel.ForEach 用于 CPU 密集型工作。

类似:(如果没有你的其余代码,我无法编译/测试它)

int retryCountInput = 5;
var tasks = new List<Task>();

foreach (var file in listFiles)
{
    var task = Task.Run(async () =>
    {
        // make it local
        int retryCount = 0;
        string bucketFileName = Path.GetFileName(objectName.Name);
        string guidFolderPath = tempFolderLocation + "\\" + bucketFileName;

        while (retryCount < retryCountInput)
        {
            try
            {
                using (var fs = new FileStream(guidFolderPath, FileMode.Create, FileAccess.Write, FileShare.Write))
                    // Use await here, instead of `Wait()` so this threadpool thread
                    // can be used for other tasks.
                    await storage.DownloadObjectAsync(bucketName, objectName.Name, fs, option, cancellationToken, progress);

                break;
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception occured while downloading file: " + ex.ToString());

                // Use Task.Delay here, so this thread is 'released'
                await Task.Delay(RetryInterval(retryCount, minBackoffTimeSpan, maxBackoffTimeSpan, deltaBackoffTimeSpan));
                retryCount++;
            }
        }
    });
    tasks.Add(task);
}
await Task.WhenAll(tasks);

【讨论】:

  • 嗨@Jeroen,我已经修改了我的代码并删除了parallel.foreach,而不是使用foreach 循环来遍历文件。但是现在,我无法在下载路径中找到所有文件。下载路径中的下载文件没有更改,这种行为似乎是随机的。我可以使用 Task.run 进行 IO 操作吗?
【解决方案2】:

我已经修改了我的代码并删除了Parallel.ForEach,而不是使用foreach 循环来遍历文件。但是现在,我无法在下载路径中找到所有文件,尽管日志显示所有文件都已下载。下载路径中下载文件的数量发生了变化,这种行为似乎是随机的。我可以使用Task.Run 进行 I/O 操作吗?

var tasks = new List<Task>();
foreach (var objectName in listFiles)
{
    var task = Task.Run(() =>
    {
        downloadSuccess = false;
        bucketFileName = Path.GetFileName(objectName.Name);
        guidFolderPath = tempFolderLocation + "\\" + bucketFileName;

        var maxRetryAttempts = 3;
        var pauseBetweenFailures = TimeSpan.FromSeconds(2);
        RetryHelper.RetryOnException(maxRetryAttempts, pauseBetweenFailures, async () =>
        {
            FileStream fs = new FileStream(guidFolderPath, FileMode.Create,
                FileAccess.Write, FileShare.Write);
            using (fs)
            {
                var progress = new Progress<IDownloadProgress>(
                    p =>
                    {
                        DownloadProgress(p, retryCount, objectName.Name);
                    });

                await client.DownloadObjectAsync(bucketName, objectName.Name,
                    fs, option, cancellationToken.Token, progress);
            }
        });
    });
    tasks.Add(task);
}
await Task.WhenAll(tasks);

【讨论】:

  • 是的,您当然可以使用Task.Run 进行I/O 操作。在这种情况下,推荐的模式是将异步委托作为参数传递,就像在 Jeroen van Langen 的 answer 中所做的那样。您的代码中不清楚的是 RetryHelper.RetryOnException 方法的签名。如果它返回一个Task,那么你应该await这个任务,否则它将以即发即弃的方式运行。
  • RetryHelper.RetryOnException 不返回任务。它有 void 返回类型。
  • 它的第三个参数呢?它是Func&lt;Task&gt; 类型吗?如果没有,你有一个async void
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2011-09-30
  • 1970-01-01
  • 2018-11-15
  • 1970-01-01
  • 2011-06-05
  • 1970-01-01
  • 2012-02-29
相关资源
最近更新 更多