【问题标题】:How to maximize process throughput (C#)?如何最大化进程吞吐量(C#)?
【发布时间】:2018-10-30 11:04:13
【问题描述】:

我想以最大吞吐量处理一些文件。文件的路径保存在数据库中。我需要从数据库中获取文件路径,将它们的状态更改为正在处理,处理它们,然后将它们的状态更改为已完成或失败。

目前,我分批(100 个文件)获取文件,以减少完成的查询数量并并行处理它们(并行度为 10)。但是通过这种方式,我在批次结束时失去了吞吐量。当批处理中剩余的文件少于 10 个时,并行度不再是 10,它会降低。

这是我所拥有的:

private async Task CopyPendingFilesAsync(SourcePath sourcePath, Options options)
{
    var batchIndex = 0;
    while (true)
    {
        var fileBatch = _sourceFileService.GetSourceFileBatchBySourcePathId(
            sourcePath.Id, _dataSourceExportConfig.FileCopyBatchSize, Status.Pending);
        if (fileBatch.Count == 0)
            return;

        await SetInProgressStatusForBatch(fileBatch)
            .ConfigureAwait(false);

        fileBatch
            .AsParallel()
            .WithDegreeOfParallelism(_dataSourceExportConfig.FileCopyDegreeOfParallelism)
            .ForAll(file => ProcessFile(file, destinationBase, options));

        await _sourceFileService
            .UpdateSourceFilesStatusAsync(fileBatch)
            .ConfigureAwait(false);

        batchIndex++;
    }
}

private async Task SetInProgressStatusForBatch(IEnumerable<SourceFile> fileBatch)
{
    foreach (var file in fileBatch)
        file.Status = Status.InProgress;

    await _sourceFileService
        .UpdateSourceFilesStatusAsync(fileBatch)
        .ConfigureAwait(false);
}

private void ProcessFile(
    SourceFile file,
    string destinationBase,
    Options options)
{
    try
    {
        //do something ...

        file.Status = Status.Success;
        file.ExceptionMessage = null;
    }
    catch (Exception ex)
    {
        _logger.Error(ex);
        file.Status = Status.Failed;
        file.ExceptionMessage = ex.Message;
    }
}

如何最大限度地提高吞吐量?我阅读了有关 BlockingCollection、TPL 数据流和 Rx 的生产者-消费者模式,我很确定我想要实现的目标可以通过上述任何方法实现,但到目前为止我还不能做到。使用生产者-消费者模式,与消费者相比,我的生产者速度非常快,使用 TPL 数据流时,我被 BatchBlock 卡住了,我还没有尝试过 Rx。有人可以指出我正确的方向吗?

更新: 这是一个最小、完整且可验证的示例:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;

namespace ConsoleApp1
{
    internal static class Program
    {
        private static void Main()
        {
            Console.WriteLine("Processing files");

            var stopWatch = new Stopwatch();
            stopWatch.Start();

            var fileService = new FileService();
            fileService.ProcessPendingFiles();

            foreach (var sourceFile in fileService.SourceFiles)
            {
                Console.WriteLine($"{sourceFile.Id} {sourceFile.Status}");
            }

            Console.WriteLine(stopWatch.Elapsed);

            Console.ReadLine();
        }
    }

    public class FileService
    {
        private const int BatchSize = 100;
        private const int DegreeOfParallelism = 10;
        //this SourceFiles property replaces the Sqlite database where the data is actually stored
        public ICollection<SourceFile> SourceFiles =
            Enumerable
                .Range(0, 1000)
                .Select(i =>
                    new SourceFile
                    {
                        Id = i,
                        Path = "source file path",
                        Status = Status.Pending,
                    })
                .ToList();

        public void ProcessPendingFiles()
        {
            while (true)
            {
                var fileBatch = GetSourceFileBatch(BatchSize, Status.Pending);
                if (fileBatch.Count == 0)
                    return;

                SetInProgressStatusForBatch(fileBatch);

                fileBatch
                    .AsParallel()
                    .WithDegreeOfParallelism(DegreeOfParallelism)
                    .ForAll(ProcessFile);

                UpdateSourceFiles(fileBatch);
            }
        }

        private ICollection<SourceFile> GetSourceFileBatch(int batchSize, Status status)
            => SourceFiles
                .Where(sf => sf.Status == status)
                .Take(batchSize)
                .ToList();

        //set status to in progress for all files in the batch
        //and save the changes to database
        //in the application this is actually done with a bulk update and the method is async
        private void SetInProgressStatusForBatch(IEnumerable<SourceFile> fileBatch)
        {
            foreach (var file in fileBatch)
            {
                file.Status = Status.InProgress;

                var sourceFile = SourceFiles.First(sf => sf.Id == file.Id);
                sourceFile.Status = file.Status;
            }
        }

        //set status and exception messages for all files in the batch
        //and save the changes to database
        //in the application this is actually done with a bulk update and the method is async
        private void UpdateSourceFiles(IEnumerable<SourceFile> fileBatch)
        {
            foreach (var file in fileBatch)
            {
                var sourceFile = SourceFiles.First(sf => sf.Id == file.Id);
                sourceFile.Status = file.Status;
                sourceFile.ExceptionMessage = file.ExceptionMessage;
            }
        }

        private void ProcessFile(SourceFile file)
        {
            try
            {
                //do something ...
                Thread.Sleep(20);

                file.Status = Status.Success;
                file.ExceptionMessage = null;
            }
            catch (Exception ex)
            {
                file.Status = Status.Failed;
                file.ExceptionMessage = ex.Message;
            }
        }
    }

    public class SourceFile
    {
        public int Id { get; set; }

        public string Path { get; set; }

        public Status Status { get; set; }

        public string ExceptionMessage { get; set; }
    }

    public enum Status
    {
        Pending,

        InProgress,

        Success,

        Failed,
    }
}

【问题讨论】:

  • 1) 如果你有一个文件名来源,那么你可以有 n 个工作人员,每个工作人员从来源中获取一个文件名,处理它,然后重复直到没有更多文件名。 2) 根据处理文件所需的内容,您可能会通过调整所使用的任何磁盘 I/O 缓冲区的大小来获得更显着的改进。 3) 确保一次只处理一个文件时它不会以最快的速度运行。
  • 只需写一些关于你在做什么的 sudo “词”,就像一个流程.. step1.. step 2...很难判断你是否正在与 ProcessFile 中的数据库进行交互,因此很难说出瓶颈在哪里,就像很难说出入口点
  • @Seabizkit 我没有与 ProcessFile 中的数据库进行交互。
  • 可以提供minimal reproducible example吗?
  • @MariusStănescu - 您可以将 SelectMany 更改为 Select + Merge 以便能够指定并行度。

