该库的目标为适用于所有 .NET 实现的 .NET Standard。

System.IO.Pipelines 解决什么问题

  System.IO.Pipelines 已构建为:

  • 具有高性能的流数据分析功能。
  • 减少代码复杂性。

下面的代码是典型的 TCP 服务器,它从客户机接收行分隔的消息(由 '\n' 分隔):

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);
    
    // Process a single line from the buffer
    ProcessLine(buffer);
}

前面的代码有几个问题:

  • 单次调用 ReadAsync 可能无法接收整条消息(行尾)。
  • stream.ReadAsync 返回读取的数据量。
  • 它不能处理在单个 ReadAsync 调用中读取多行的情况。
  • 它为每次读取分配一个 byte 数组。

要解决上述问题,需要进行以下更改:

  • 缓冲传入的数据,直到找到新行。

  • 分析缓冲区中返回的所有行。

  • 找到需要调整输入缓冲区大小的代码(一行完整的代码)。

    • 如果调整缓冲区的大小,当输入中出现较长的行时,将生成更多缓冲区副本。
    • 压缩用于读取行的缓冲区,以减少空余。
  • 请考虑使用缓冲池来避免重复分配内存。

 

下面的代码解决了其中一些问题:

 

 1 async Task ProcessLinesAsync(NetworkStream stream)
 2 {
 3     byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
 4     var bytesBuffered = 0;
 5     var bytesConsumed = 0;
 6 
 7     while (true)
 8     {
 9         // Calculate the amount of bytes remaining in the buffer.
10         var bytesRemaining = buffer.Length - bytesBuffered;
11 
12         if (bytesRemaining == 0)
13         {
14             // Double the buffer size and copy the previously buffered data into the new buffer.
15             var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
16             Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
17             // Return the old buffer to the pool.
18             ArrayPool<byte>.Shared.Return(buffer);
19             buffer = newBuffer;
20             bytesRemaining = buffer.Length - bytesBuffered;
21         }
22 
23         var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
24         if (bytesRead == 0)
25         {
26             // EOF
27             break;
28         }
29 
30         // Keep track of the amount of buffered bytes.
31         bytesBuffered += bytesRead;
32         var linePosition = -1;
33 
34         do
35         {
36             // Look for a EOL in the buffered data.
37             linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
38                                          bytesBuffered - bytesConsumed);
39 
40             if (linePosition >= 0)
41             {
42                 // Calculate the length of the line based on the offset.
43                 var lineLength = linePosition - bytesConsumed;
44 
45                 // Process the line.
46                 ProcessLine(buffer, bytesConsumed, lineLength);
47 
48                 // Move the bytesConsumed to skip past the line consumed (including \n).
49                 bytesConsumed += lineLength + 1;
50             }
51         }
52         while (linePosition >= 0);
53     }
54 }
View Code

相关文章:

  • 2022-12-23
  • 2021-04-07
  • 2021-07-14
  • 2021-04-22
  • 2022-01-05
  • 2021-07-25
  • 2022-12-23
猜你喜欢
  • 2021-07-08
  • 2021-07-15
  • 2021-05-15
  • 2022-12-23
  • 2021-10-08
  • 2021-09-05
相关资源
相似解决方案