【问题标题】:Do I need to manually flush StreamWriter? [duplicate]我需要手动刷新 StreamWriter 吗? [复制]
【发布时间】:2021-04-15 16:27:14
【问题描述】:

我有以下场景。

我通过逐行读取巨大的 csv 文件来执行拆分功能。 每行都有categoryId。 基于该 ID,我需要将此行写入单独的文件。

为此,我正在执行以下操作:

  1. 逐行读取大文件。
  2. 阅读每一行后,我会根据 categoryId 打开一个新流(仅当流尚未打开时)。将行写入流,然后保持流打开,因为大文件中可能会有更多行。
  3. 最后,处理完大文件中的所有行后,我将关闭所有打开的流。这会强制刷新并关闭连接。

我的问题是。我是否需要手动调用 Flush() 让我们说 -> 每记录 100 行,或者这是由 StreamWriter 本身处理的东西。我在网上读到有一个缓冲区满时会自动刷新,但我不确定这是否属实。我担心的是,如果它不刷新并等待大文件结束,我可能最终会将整个文件加载到内存中。

这是代码的一部分,看看我在说什么:

try
        {
            while (!reader.EndOfStream)
            {
                var line = await reader.ReadLineAsync();
                var locationId = line.Split(',')[0];
                var gdProjectId = GetGDProjectId(locationId);

                var blobName = $"{gdProjectId}/{DateTime.UtcNow.ToString("dd-MM-yyyy")}/{DateTime.UtcNow.ToString("HH-mm-ss")}-{Guid.NewGuid()}.csv";

                if (!openWriters.ContainsKey(gdProjectId))
                {
                    var blockBlobClient = containerClient.GetBlockBlobClient(blobName);
                    var newWriteStream = await blockBlobClient.OpenWriteAsync(true);
                    openWriters.Add(gdProjectId, new StreamWriter(newWriteStream, Encoding.UTF8));
                }

                var writer = openWriters[gdProjectId];
                await writer.WriteLineAsync(line);

                // SHOULD I MANUALLY INVOKE FLUSH ON EVERY {X} lines processed ?
                // TODO: Check if we need to manually flush or the streamwriter does it for us when the buffer is full.
                // await writer.FlushAsync();
            }
        }
        catch (Exception ex)
        {

            throw;
        }
        finally
        {
            // we are always closing the writers no matter if the operation is successful or not.
            foreach (var oStream in openWriters)
            {
                oStream.Value.Close();
            }
        }

【问题讨论】:

  • 总是发布代码而不是图片。有时人们想复制一些代码并将其放在他们的答案中。
  • 在你写完之前你绝对不想调用 Flush()。这样做会导致缓冲区在充满之前被刷新,从而破坏了缓冲区的用途。在写入所有数据后显式调用 Flush() 是一种很好的做法,但如前所述,退出 using { } 块将隐式执行此操作。我更喜欢显式调用它,因为如果有异常写入底层流,诊断会更容易一些。
  • 大家好,我用代码而不是屏幕截图更新了我的示例。 @glenebob 我的问题是否以及何时由 streamWriter 调用自动刷新?我想可以说如果缓冲区是 1024 -> 填满后它会自动刷新并写入目标流,对吗?我担心的是不要在内存中加载太多数据并消耗机器的整个 RAM。你知道在自动刷新之前我可以在作者中容纳多少个字符吗?我将处于一种情况,我可以同时拥有很多打开的流,我不想消耗所有的内存。
  • “我不想消耗所有的内存” - 你无法控制它。即使您调用Flush(),即使StreamWriter.Flush() 方法显式刷新底层流,文件I/O 的层也比这多,例如操作系统缓存。更重要的是,这些缓冲区只有一些 K 大;它们太小了,不会对内存开销产生任何实质性影响,即使有,无论是否刷新缓冲区都存在。明确调用Flush() 的唯一原因是当您有一些特定 理由来确保数据已经...
  • ... 写入,例如您正在写入网络流并且不希望数据延迟,或者您正在写入日志文件并希望确保每一行已经写了以防进程崩溃,诸如此类。另请注意,@jdweng 的上述评论大多是错误的。没有计时器,并且您在关闭编写器时不需要调用Flush(),因为关闭/处置编写器将始终作为该操作的一部分自动刷新数据。