标签: c# performance system.reactive tpl-dataflow blockingcollection


【解决方案1】:

我知道你可能会讨厌这个答案,但最终,这取决于......

我不完全确定这些文件是什么、它们位于何处或处理它们意味着什么。我的回答假设您对当前的峰值处理感到满意,您只需要一种更好的方法来确保您在这里获得一致的性能并且它不会下降到操作的尾部。我会尽量坚持回答您更直接的问题,即使用带有 BlockingCollection 的生产者-消费者模式,而不是改变整个方法。

我认为您确实理解速度下降的原因,但您不确定如何处理此问题,因为您仅在当前批次完成时才获取下一批项目。 (不用说,这可能是使用消息队列而不是 SQL 的一个很好的例子,但这是一个有些单独的讨论,可以避免您的主要问题。)

这已经在以下问题上得到了相当详细的回答:

classic producer consumer pattern using blockingcollection and tasks .net 4 TPL

public class YourCode
{
  private BlockingCollection<object> queue = new BlockingCollection<object>();

  public YourCode()
  {
    var thread = new Thread(StartConsuming);
    thread.IsBackground = true;
    thread.Start();
  }

  public void Produce(object item)
  {
    queue.Add(item);
  }

  private void StartConsuming()
  {
    while (true)
    {
      object item = queue.Take();
      // Add your code to process the item here.
      // Do not start another task or thread. 
    }
  }
}

然后你可以有多个消费者和一个生产者(因为你确实指出你的生产速度比你消耗的快得多)

