【问题标题】:Async file I/O overhead in C#C# 中的异步文件 I/O 开销
【发布时间】:2021-07-30 21:47:17
【问题描述】:

我遇到了一个问题,我必须处理大量大型 jsonl 文件(读取、反序列化、进行一些转换、数据库查找等,然后将转换后的结果写入 .net 核心控制台应用程序。

通过将输出分批放在单独的线程上,我获得了更好的吞吐量,并试图通过添加一些并行性来改善处理方面,但开销最终是自我挫败。

我一直在做:

using (var stream = new FileStream(_filePath, FileMode.Open))
using (var reader = new StreamReader(stream)
{
    for (;;)
    {
        var l = reader.ReadLine();
        if (l == null)
            break;
        // Deserialize
        // Do some database lookups
        // Do some transforms
        // Pass result to output thread
    }
}

并且一些诊断时间显示ReadLine() 调用所花费的时间超过了反序列化等。为了在上面加上一些数字,一个大文件大约有:

  • 在 ReadLine 上花费了 11 秒
  • 序列化花费 7.8 秒
  • 在数据库查找上花费了 10 秒

我想将 11 秒的文件 i/o 与其他工作重叠,所以我尝试了

using (var stream = new FileStream(_filePath, FileMode.Open))
using (var reader = new StreamReader(stream)
{
    var nextLine = reader.ReadLineAsync();
    for (;;)
    {
        var l = nextLine.Result;
        if (l == null)
            break;
        nextLine = reader.ReadLineAsync();
        // Deserialize
        // Do some database lookups
        // Do some transforms
        // Pass result to output thread
    }
}

在我进行转换的同时进行下一个 I/O。只是最终花费的时间比常规同步的时间长得多(比如两倍)。

我的要求是他们希望对整体结果具有可预测性(即,必须按名称顺序处理同一组文件,并且必须按相同顺序可预测输出行)所以我不能只是抛出每个线程一个文件,然后让他们解决。

我只是试图引入足够的并行性来平滑大量输入的吞吐量,我很惊讶上述结果会适得其反。

我错过了什么吗?

【问题讨论】:

  • 目前没有时间写完整的答案,但您需要使用两个不同的线程/循环和一个共享队列。基本上在一个循环中逐行读取到线程安全队列中,而另一个循环查找并处理结果。视频播放引擎执行此操作 - 在一个线程中从磁盘读取数据包,在另一个线程中解码,然后在第三个线程中呈现。这可能有点令人生畏,但我认为这是获得所需内容的唯一方法。哦,var l = nextLine.Result; 也阻塞了线程,所以这样做你真的没有得到并行性的好处。
  • 要扩展 @PeterMoore 提到的内容,您可以启动一个任务 (Task.Run) 来读取文件,将每一行放入共享的 ConcurrentQueue<string>,启动另一个任务并在排队
  • 谢谢大家...这正是我在输出切换中使用的方法。

标签: c# asynchronous parallel-processing


【解决方案1】:

内置异步文件系统 API are currently broken,建议您避免使用它们。它们不仅比同步对应物慢得多,而且它们甚至不是真正的异步。 .NET 6 将附带 improved FileStream 实现,因此几个月后这可能不再是问题。

您要实现的目标称为任务并行,即两个或多个异构操作同时运行且彼此独立。这是一项先进的技术,需要专门的工具。最常见的并行类型是所谓的数据并行,其中相同类型的操作在同质数据列表上并行运行,通常使用Parallel 类或 PLINQ 库来实现。

要实现任务并行,最容易获得的工具是 TPL Dataflow 库,它内置于 .NET Core / .NET 5 平台,如果您的目标是 .NET,则只需安装 a package。 NET 框架。该库允许您创建由称为“块”(TransformBlockActionBlockBatchBlock 等)的链接组件组成的管道,其中每个块充当具有自己的输入和输出队列的独立处理器。您向管道提供数据,数据在管道中从一个块到另一个块流动,同时在此过程中进行处理。 Complete 管道中的第一个块表示将不再有可用的输入数据,然后 await 最后一个块的Completion 让您的代码等待所有工作完成。这是一个例子:

private async void Button1_Click(object sender, EventArgs e)
{
    Button1.Enabled = false;
    var fileBlock = new TransformManyBlock<string, IList<string>>(filePath =>
    {
        return File.ReadLines(filePath).Buffer(10);
    });

    var deserializeBlock = new TransformBlock<IList<string>, MyObject[]>(lines =>
    {
        return lines.Select(line => Deserialize(line)).ToArray();
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 2 // Let's assume that Deserialize is parallelizable
    });

    var persistBlock = new TransformBlock<MyObject[], MyObject[]>(async objects =>
    {
        foreach (MyObject obj in objects) await PersistToDbAsync(obj);
        return objects;
    });

    var displayBlock = new ActionBlock<MyObject[]>(objects =>
    {
        foreach (MyObject obj in objects) TextBox1.AppendText($"{obj}\r\n");
    }, new ExecutionDataflowBlockOptions()
    {
        TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
        // Make sure that the delegate will be invoked on the UI thread
    });

    fileBlock.LinkTo(deserializeBlock,
        new DataflowLinkOptions { PropagateCompletion = true });
    deserializeBlock.LinkTo(persistBlock,
        new DataflowLinkOptions { PropagateCompletion = true });
    persistBlock.LinkTo(displayBlock,
        new DataflowLinkOptions { PropagateCompletion = true });

    foreach (var filePath in Directory.GetFiles(@"C:\Data"))
        await fileBlock.SendAsync(filePath);

    fileBlock.Complete();
    await displayBlock.Completion;
    MessageBox.Show("Done");
    Button1.Enabled = true;
}

通过管道传递的数据应该是大块的。如果每个工作单元都太轻量级,您应该将它们批处理成数组或列表,否则移动大量微小数据的开销将超过并行性的好处。这就是在上面的示例中使用Buffer LINQ 运算符(来自System.Interactive 包)的原因。 .NET 6 将附带一个新的Chunk LINQ 运算符,提供相同的功能。

【讨论】:

    【解决方案2】:

    Theodor 的建议看起来是一个非常强大且有用的库,值得一试,但如果您正在寻找一个较小的 DIY 解决方案,我会这样做:

    using System;
    using System.IO;
    using System.Threading.Tasks;
    using System.Collections.Generic;
    
    namespace Parallelism
    {
        class Program
        {
            private static Queue<string> _queue = new Queue<string>();
            private static Task _lastProcessTask;
            
            static async Task Main(string[] args)
            {
                string path = "???";
                await ReadAndProcessAsync(path);
            }
    
            private static async Task ReadAndProcessAsync(string path)
            {
                using (var str = File.OpenRead(path))
                using (var sr = new StreamReader(str))
                {
                    string line = null;
                    while (true)
                    {
                        line = await sr.ReadLineAsync();
                        if (line == null)
                            break;
    
                        lock (_queue)
                        {
                            _queue.Enqueue(line);
                            if (_queue.Count == 1)
                                // There was nothing in the queue before
                                // so initiate a new processing loop. Save 
                                // but DON'T await the Task yet.
                                _lastProcessTask = ProcessQueueAsync();
                        }
                    }                
                }
    
                // Now that file reading is completed, await 
                // _lastProcessTask to ensure we don't return
                // before it's finished.
                await _lastProcessTask;
            }
    
            // This will continue processing as long as lines are in the queue,
            // including new lines entering the queue while processing earlier ones.
            private static Task ProcessQueueAsync()
            {
                return Task.Run(async () =>
                {
                    while (true)
                    {
                        string line;
                        lock (_queue)
                        {              
                            // Only peak at first so the read loop doesn't think
                            // the queue is empty and initiate a second processing
                            // loop while we're processing this line.
                            if (!_queue.TryPeek(out line))
                                return;
                        }
                        await ProcessLineAsync(line);
                        lock (_queue)
                        {
                            // Dequeues the item we just processed. If it's the last
                            // one, this loop is done.
                            _queue.Dequeue();
                            if (_queue.Count == 0)
                                return;
                        }
                    }
                });
            }
    
            private static async Task ProcessLineAsync(string line)
            {
                // do something
            }
        }
    }
    

    请注意,这种方法有一个处理循环,当队列中没有任何内容时终止,并在新项目准备好时重新启动,如果需要。另一种方法是有一个连续的处理循环,在队列为空时反复重新检查并在一小段时间内执行Task.Delay()。我更喜欢我的方法,因为它不会因定期和不必要的检查而使工作线程陷入困境,但性能可能会有不明显的不同。

    也只是为了评论 Blindy 的回答,我不同意在这里不鼓励使用并行性。首先,如今大多数 CPU 都是多核的,因此巧妙地使用 .NET 线程池实际上会在多核 CPU 上运行时最大限度地提高您的应用程序的效率,并且在单核场景中的缺点非常小。

    但更重要的是,异步不等于多线程。异步编程早在多线程之前就已经存在,I/O 是最显着的例子。 I/O 操作在很大程度上由硬件处理而不是 CPU - NIC、SATA 控制器等。它们使用大多数编码人员称为硬件中断的古老概念今天可能从未听说过多线程,并且比多线程早了几十年。它基本上只是一种在非 CPU 操作完成时给 CPU 一个回调以执行的方法。因此,当您使用行为良好的异步 API 时(尽管 .NET FileStream 存在 Theodore 提到的问题),您的 CPU 根本不应该做那么多工作。而当你await 这样的 API 时,CPU 基本上处于空闲状态,直到机器中的其他硬件将请求的数据写入 RAM。

    我同意 Blindy 的观点,如果计算机科学程序能更好地教人们计算机硬件如何实际工作,那就更好了。用柯克船长的话来说,利用 CPU 可以在等待从磁盘、网络等读取数据的同时做其他事情这一事实,就是“军官思维”。

    【讨论】:

    • 天啊。这是我希望在库中看到的那种代码,而不是在应用程序代码中。它很脆弱,难以测试,并且在新需求到来时难以调整。如果处理的文件大小为数 GB,并且推送 Queue&lt;string&gt; 中的每一行导致内存不足异常,您会怎么做?或者,如果使用单个工作任务 (ProcessQueueAsync) 处理行太慢,您想再添加一两个工作人员来加快处理速度?
    • @theodorzoulias hah 我认为这是一种恭维。 ;) 但是关于队列大小,如果这是一个问题,你会在读取循环中检查你的队列大小,如果它变得太大,就会限制自己。至于第二个假设,您是对的,拥有多个并行工作者 - 特别是如果您想保持输出序列顺序 - 将超出 SO 答案的可理解范围。因此我说你的建议值得一试。
    • 我放学后的第一次专业演出是在 1980 年代的 VAX/VMS 系统上。就像你说的那样,他们有异步但只在 CPU 外的 io 操作上。
    • @user1664043 解释了很多 :) 希望我的回答能帮助您提高效率!
    【解决方案3】:

    在 ReadLine 上花费了 11 秒

    具体来说,更像是在文件 I/O 上花费了 11 秒,但您没有衡量这一点。

    用此替换您的流创建:

    using var reader = new StreamReader(_filePath, Encoding.UTF8, false, 50 * 1024 * 1024);
    

    这将使它读取到 50MB 的缓冲区(根据需要调整大小),以避免在看似古老的硬盘驱动器上重复 I/O。

    我只是想引入足够的并行性来平滑吞吐量

    您不仅根本没有引入任何并行性,而且您错误地使用了ReadLineAsync——它返回的是Task&lt;string&gt;,而不是string

    这完全是矫枉过正,增加缓冲区大小很可能会解决您的问题,但如果您想真正做到这一点,您需要两个线程通过共享数据结构进行通信,正如 Peter 所说。

    只是最终花费的时间比常规同步的东西要长得多

    让我感到困惑的是,人们认为多线程代码应该比单线程代码消耗更少的处理能力。当今的教育必须缺乏一些真正的基本理解才能导致这一点。多线程包括多个额外的上下文切换、互斥争用、您的操作系统调度程序开始替换您的一个线程(导致饥饿或过饱和)、工作完成后收集、序列化和聚合结果等。这些都不是免费的或容易的实施。

    【讨论】:

      猜你喜欢
      • 2011-11-25
      • 2012-05-16
      • 1970-01-01
      • 2013-06-19
      • 1970-01-01
      • 2011-08-05
      • 2021-01-15
      • 1970-01-01
      • 2011-03-14
      相关资源
      最近更新 更多