标签: c# .net stream


【解决方案1】:

Flush(在StreamWriter 实现中)只是将数据从缓冲区发送到底层流,然后在底层流上调用Flush,即(伪代码):

underlyingStream.Write(GetDataFromBuffer());
bufferPosition = 0; // "clears" buffer
underlyingStream.Flush();

缓冲区大小恒定。默认情况下,它大约是 2-4KiB。但可以在构造函数中手动设置更大的值。 Flush 不会更改缓冲区大小。因此,每 100 行调用一次Flush 将一无所获。

问:“我是否需要手动调用 Flush() 比如说 -> 每 100 行...”

没有。它不会为您节省任何内存。它只会更早地将数据写入底层流 - 即它不会等待缓冲区已满。
提示:如果属性 AutoFlush 设置为 true,Flush 将被自动调用在每个 WriteXYZ 方法调用之后。

问:“我担心的是,如果它不刷新并等待大文件结束,我最终可能会将整个文件加载到内存中。”

缓冲区大小是恒定的。拨打 Flush 也无济于事。

但是……

StreamWriter 的角度来看,一切都是真实的。
因为StreamWriter 只是引用了一些Stream 实例,所以如果不知道Stream 实例的具体实现(Streamabstract),就无法预测内存使用情况。

您应该创建新问题,例如:“我需要手动刷新 XyzStream”吗? (如果没有类似的问题已经发布)。

【讨论】:

  • 感谢您的详细解答!我正在使用 azure blob 存储流。我注意到即使在尝试将缓冲区大小更改为 4096 之后,当我检查缓冲区大小仍然是 4194304 时。我猜他们的流实现不允许我更改缓冲区。无论如何,感谢您更改我的问题的建议。我将首先尝试 google 并具体说明我正在使用 blob 存储流。如果找不到更多信息,我将在这里提出另一个问题。
  • 我在您的推荐后做了一些研究,发现 azure blob 流具有不同的最小值、最大值和缓冲区大小的默认值。默认值为 4MB,最大值为 4000MB,最小值为 1MB。这是我可以配置它的方法。 var newWriteStream = await blockBlobClient.OpenWriteAsync(true, new Azure.Storage.Blobs.Models.BlockBlobOpenWriteOptions { BufferSize = 1048576 });这回答了我的问题,为什么我以前不能强制缓冲区写入结束流 -> 因为我发送的数据少于 4MB(默认值):)
  • @nzhul:在网络流上调用 flush 通常会破坏性能。当您打开几个流时,内存使用没有问题。但正如我所见,您正在根据一些 ID 创建新的流。因此,当您一次创建 100 个流时可能会出现问题(400 MB 内存使用,不包括网络连接使用的资源)。
  • 那是我的演唱会。我最终可能会同时打开 100 多个流。我认为商业案例是这样的 -> 我们需要将一个大文件拆分为 400-500 个小文件,并且每个文件都应该包含基于我提到的这个 categoryId 的数据。
  • @nzhul:那么可能会改变策略,需要对输入文件进行 2 次遍历:1)扫描输入文件并保存以下索引列表:line_start_position 和 line_project_id。 2) 然后按 project_id 过滤列表,并开始将行保存到 azure blob,如:inputReader.SetPosition(line_start_position);字符串 csvLine = inputReader.ReadCsvLine();然后 azureStream.WriteLine(csvLine);此外,您应该使用 CsvHelper 库之类的东西,而不是使用 string.Split.
猜你喜欢
  • 1970-01-01
  • 2016-05-21
  • 2011-06-29
  • 1970-01-01
  • 2010-10-19
  • 2013-07-31
  • 1970-01-01
  • 1970-01-01
  • 2017-07-10
相关资源
最近更新 更多