该库的目标为适用于所有 .NET 实现的 .NET Standard。
System.IO.Pipelines 解决什么问题
System.IO.Pipelines 已构建为:
- 具有高性能的流数据分析功能。
- 减少代码复杂性。
下面的代码是典型的 TCP 服务器,它从客户机接收行分隔的消息(由 '\n' 分隔):
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
前面的代码有几个问题:
- 单次调用
ReadAsync可能无法接收整条消息(行尾)。 stream.ReadAsync返回读取的数据量。- 它不能处理在单个
ReadAsync调用中读取多行的情况。 - 它为每次读取分配一个
byte数组。
要解决上述问题,需要进行以下更改:
-
缓冲传入的数据,直到找到新行。
-
分析缓冲区中返回的所有行。
-
找到需要调整输入缓冲区大小的代码(一行完整的代码)。
- 如果调整缓冲区的大小,当输入中出现较长的行时,将生成更多缓冲区副本。
- 压缩用于读取行的缓冲区,以减少空余。
-
请考虑使用缓冲池来避免重复分配内存。
下面的代码解决了其中一些问题:
1 async Task ProcessLinesAsync(NetworkStream stream) 2 { 3 byte[] buffer = ArrayPool<byte>.Shared.Rent(1024); 4 var bytesBuffered = 0; 5 var bytesConsumed = 0; 6 7 while (true) 8 { 9 // Calculate the amount of bytes remaining in the buffer. 10 var bytesRemaining = buffer.Length - bytesBuffered; 11 12 if (bytesRemaining == 0) 13 { 14 // Double the buffer size and copy the previously buffered data into the new buffer. 15 var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2); 16 Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length); 17 // Return the old buffer to the pool. 18 ArrayPool<byte>.Shared.Return(buffer); 19 buffer = newBuffer; 20 bytesRemaining = buffer.Length - bytesBuffered; 21 } 22 23 var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining); 24 if (bytesRead == 0) 25 { 26 // EOF 27 break; 28 } 29 30 // Keep track of the amount of buffered bytes. 31 bytesBuffered += bytesRead; 32 var linePosition = -1; 33 34 do 35 { 36 // Look for a EOL in the buffered data. 37 linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, 38 bytesBuffered - bytesConsumed); 39 40 if (linePosition >= 0) 41 { 42 // Calculate the length of the line based on the offset. 43 var lineLength = linePosition - bytesConsumed; 44 45 // Process the line. 46 ProcessLine(buffer, bytesConsumed, lineLength); 47 48 // Move the bytesConsumed to skip past the line consumed (including \n). 49 bytesConsumed += lineLength + 1; 50 } 51 } 52 while (linePosition >= 0); 53 } 54 }