【发布时间】:2018-10-30 11:04:13
【问题描述】:
我想以最大吞吐量处理一些文件。文件的路径保存在数据库中。我需要从数据库中获取文件路径,将它们的状态更改为正在处理,处理它们,然后将它们的状态更改为已完成或失败。
目前,我分批(100 个文件)获取文件,以减少完成的查询数量并并行处理它们(并行度为 10)。但是通过这种方式,我在批次结束时失去了吞吐量。当批处理中剩余的文件少于 10 个时,并行度不再是 10,它会降低。
这是我所拥有的:
private async Task CopyPendingFilesAsync(SourcePath sourcePath, Options options)
{
var batchIndex = 0;
while (true)
{
var fileBatch = _sourceFileService.GetSourceFileBatchBySourcePathId(
sourcePath.Id, _dataSourceExportConfig.FileCopyBatchSize, Status.Pending);
if (fileBatch.Count == 0)
return;
await SetInProgressStatusForBatch(fileBatch)
.ConfigureAwait(false);
fileBatch
.AsParallel()
.WithDegreeOfParallelism(_dataSourceExportConfig.FileCopyDegreeOfParallelism)
.ForAll(file => ProcessFile(file, destinationBase, options));
await _sourceFileService
.UpdateSourceFilesStatusAsync(fileBatch)
.ConfigureAwait(false);
batchIndex++;
}
}
private async Task SetInProgressStatusForBatch(IEnumerable<SourceFile> fileBatch)
{
foreach (var file in fileBatch)
file.Status = Status.InProgress;
await _sourceFileService
.UpdateSourceFilesStatusAsync(fileBatch)
.ConfigureAwait(false);
}
private void ProcessFile(
SourceFile file,
string destinationBase,
Options options)
{
try
{
//do something ...
file.Status = Status.Success;
file.ExceptionMessage = null;
}
catch (Exception ex)
{
_logger.Error(ex);
file.Status = Status.Failed;
file.ExceptionMessage = ex.Message;
}
}
如何最大限度地提高吞吐量?我阅读了有关 BlockingCollection、TPL 数据流和 Rx 的生产者-消费者模式,我很确定我想要实现的目标可以通过上述任何方法实现,但到目前为止我还不能做到。使用生产者-消费者模式,与消费者相比,我的生产者速度非常快,使用 TPL 数据流时,我被 BatchBlock 卡住了,我还没有尝试过 Rx。有人可以指出我正确的方向吗?
更新: 这是一个最小、完整且可验证的示例:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
namespace ConsoleApp1
{
internal static class Program
{
private static void Main()
{
Console.WriteLine("Processing files");
var stopWatch = new Stopwatch();
stopWatch.Start();
var fileService = new FileService();
fileService.ProcessPendingFiles();
foreach (var sourceFile in fileService.SourceFiles)
{
Console.WriteLine($"{sourceFile.Id} {sourceFile.Status}");
}
Console.WriteLine(stopWatch.Elapsed);
Console.ReadLine();
}
}
public class FileService
{
private const int BatchSize = 100;
private const int DegreeOfParallelism = 10;
//this SourceFiles property replaces the Sqlite database where the data is actually stored
public ICollection<SourceFile> SourceFiles =
Enumerable
.Range(0, 1000)
.Select(i =>
new SourceFile
{
Id = i,
Path = "source file path",
Status = Status.Pending,
})
.ToList();
public void ProcessPendingFiles()
{
while (true)
{
var fileBatch = GetSourceFileBatch(BatchSize, Status.Pending);
if (fileBatch.Count == 0)
return;
SetInProgressStatusForBatch(fileBatch);
fileBatch
.AsParallel()
.WithDegreeOfParallelism(DegreeOfParallelism)
.ForAll(ProcessFile);
UpdateSourceFiles(fileBatch);
}
}
private ICollection<SourceFile> GetSourceFileBatch(int batchSize, Status status)
=> SourceFiles
.Where(sf => sf.Status == status)
.Take(batchSize)
.ToList();
//set status to in progress for all files in the batch
//and save the changes to database
//in the application this is actually done with a bulk update and the method is async
private void SetInProgressStatusForBatch(IEnumerable<SourceFile> fileBatch)
{
foreach (var file in fileBatch)
{
file.Status = Status.InProgress;
var sourceFile = SourceFiles.First(sf => sf.Id == file.Id);
sourceFile.Status = file.Status;
}
}
//set status and exception messages for all files in the batch
//and save the changes to database
//in the application this is actually done with a bulk update and the method is async
private void UpdateSourceFiles(IEnumerable<SourceFile> fileBatch)
{
foreach (var file in fileBatch)
{
var sourceFile = SourceFiles.First(sf => sf.Id == file.Id);
sourceFile.Status = file.Status;
sourceFile.ExceptionMessage = file.ExceptionMessage;
}
}
private void ProcessFile(SourceFile file)
{
try
{
//do something ...
Thread.Sleep(20);
file.Status = Status.Success;
file.ExceptionMessage = null;
}
catch (Exception ex)
{
file.Status = Status.Failed;
file.ExceptionMessage = ex.Message;
}
}
}
public class SourceFile
{
public int Id { get; set; }
public string Path { get; set; }
public Status Status { get; set; }
public string ExceptionMessage { get; set; }
}
public enum Status
{
Pending,
InProgress,
Success,
Failed,
}
}
【问题讨论】:
-
1) 如果你有一个文件名来源,那么你可以有 n 个工作人员,每个工作人员从来源中获取一个文件名,处理它,然后重复直到没有更多文件名。 2) 根据处理文件所需的内容,您可能会通过调整所使用的任何磁盘 I/O 缓冲区的大小来获得更显着的改进。 3) 确保一次只处理一个文件时它不会以最快的速度运行。
-
只需写一些关于你在做什么的 sudo “词”,就像一个流程.. step1.. step 2...很难判断你是否正在与 ProcessFile 中的数据库进行交互,因此很难说出瓶颈在哪里,就像很难说出入口点
-
@Seabizkit 我没有与 ProcessFile 中的数据库进行交互。
-
@MariusStănescu - 您可以将
SelectMany更改为Select+Merge以便能够指定并行度。
标签: c# performance system.reactive tpl-dataflow blockingcollection