https://docs.microsoft.com/en-us/dotnet/standard/io/pipelines
该库的目标为适用于所有 .NET 实现的 .NET Standard。
System.IO.Pipelines 解决什么问题
样板代码和特殊情况代码很复杂且难以进行维护。
System.IO.Pipelines 已构建为:
- 具有高性能的流数据分析功能。
- 减少代码复杂性。
下面的代码是典型的 TCP 服务器,它从客户机接收行分隔的消息(由 '\n' 分隔):
async Task ProcessLinesAsync(NetworkStream stream) { var buffer = new byte[1024]; await stream.ReadAsync(buffer, 0, buffer.Length); // Process a single line from the buffer ProcessLine(buffer); }
前面的代码有几个问题:
- 单次调用
ReadAsync可能无法接收整条消息(行尾)。 stream.ReadAsync返回读取的数据量。- 它不能处理在单个
ReadAsync调用中读取多行的情况。 - 它为每次读取分配一个
byte数组。
要解决上述问题,需要进行以下更改:
-
缓冲传入的数据,直到找到新行。
-
分析缓冲区中返回的所有行。
-
此代码需要调整输入缓冲区的大小,直到找到分隔符后,才能在缓冲区内容纳完整行。
- 如果调整缓冲区的大小,当输入中出现较长的行时,将生成更多缓冲区副本。
- 压缩用于读取行的缓冲区,以减少空余。
-
请考虑使用缓冲池来避免重复分配内存。
-
下面的代码解决了其中一些问题:
async Task ProcessLinesAsync(NetworkStream stream) { byte[] buffer = ArrayPool<byte>.Shared.Rent(1024); var bytesBuffered = 0; var bytesConsumed = 0; while (true) { // Calculate the amount of bytes remaining in the buffer. var bytesRemaining = buffer.Length - bytesBuffered; if (bytesRemaining == 0) { // Double the buffer size and copy the previously buffered data into the new buffer. var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2); Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length); // Return the old buffer to the pool. ArrayPool<byte>.Shared.Return(buffer); buffer = newBuffer; bytesRemaining = buffer.Length - bytesBuffered; } var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining); if (bytesRead == 0) { // EOF break; } // Keep track of the amount of buffered bytes. bytesBuffered += bytesRead; var linePosition = -1; do { // Look for a EOL in the buffered data. linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed); if (linePosition >= 0) { // Calculate the length of the line based on the offset. var lineLength = linePosition - bytesConsumed; // Process the line. ProcessLine(buffer, bytesConsumed, lineLength); // Move the bytesConsumed to skip past the line consumed (including \n). bytesConsumed += lineLength + 1; } } while (linePosition >= 0); } }
System.IO.Pipelines 的设计目的是使编写此类代码更容易。
此 GitHub 讨论问题中告诉我们。
管道
写入 PipeWriter 的所有数据都可用于 PipeReader:
1 var pipe = new Pipe(); 2 PipeReader reader = pipe.Reader; 3 PipeWriter writer = pipe.Writer;
管道基本用法
1 async Task ProcessLinesAsync(Socket socket) 2 { 3 var pipe = new Pipe(); 4 Task writing = FillPipeAsync(socket, pipe.Writer); 5 Task reading = ReadPipeAsync(pipe.Reader); 6 7 await Task.WhenAll(reading, writing); 8 } 9 10 async Task FillPipeAsync(Socket socket, PipeWriter writer) 11 { 12 const int minimumBufferSize = 512; 13 14 while (true) 15 { 16 // Allocate at least 512 bytes from the PipeWriter. 17 Memory<byte> memory = writer.GetMemory(minimumBufferSize); 18 try 19 { 20 int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None); 21 if (bytesRead == 0) 22 { 23 break; 24 } 25 // Tell the PipeWriter how much was read from the Socket. 26 writer.Advance(bytesRead); 27 } 28 catch (Exception ex) 29 { 30 LogError(ex); 31 break; 32 } 33 34 // Make the data available to the PipeReader. 35 FlushResult result = await writer.FlushAsync(); 36 37 if (result.IsCompleted) 38 { 39 break; 40 } 41 } 42 43 // By completing PipeWriter, tell the PipeReader that there's no more data coming. 44 await writer.CompleteAsync(); 45 } 46 47 async Task ReadPipeAsync(PipeReader reader) 48 { 49 while (true) 50 { 51 ReadResult result = await reader.ReadAsync(); 52 ReadOnlySequence<byte> buffer = result.Buffer; 53 54 while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line)) 55 { 56 // Process the line. 57 ProcessLine(line); 58 } 59 60 // Tell the PipeReader how much of the buffer has been consumed. 61 reader.AdvanceTo(buffer.Start, buffer.End); 62 63 // Stop reading if there's no more data coming. 64 if (result.IsCompleted) 65 { 66 break; 67 } 68 } 69 70 // Mark the PipeReader as complete. 71 await reader.CompleteAsync(); 72 } 73 74 bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line) 75 { 76 // Look for a EOL in the buffer. 77 SequencePosition? position = buffer.PositionOf((byte)'\n'); 78 79 if (position == null) 80 { 81 line = default; 82 return false; 83 } 84 85 // Skip the line + the \n. 86 line = buffer.Slice(0, position.Value); 87 buffer = buffer.Slice(buffer.GetPosition(1, position.Value)); 88 return true; 89 }
有两个循环:
FillPipeAsync从Socket读取并写入PipeWriter。ReadPipeAsync从PipeReader读取并分析传入的行。
委派缓冲区管理使使用代码更容易集中关注业务逻辑。
在第一个循环中:
- PipeWriter.GetMemory(Int32) 从基础编写器获取内存。
- PipeWriter.Advance(Int32) 以告知
PipeWriter有多少数据已写入缓冲区。 - PipeWriter.FlushAsync 以使数据可用于
PipeReader。
对 PipeReader.ReadAsync 的调用:
-
ReadResult:
- 以
ReadOnlySequence<byte>形式读取的数据。 - 布尔值
IsCompleted,指示是否已到达数据结尾 (EOF)。
- 以
找到行尾 (EOL) 分隔符并分析该行后:
- 该逻辑处理缓冲区以跳过已处理的内容。
- 调用
PipeReader.AdvanceTo以告知PipeReader已消耗和检查了多少数据。
Complete 使基础管道释放其分配的内存。
反压和流量控制
理想情况下,读取和分析可协同工作:
- 写入线程使用来自网络的数据并将其放入缓冲区。
- 分析线程负责构造适当的数据结构。
通常,分析所花费的时间比仅从网络复制数据块所用时间更长:
- 读取线程领先于分析线程。
- 读取线程必须减缓或分配更多内存来存储用于分析线程的数据。
为了获得最佳性能,需要在频繁暂停和分配更多内存之间取得平衡。
为解决上述问题,Pipe 提供了两个设置来控制数据流:
- FlushAsync 暂停之前应缓冲多少数据。
- ResumeWriterThreshold:确定在恢复对
PipeWriter.FlushAsync的调用之前,读取器必须观察多少数据。
PipeWriter.FlushAsync:
- 当
Pipe中的数据量超过PauseWriterThreshold时,返回不完整的ValueTask<FlushResult>。 - 低于
ResumeWriterThreshold时,返回完整的ValueTask<FlushResult>。
使用两个值可防止快速循环,如果只使用一个值,则可能发生这种循环。
示例
1 // The Pipe will start returning incomplete tasks from FlushAsync until 2 // the reader examines at least 5 bytes. 3 var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5); 4 var pipe = new Pipe(options);
PipeScheduler
SynchronizationContext 上恢复。
默认情况下:
- SynchronizationContext。
- 如果没有
SynchronizationContext,它将使用线程池运行回调。
1 public static void Main(string[] args) 2 { 3 var writeScheduler = new SingleThreadPipeScheduler(); 4 var readScheduler = new SingleThreadPipeScheduler(); 5 6 // Tell the Pipe what schedulers to use and disable the SynchronizationContext. 7 var options = new PipeOptions(readerScheduler: readScheduler, 8 writerScheduler: writeScheduler, 9 useSynchronizationContext: false); 10 var pipe = new Pipe(options); 11 } 12 13 // This is a sample scheduler that async callbacks on a single dedicated thread. 14 public class SingleThreadPipeScheduler : PipeScheduler 15 { 16 private readonly BlockingCollection<(Action<object> Action, object State)> _queue = 17 new BlockingCollection<(Action<object> Action, object State)>(); 18 private readonly Thread _thread; 19 20 public SingleThreadPipeScheduler() 21 { 22 _thread = new Thread(DoWork); 23 _thread.Start(); 24 } 25 26 private void DoWork() 27 { 28 foreach (var item in _queue.GetConsumingEnumerable()) 29 { 30 item.Action(item.State); 31 } 32 } 33 34 public override void Schedule(Action<object> action, object state) 35 { 36 _queue.Add((action, state)); 37 } 38 }
PipeScheduler.Inline 可能会导致意外后果,如死锁。
管道重置
Reset。
PipeReader
调用 PipeReader.AdvanceTo 后,不能使用 ReadOnlySequence<byte>。
SequencePosition 参数:
- 第一个参数确定消耗的内存量。
- 第二个参数确定观察到的缓冲区数。
任何其他值都将使对 PipeReader.ReadAsync 的下一次调用立即返回并包含已观察到的和未观察到的数据,但不是已被使用的数据 。
读取流数据方案
尝试读取流数据时会出现以下几种典型模式:
- 给定数据流时,分析单条消息。
- 给定数据流时,分析所有可用消息。
TryParseMessage 不是 .NET 的一部分,它是在以下部分中使用的用户编写的方法。
1 bool TryParseMessage(ref ReadOnlySequence<byte> buffer, out Message message);
读取单条消息
下面的代码从 PipeReader 读取一条消息并将其返回给调用方。
1 async ValueTask<Message> ReadSingleMessageAsync(PipeReader reader, 2 CancellationToken cancellationToken = default) 3 { 4 while (true) 5 { 6 ReadResult result = await reader.ReadAsync(cancellationToken); 7 ReadOnlySequence<byte> buffer = result.Buffer; 8 9 // In the event that no message is parsed successfully, mark consumed 10 // as nothing and examined as the entire buffer. 11 SequencePosition consumed = buffer.Start; 12 SequencePosition examined = buffer.End; 13 14 try 15 { 16 if (TryParseMessage(ref buffer, out Message message)) 17 { 18 // A single message was successfully parsed so mark the start as the 19 // parsed buffer as consumed. TryParseMessage trims the buffer to 20 // point to the data after the message was parsed. 21 consumed = buffer.Start; 22 23 // Examined is marked the same as consumed here, so the next call 24 // to ReadSingleMessageAsync will process the next message if there's 25 // one. 26 examined = consumed; 27 28 return message; 29 } 30 31 // There's no more data to be processed. 32 if (result.IsCompleted) 33 { 34 if (buffer.Length > 0) 35 { 36 // The message is incomplete and there's no more data to process. 37 throw new InvalidDataException("Incomplete message."); 38 } 39 40 break; 41 } 42 } 43 finally 44 { 45 reader.AdvanceTo(consumed, examined); 46 } 47 } 48 49 return null; 50 }
前面的代码:
- 分析单条消息。
- 更新已使用的
SequencePosition并检查SequencePosition以指向已剪裁的输入缓冲区的开始。
通常,分析来自缓冲区的单条消息时,检查的位置应为以下位置之一:
- 消息的结尾。
- 如果未找到消息,则返回接收缓冲区的结尾。
PipeReader 常见问题部分。
读取多条消息
以下代码从 PipeReader 读取所有消息,并在每条消息上调用 ProcessMessageAsync。
1 async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default) 2 { 3 try 4 { 5 while (true) 6 { 7 ReadResult result = await reader.ReadAsync(cancellationToken); 8 ReadOnlySequence<byte> buffer = result.Buffer; 9 10 try 11 { 12 // Process all messages from the buffer, modifying the input buffer on each 13 // iteration. 14 while (TryParseMessage(ref buffer, out Message message)) 15 { 16 await ProcessMessageAsync(message); 17 } 18 19 // There's no more data to be processed. 20 if (result.IsCompleted) 21 { 22 if (buffer.Length > 0) 23 { 24 // The message is incomplete and there's no more data to process. 25 throw new InvalidDataException("Incomplete message."); 26 } 27 break; 28 } 29 } 30 finally 31 { 32 // Since all messages in the buffer are being processed, you can use the 33 // remaining buffer's Start and End position to determine consumed and examined. 34 reader.AdvanceTo(buffer.Start, buffer.End); 35 } 36 } 37 } 38 finally 39 { 40 await reader.CompleteAsync(); 41 } 42 }
取消
PipeReader.ReadAsync:
- CancellationToken。
- OperationCanceledException。
- 这对于以非破坏性和非异常的方式停止现有的读取循环非常有用。
1 private PipeReader reader; 2 3 public MyConnection(PipeReader reader) 4 { 5 this.reader = reader; 6 } 7 8 public void Abort() 9 { 10 // Cancel the pending read so the process loop ends without an exception. 11 reader.CancelPendingRead(); 12 } 13 14 public async Task ProcessMessagesAsync() 15 { 16 try 17 { 18 while (true) 19 { 20 ReadResult result = await reader.ReadAsync(); 21 ReadOnlySequence<byte> buffer = result.Buffer; 22 23 try 24 { 25 if (result.IsCanceled) 26 { 27 // The read was canceled. You can quit without reading the existing data. 28 break; 29 } 30 31 // Process all messages from the buffer, modifying the input buffer on each 32 // iteration. 33 while (TryParseMessage(ref buffer, out Message message)) 34 { 35 await ProcessMessageAsync(message); 36 } 37 38 // There's no more data to be processed. 39 if (result.IsCompleted) 40 { 41 break; 42 } 43 } 44 finally 45 { 46 // Since all messages in the buffer are being processed, you can use the 47 // remaining buffer's Start and End position to determine consumed and examined. 48 reader.AdvanceTo(buffer.Start, buffer.End); 49 } 50 } 51 } 52 finally 53 { 54 await reader.CompleteAsync(); 55 } 56 }
PipeReader 常见问题
-
将错误的值传递给
consumed或examined可能会导致读取已读取的数据。 -
传递
buffer.End作为检查对象可能会导致以下问题:- 数据停止
- 例如,当一次处理来自缓冲区的单条消息时,可能会出现
PipeReader.AdvanceTo(position, buffer.End)。
-
例如,如果
buffer.Start没有更改,则PipeReader.AdvanceTo(buffer.Start)将导致在下一个对PipeReader.ReadAsync的调用在新数据到来之前立即返回。 -
将错误的值传递给
consumed或examined可能会导致无限缓冲(最终导致 OOM)。 -
在调用
PipeReader.AdvanceTo之后使用ReadOnlySequence<byte>可能会导致内存损坏(在释放之后使用)。 -
未能调用
PipeReader.Complete/CompleteAsync可能会导致内存泄漏。 -
如果错误执行此操作,可能会导致无限循环。
有问题的代码
❌ 数据丢失
在退出读循环之前不读取该数据将导致数据丢失。
警告
PipeReader 常见问题。
1 Environment.FailFast("This code is terrible, don't use it!"); 2 while (true) 3 { 4 ReadResult result = await reader.ReadAsync(cancellationToken); 5 ReadOnlySequence<byte> dataLossBuffer = result.Buffer; 6 7 if (result.IsCompleted) 8 { 9 break; 10 } 11 12 Process(ref dataLossBuffer, out Message message); 13 14 reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End); 15 }
警告
PipeReader 常见问题。
❌ 无限循环
如果 Result.IsCompleted 是 true,则以下逻辑可能会导致无限循环,但缓冲区中永远不会有完整的消息。
警告
PipeReader 常见问题。
1 Environment.FailFast("This code is terrible, don't use it!"); 2 while (true) 3 { 4 ReadResult result = await reader.ReadAsync(cancellationToken); 5 ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer; 6 if (result.IsCompleted && infiniteLoopBuffer.IsEmpty) 7 { 8 break; 9 } 10 11 Process(ref infiniteLoopBuffer, out Message message); 12 13 reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End); 14 }
由于该代码位于 else if 中,如果缓冲区中没有完整的消息,它将永远循环。
警告
PipeReader 常见问题。
1 Environment.FailFast("This code is terrible, don't use it!"); 2 while (true) 3 { 4 ReadResult result = await reader.ReadAsync(cancellationToken); 5 ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer; 6 7 if (!infiniteLoopBuffer.IsEmpty) 8 { 9 Process(ref infiniteLoopBuffer, out Message message); 10 } 11 else if (result.IsCompleted) 12 { 13 break; 14 } 15 16 reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End); 17 }
❌ 意外挂起
对 PipeReader.AdvanceTo 的下次调用将在以下情况下返回:
- 有更多数据写入管道。
- 以及之前未检查过新数据。
警告
PipeReader 常见问题。
1 Environment.FailFast("This code is terrible, don't use it!"); 2 while (true) 3 { 4 ReadResult result = await reader.ReadAsync(cancellationToken); 5 ReadOnlySequence<byte> hangBuffer = result.Buffer; 6 7 Process(ref hangBuffer, out Message message); 8 9 if (result.IsCompleted) 10 { 11 break; 12 } 13 14 reader.AdvanceTo(hangBuffer.Start, hangBuffer.End); 15 16 if (message != null) 17 { 18 return message; 19 } 20 }
❌ 内存不足 (OOM)
OutOfMemoryException:
- 没有最大消息大小。
- 例如,它不会生成完整的消息,因为另一端正在编写一条大消息(例如,一条为 4GB 的消息)。
警告
PipeReader 常见问题。
1 Environment.FailFast("This code is terrible, don't use it!"); 2 while (true) 3 { 4 ReadResult result = await reader.ReadAsync(cancellationToken); 5 ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer; 6 7 Process(ref thisCouldOutOfMemory, out Message message); 8 9 if (result.IsCompleted) 10 { 11 break; 12 } 13 14 reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End); 15 16 if (message != null) 17 { 18 return message; 19 } 20 }
❌ 内存损坏
下面的示例将返回 Pipe 已丢弃的内存,并可能将其重新用于下一个操作(读/写)。
警告
PipeReader 常见问题。
1 public class Message 2 { 3 public ReadOnlySequence<byte> CorruptedPayload { get; set; } 4 }
1 Environment.FailFast("This code is terrible, don't use it!"); 2 Message message = null; 3 4 while (true) 5 { 6 ReadResult result = await reader.ReadAsync(cancellationToken); 7 ReadOnlySequence<byte> buffer = result.Buffer; 8 9 ReadHeader(ref buffer, out int length); 10 11 if (length <= buffer.Length) 12 { 13 message = new Message 14 { 15 // Slice the payload from the existing buffer 16 CorruptedPayload = buffer.Slice(0, length) 17 }; 18 19 buffer = buffer.Slice(length); 20 } 21 22 if (result.IsCompleted) 23 { 24 break; 25 } 26 27 reader.AdvanceTo(buffer.Start, buffer.End); 28 29 if (message != null) 30 { 31 // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer 32 // was captured. 33 break; 34 } 35 } 36 37 return message; 38 }
PipeWriter
IBufferWriter<byte> 使得无需额外的缓冲区副本就可以访问缓冲区来执行写入操作。
1 async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default) 2 { 3 // Request at least 5 bytes from the PipeWriter. 4 Memory<byte> memory = writer.GetMemory(5); 5 6 // Write directly into the buffer. 7 int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span); 8 9 // Tell the writer how many bytes were written. 10 writer.Advance(written); 11 12 await writer.FlushAsync(cancellationToken); 13 }
之前的代码:
- GetMemory 从
PipeWriter请求至少 5 个字节的缓冲区。 - 的字节写入返回的
Memory<byte>。 - Advance 以指示写入缓冲区的字节数。
- 刷新
PipeWriter,以便将字节发送到基础设备。
PipeWriter.WriteAsync:
- 将现有缓冲区复制到
PipeWriter。 - FlushAsync。
1 async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default) 2 { 3 byte[] helloBytes = Encoding.ASCII.GetBytes("Hello"); 4 5 // Write helloBytes to the writer, there's no need to call Advance here 6 // (Write does that). 7 await writer.WriteAsync(helloBytes, cancellationToken); 8 }
取消
这对于以非破坏性和非异常的方式停止暂停刷新非常有用。
PipeWriter 常见问题
- 请勿假设确切的缓冲区大小 。
- 无法保证连续的调用将返回相同的缓冲区或相同大小的缓冲区。
- 不能写入先前获得的缓冲区。
- 如果未完成对
FlushAsync的调用,则调用GetMemory或GetSpan将不安全。 - 如果未刷新数据,则调用
Complete或CompleteAsync可能导致内存损坏。
IDuplexPipe
例如,网络连接将由 IDuplexPipe 表示。
这意味着写入 PipeWriter 的内容不会从 PipeReader 中读取。
流
AsStream 返回围绕 PipeReader 或 PipeWriter 的 Stream 实现。
转载请标明本文来源:https://www.cnblogs.com/yswenli/p/11810317.html
更多内容欢迎Star、Fork我的的github:https://github.com/yswenli/
如果发现本文有什么问题和任何建议,也随时欢迎交流~