【讨论】:

    【解决方案2】:

    正如您提到的,此操作当然可以使用 TPL-Dataflow 完成,但很难知道您是否真的会看到任何吞吐量增益。对于任何性能指标,您能做的最好的事情就是尝试不同的方法并衡量结果。

    此示例包含用于调整数据流行为的最相关选项,以便您进行试验。该结构大致基于您的示例代码,并带有一些假设。

    • 一个SourcePath 产生一批SourceFile
    • 更新SourceFile 状态是异步的
    • 处理SourceFile 是同步的

    示例:

    public class ProcessFilesFlow
    {
        private TransformBlock<SourcePath, IEnumerable<SourceFile>> _getSourceFileBatch;
        private TransformBlock<IEnumerable<SourceFile>, IEnumerable<SourceFile>> _setStatusToProcessing;
        private TransformBlock<IEnumerable<SourceFile>, IEnumerable<SourceFile>> _processFiles;
        private ActionBlock<IEnumerable<SourceFile>> _setStatusToComplete;
    
        public ProcessFilesFlow()
        {
            //Setup options
            //All of these options and more can be tuned for throughput
            var getSourceFileBatchOptions = new ExecutionDataflowBlockOptions()
            {
                BoundedCapacity = 10, //How many source paths to queue at one time
                MaxDegreeOfParallelism = 10, //How many source paths to get batches for at one time
                EnsureOrdered = false //Process batches as soon as ready
            };
            var setStatusToProcessingOptions = new ExecutionDataflowBlockOptions()
            {
                BoundedCapacity = 10, //How many batches to queue at one time
                MaxDegreeOfParallelism = 10, //Unlimited, how many batches to updates status for
                EnsureOrdered = false //Process batches as soon as ready
            };
            var processFilesOptions = new ExecutionDataflowBlockOptions()
            {
                BoundedCapacity = 10, //Batches to queue at one time
                MaxDegreeOfParallelism = 10, //Batches to work on at the same time
                EnsureOrdered = false //Process batches as soon as ready
            };
            var setStatusToCompleteOptions = new ExecutionDataflowBlockOptions()
            {
                BoundedCapacity = 10, //Batches to queue at one time
                MaxDegreeOfParallelism = 10, //Batches to update at once
                EnsureOrdered = false //Process batches as soon as ready
            };
    
            //Build the dataflow pipeline
            _getSourceFileBatch = new TransformBlock<SourcePath, IEnumerable<SourceFile>>(path => GetSourceFileBatch(path), getSourceFileBatchOptions);
            _setStatusToProcessing = new TransformBlock<IEnumerable<SourceFile>, IEnumerable<SourceFile>>(batch => SetStatusToProcessingAsync(batch), setStatusToProcessingOptions);
            _processFiles = new TransformBlock<IEnumerable<SourceFile>, IEnumerable<SourceFile>>(batch => ProcessFiles(batch), processFilesOptions);
            _setStatusToComplete = new ActionBlock<IEnumerable<SourceFile>>(batch => SetStatusToCompleteAsync(batch), setStatusToCompleteOptions);
    
            //Link the pipeline
            _getSourceFileBatch.LinkTo(_setStatusToProcessing, new DataflowLinkOptions() { PropagateCompletion = true });
            _setStatusToProcessing.LinkTo(_processFiles, new DataflowLinkOptions() { PropagateCompletion = true });
            _processFiles.LinkTo(_setStatusToComplete, new DataflowLinkOptions() { PropagateCompletion = true });
        }
    
        public async Task ProcessAll(IEnumerable<SourcePath> sourcePaths)
        {
            foreach(var path in sourcePaths)
            {
                await _getSourceFileBatch.SendAsync(path);
            }
            _getSourceFileBatch.Complete();
            await _setStatusToComplete.Completion;
        }
    
        private IEnumerable<SourceFile> GetSourceFileBatch(SourcePath sourcePath)
        {
            //Get batch of files based on sourcePath
            return Enumerable.Empty<SourceFile>();
        }
    
        private async Task<IEnumerable<SourceFile>> SetStatusToProcessingAsync(IEnumerable<SourceFile> sourceFiles)
        {
            //Update file status
            foreach (var file in sourceFiles)
                await file.UpdateStatusAsync("In Progress");
            return sourceFiles;
        }
    
        private IEnumerable<SourceFile> ProcessFiles(IEnumerable<SourceFile> sourceFiles)
        {
            //process files
            foreach (var file in sourceFiles)
                file.Process();
            return sourceFiles;
        }
    
        private async Task SetStatusToCompleteAsync(IEnumerable<SourceFile> sourceFiles)
        {
            //Update file status
            foreach (var file in sourceFiles)
                await file.UpdateStatusAsync("Completed");
        }
    }
    

    还提供其他选项,例如使用TransformManyBlock 拆分批处理以及并行处理批处理中的单个文件。

    【讨论】:

    • 这些块应该做什么?在任何情况下,不要使用一次处理/发出整个 IEnumerable 的 TransformBlock 来处理批量文件,而是使用 TransformManyBlock 将每个项目发送到下一个块,从而允许多个块并行工作。否则,这段代码并不比一次调用每个函数更好
    【解决方案3】:

    工作者模式应该为您简化事情,并确保您始终并行处理一致数量的工作单元。

    例如,如果您预先创建 10 个任务,并允许它们执行新任务直到没有任务为止,您不再依赖于等待整批线程或任务全部完成后再开始。

    class WorkController
    {
        private DataSourceExportConfig _dataSourceExportConfig;
        private SourceFileService _sourceFileService;
        private string destinationBase;
    
        public async Task CopyPendingFilesAsync(SourcePath sourcePath, Options options)
        {
            await Task.WhenAll(Enumerable.Range(0, 10).Select(x => Worker(sourcePath, options)));
        }
    
        public async Task Worker(SourcePath sourcePath, Options options)
        {
            SourceFile file = null;
    
            while (_sourceFileService.GetNextFile(out file))
            {
                ProcessFile(file, destinationBase, options);
            }
        }
    
        private void ProcessFile(SourceFile file, string destinationBase, Options options)
        {
        }
    }
    

    【讨论】:

    • 这就是数据流块已经做的,增加了排队,背压支持
    【解决方案4】:

    这是磁盘操作。并行化在这些方面效果不佳。磁盘具有物理上有限的吞吐量。用请求轰炸它只会导致整个计算增加寻道时间。存在像 NCQ 这样的功能会尝试减轻这种影响,但它们有限制。

    对于网络,至少并行化可以产生一些影响:

    • 在一个请求处于“协议开销”阶段时使用媒介
    • 绕过可能存在的“每个连接”限制

    但即便如此,也有严格的限制。

    实现快速磁盘操作的最佳方法是不要使用糟糕的后端磁盘。即,不使用旋转磁盘。或者至少将它们组织成 Raid 0 或类似的结构。

    【讨论】:

    • 感谢您的回答。虽然它没有回答我的问题。想象一下,我不是在处理文件,而是在处理一些随机数据实体。 (实际上,文件在不同的网络共享上)
    • @MariusStănescu:网络共享不太可能有速率限制。所以限制是网络传输速率或磁盘 I/O,以较低者为准。无论哪种情况,这都不是编程问题。加载数据将是 8/10 情况下的核心限制因素。
    • 每个连接可能存在网络限制。例如,即使网络可能是 1Gbps,一个连接也可能被限制为 1Mbps。
    • @MariusStănescu:Windows 共享上的每个连接限制?不确定共享是否甚至具有该功能。或者甚至有超过 1 个连接/用户。当然,这不是我的专业领域。
    • 我们离题了。我对提高流程吞吐量的方法很感兴趣,只是为了学习如何做得更好。正如我之前所说,您可以想象我正在处理的事情是一个随机实体,它执行 CPU 密集型工作并且与 I/O 完全无关。所以,问题不在于 I/O 与 CPU。但我很感激你注意到了这一点,我非常感谢你的意见。也许我会就这个话题提出一个不同的问题,然后我们可以在那里进行讨论。 :)
    猜你喜欢
    • 1970-01-01
    • 2011-11-24
    • 1970-01-01
    • 2023-03-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多