【问题标题】:How to optimize reading a list of files and storing them in a database?如何优化读取文件列表并将它们存储在数据库中?
【发布时间】:2021-10-03 23:51:38
【问题描述】:

我最近在一次采访中被问到一个问题,这真的让我思考。

我正在努力了解和了解有关多线程、并行性和并发性以及性能的更多信息。

场景是您有一个文件路径列表。文件保存在您的 HDD 或 Blob 存储上。 您已阅读文件并将它们存储在数据库中。你会如何以最优化的方式做到这一点?

以下是我能想到的一些方法:

最简单的方法是遍历列表并按顺序执行此任务。

Foreach(var filePath in filePaths)
{
  ProcessFile(filePath);
}

public void ProcessFile(string filePath)
{
  var file = readFile(filePath);
  storeInDb(file);
}

我能想到的第二种方法可能是创建多个线程:

Foreach(var filePath in filePaths)
{
Thread t  = new Thread(ProcessFIle(filePath));
t.Start();
}

(not sure if the above code is correct.)

第三种方式是使用异步等待

List<Tasks> listOfTasks;
Foreach(var filePath in filePaths)
{
  var task = ProcessFile(filePath);
  listOfTasks.Add(task);
}
Task.WhenAll(listOftasks);

public async void ProcessFile(string filePath)
{
  var file = readFile(filePath);
  storeInDb(file);
}

第四种方式是并行。对于:

Parallel.For(0,filePaths.Count , new ParallelOptions { MaxDegreeOfParallelism = 10 }, i =>
    {
        ProcessFile(filePaths[i]);
    });

它们之间有什么区别。哪一个更适合这份工作,还有什么更好的吗?

【问题讨论】:

  • 您只是将文件本身移动到数据库中,还是解析文件的内容,例如 .csv 并将内容发送到数据库。
  • @DekuDesu 是的,您正在解析文件的内容。
  • 你可以看看这个问题:Parallel.ForEach vs Task.Run and Task.WhenAll。它可能会直接回答您的问题。顺便说一句,您在问题中提到的选项都不是最佳选项。仅使用数据并行性不会获得最佳性能。您还需要任务并行性。有一个例子here
  • 您的第三种方式,“异步等待”方法的编码非常糟糕。它实际上并没有使用await,而async 应该是async void,而它应该是async Task。最好运行自己的 Task.Run 调用,以确保它被推送到后台任务。

标签: c# .net multithreading asp.net-core .net-core


【解决方案1】:

您还可以使用 Microsoft 的反应式框架(又名 Rx)- NuGet System.Reactive 并添加 using System.Reactive.Linq; - 然后您可以这样做:

IObservable<string> query =
    from filePath in filePaths.ToObservable()
    from file in Observable.Start(() => ReadFile(filePath))
    from db in Observable.Start(() => StoreInDb(file))
    select filePath;

IDisposable subscription =
    query
        .Subscribe(
            filePath => Console.WriteLine($"{filePath} Processed."),
            () => Console.WriteLine("Done."));

【讨论】:

  • 问题更多的是性能优化。
  • @SamuraiJack - 这确实可以处理。这一切都是用多个线程完成的,它会自动平衡线程池的使用。
【解决方案2】:

我写了一个简单的扩展方法来帮助启动异步任务,限制并发量,并等待它们全部完成;

public static async Task WhenAll(this IEnumerable<Task> tasks, int batchSize)
{
    var started = new List<Task>();

    foreach(var t in tasks)
    {
        started.Add(t);
        if (started.Count >= batchSize)
        {
            var ended = await Task.WhenAny(started);
            started.Remove(ended);
        }
    }
    await Task.WhenAll(started);
}

然后您需要一种将文件内容直接流式传输到数据库的方法。例如;

async Task Process(string filename){
    using var stream = File.OpenRead(filename)

    // TODO connect to the database
    var sqlCommand = ...;
    sqlCommand.CommandText = "update [table] set [column] = @stream";
    sqlCommand.Parameters.Add(new SqlParameter("@stream", SqlDbType.VarBinary)
    {
        Value = stream
    });
    await sqlCommand.ExecuteNonQueryAsync();
}
IEnumerable<string> files = ...;
await files.Select(f => Process(f)).WhenAll(20);

这是最好的方法吗?可能不是。因为这个扩展太容易被滥用了。多次意外启动任务,或一次启动所有任务。

【讨论】:

  • 为此,IEnumerable&lt;Task&gt; tasks 不应是物化集合。您可以添加一些参数验证代码,例如if (tasks is ICollection&lt;Task&gt;) throw...,但总体而言,由于您提到的原因,这不是一个好的解决方案。最后的Task.WhenAll还有一个bug,只会等待最后的batchSize任务,之前的任务抛出的任何异常都会被吞掉。一般来说,用于限制的正确工具是SemaphoreSlim,而不是Task.WhenAny
猜你喜欢
  • 1970-01-01
  • 2015-09-10
  • 1970-01-01
  • 2017-04-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-06-09
相关资源
最近更新 更多