【发布时间】:2021-07-30 21:47:17
【问题描述】:
我遇到了一个问题,我必须处理大量大型 jsonl 文件(读取、反序列化、进行一些转换、数据库查找等,然后将转换后的结果写入 .net 核心控制台应用程序。
通过将输出分批放在单独的线程上,我获得了更好的吞吐量,并试图通过添加一些并行性来改善处理方面,但开销最终是自我挫败。
我一直在做:
using (var stream = new FileStream(_filePath, FileMode.Open))
using (var reader = new StreamReader(stream)
{
for (;;)
{
var l = reader.ReadLine();
if (l == null)
break;
// Deserialize
// Do some database lookups
// Do some transforms
// Pass result to output thread
}
}
并且一些诊断时间显示ReadLine() 调用所花费的时间超过了反序列化等。为了在上面加上一些数字,一个大文件大约有:
- 在 ReadLine 上花费了 11 秒
- 序列化花费 7.8 秒
- 在数据库查找上花费了 10 秒
我想将 11 秒的文件 i/o 与其他工作重叠,所以我尝试了
using (var stream = new FileStream(_filePath, FileMode.Open))
using (var reader = new StreamReader(stream)
{
var nextLine = reader.ReadLineAsync();
for (;;)
{
var l = nextLine.Result;
if (l == null)
break;
nextLine = reader.ReadLineAsync();
// Deserialize
// Do some database lookups
// Do some transforms
// Pass result to output thread
}
}
在我进行转换的同时进行下一个 I/O。只是最终花费的时间比常规同步的时间长得多(比如两倍)。
我的要求是他们希望对整体结果具有可预测性(即,必须按名称顺序处理同一组文件,并且必须按相同顺序可预测输出行)所以我不能只是抛出每个线程一个文件,然后让他们解决。
我只是试图引入足够的并行性来平滑大量输入的吞吐量,我很惊讶上述结果会适得其反。
我错过了什么吗?
【问题讨论】:
-
目前没有时间写完整的答案,但您需要使用两个不同的线程/循环和一个共享队列。基本上在一个循环中逐行读取到线程安全队列中,而另一个循环查找并处理结果。视频播放引擎执行此操作 - 在一个线程中从磁盘读取数据包,在另一个线程中解码,然后在第三个线程中呈现。这可能有点令人生畏,但我认为这是获得所需内容的唯一方法。哦,
var l = nextLine.Result;也阻塞了线程,所以这样做你真的没有得到并行性的好处。 -
要扩展 @PeterMoore 提到的内容,您可以启动一个任务 (
Task.Run) 来读取文件,将每一行放入共享的ConcurrentQueue<string>,启动另一个任务并在排队 -
谢谢大家...这正是我在输出切换中使用的方法。
标签: c# asynchronous parallel-processing