https://docs.microsoft.com/en-us/dotnet/standard/io/pipelines

该库的目标为适用于所有 .NET 实现的 .NET Standard。

System.IO.Pipelines 解决什么问题

样板代码和特殊情况代码很复杂且难以进行维护。

System.IO.Pipelines 已构建为:

  • 具有高性能的流数据分析功能。
  • 减少代码复杂性。

下面的代码是典型的 TCP 服务器,它从客户机接收行分隔的消息(由 '\n' 分隔):

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}

前面的代码有几个问题:

  • 单次调用 ReadAsync 可能无法接收整条消息(行尾)。
  • stream.ReadAsync 返回读取的数据量。
  • 它不能处理在单个 ReadAsync 调用中读取多行的情况。
  • 它为每次读取分配一个 byte 数组。

要解决上述问题,需要进行以下更改:

  • 缓冲传入的数据,直到找到新行。

  • 分析缓冲区中返回的所有行。

  • 此代码需要调整输入缓冲区的大小,直到找到分隔符后,才能在缓冲区内容纳完整行。

    • 如果调整缓冲区的大小,当输入中出现较长的行时,将生成更多缓冲区副本。
    • 压缩用于读取行的缓冲区,以减少空余。
  • 请考虑使用缓冲池来避免重复分配内存。

  • 下面的代码解决了其中一些问题:

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for a EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

System.IO.Pipelines 的设计目的是使编写此类代码更容易。

此 GitHub 讨论问题中告诉我们。

管道

写入 PipeWriter 的所有数据都可用于 PipeReader

1 var pipe = new Pipe();
2 PipeReader reader = pipe.Reader;
3 PipeWriter writer = pipe.Writer;

 

管道基本用法

 1 async Task ProcessLinesAsync(Socket socket)
 2 {
 3     var pipe = new Pipe();
 4     Task writing = FillPipeAsync(socket, pipe.Writer);
 5     Task reading = ReadPipeAsync(pipe.Reader);
 6 
 7     await Task.WhenAll(reading, writing);
 8 }
 9 
10 async Task FillPipeAsync(Socket socket, PipeWriter writer)
11 {
12     const int minimumBufferSize = 512;
13 
14     while (true)
15     {
16         // Allocate at least 512 bytes from the PipeWriter.
17         Memory<byte> memory = writer.GetMemory(minimumBufferSize);
18         try
19         {
20             int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
21             if (bytesRead == 0)
22             {
23                 break;
24             }
25             // Tell the PipeWriter how much was read from the Socket.
26             writer.Advance(bytesRead);
27         }
28         catch (Exception ex)
29         {
30             LogError(ex);
31             break;
32         }
33 
34         // Make the data available to the PipeReader.
35         FlushResult result = await writer.FlushAsync();
36 
37         if (result.IsCompleted)
38         {
39             break;
40         }
41     }
42 
43      // By completing PipeWriter, tell the PipeReader that there's no more data coming.
44     await writer.CompleteAsync();
45 }
46 
47 async Task ReadPipeAsync(PipeReader reader)
48 {
49     while (true)
50     {
51         ReadResult result = await reader.ReadAsync();
52         ReadOnlySequence<byte> buffer = result.Buffer;
53 
54         while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
55         {
56             // Process the line.
57             ProcessLine(line);
58         }
59 
60         // Tell the PipeReader how much of the buffer has been consumed.
61         reader.AdvanceTo(buffer.Start, buffer.End);
62 
63         // Stop reading if there's no more data coming.
64         if (result.IsCompleted)
65         {
66             break;
67         }
68     }
69 
70     // Mark the PipeReader as complete.
71     await reader.CompleteAsync();
72 }
73 
74 bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
75 {
76     // Look for a EOL in the buffer.
77     SequencePosition? position = buffer.PositionOf((byte)'\n');
78 
79     if (position == null)
80     {
81         line = default;
82         return false;
83     }
84 
85     // Skip the line + the \n.
86     line = buffer.Slice(0, position.Value);
87     buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
88     return true;
89 }

有两个循环:

  • FillPipeAsync 从 Socket 读取并写入 PipeWriter
  • ReadPipeAsync 从 PipeReader 读取并分析传入的行。

委派缓冲区管理使使用代码更容易集中关注业务逻辑。

在第一个循环中:

  • PipeWriter.GetMemory(Int32) 从基础编写器获取内存。
  • PipeWriter.Advance(Int32) 以告知 PipeWriter 有多少数据已写入缓冲区。
  • PipeWriter.FlushAsync 以使数据可用于 PipeReader

对 PipeReader.ReadAsync 的调用:

  • ReadResult:

    • 以 ReadOnlySequence<byte> 形式读取的数据。
    • 布尔值 IsCompleted,指示是否已到达数据结尾 (EOF)。

找到行尾 (EOL) 分隔符并分析该行后:

  • 该逻辑处理缓冲区以跳过已处理的内容。
  • 调用 PipeReader.AdvanceTo 以告知 PipeReader 已消耗和检查了多少数据。

Complete 使基础管道释放其分配的内存。

反压和流量控制

理想情况下,读取和分析可协同工作:

  • 写入线程使用来自网络的数据并将其放入缓冲区。
  • 分析线程负责构造适当的数据结构。

通常,分析所花费的时间比仅从网络复制数据块所用时间更长:

  • 读取线程领先于分析线程。
  • 读取线程必须减缓或分配更多内存来存储用于分析线程的数据。

为了获得最佳性能,需要在频繁暂停和分配更多内存之间取得平衡。

为解决上述问题,Pipe 提供了两个设置来控制数据流:

  • FlushAsync 暂停之前应缓冲多少数据。
  • ResumeWriterThreshold:确定在恢复对 PipeWriter.FlushAsync 的调用之前,读取器必须观察多少数据。

System.IO.Pipelines——高性能IO(一)

PipeWriter.FlushAsync:

  • 当 Pipe 中的数据量超过 PauseWriterThreshold 时,返回不完整的 ValueTask<FlushResult>
  • 低于 ResumeWriterThreshold 时,返回完整的 ValueTask<FlushResult>

使用两个值可防止快速循环,如果只使用一个值,则可能发生这种循环。

示例

1 // The Pipe will start returning incomplete tasks from FlushAsync until
2 // the reader examines at least 5 bytes.
3 var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
4 var pipe = new Pipe(options);

PipeScheduler

SynchronizationContext 上恢复。

默认情况下:

  • SynchronizationContext。
  • 如果没有 SynchronizationContext,它将使用线程池运行回调。
 1 public static void Main(string[] args)
 2 {
 3     var writeScheduler = new SingleThreadPipeScheduler();
 4     var readScheduler = new SingleThreadPipeScheduler();
 5 
 6     // Tell the Pipe what schedulers to use and disable the SynchronizationContext.
 7     var options = new PipeOptions(readerScheduler: readScheduler,
 8                                   writerScheduler: writeScheduler,
 9                                   useSynchronizationContext: false);
10     var pipe = new Pipe(options);
11 }
12 
13 // This is a sample scheduler that async callbacks on a single dedicated thread.
14 public class SingleThreadPipeScheduler : PipeScheduler
15 {
16     private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
17      new BlockingCollection<(Action<object> Action, object State)>();
18     private readonly Thread _thread;
19 
20     public SingleThreadPipeScheduler()
21     {
22         _thread = new Thread(DoWork);
23         _thread.Start();
24     }
25 
26     private void DoWork()
27     {
28         foreach (var item in _queue.GetConsumingEnumerable())
29         {
30             item.Action(item.State);
31         }
32     }
33 
34     public override void Schedule(Action<object> action, object state)
35     {
36         _queue.Add((action, state));
37     }
38 }

PipeScheduler.Inline 可能会导致意外后果,如死锁。

管道重置

Reset。

PipeReader

调用 PipeReader.AdvanceTo 后,不能使用 ReadOnlySequence<byte>

SequencePosition 参数:

  • 第一个参数确定消耗的内存量。
  • 第二个参数确定观察到的缓冲区数。

任何其他值都将使对 PipeReader.ReadAsync 的下一次调用立即返回并包含已观察到的和未观察到的数据,但不是已被使用的数据 。

读取流数据方案

尝试读取流数据时会出现以下几种典型模式:

  • 给定数据流时,分析单条消息。
  • 给定数据流时,分析所有可用消息。

TryParseMessage 不是 .NET 的一部分,它是在以下部分中使用的用户编写的方法。

1 bool TryParseMessage(ref ReadOnlySequence<byte> buffer, out Message message);

读取单条消息

下面的代码从 PipeReader 读取一条消息并将其返回给调用方。

 1 async ValueTask<Message> ReadSingleMessageAsync(PipeReader reader,
 2  CancellationToken cancellationToken = default)
 3 {
 4     while (true)
 5     {
 6         ReadResult result = await reader.ReadAsync(cancellationToken);
 7         ReadOnlySequence<byte> buffer = result.Buffer;
 8 
 9         // In the event that no message is parsed successfully, mark consumed
10         // as nothing and examined as the entire buffer.
11         SequencePosition consumed = buffer.Start;
12         SequencePosition examined = buffer.End;
13 
14         try
15         {
16             if (TryParseMessage(ref buffer, out Message message))
17             {
18                 // A single message was successfully parsed so mark the start as the
19                 // parsed buffer as consumed. TryParseMessage trims the buffer to
20                 // point to the data after the message was parsed.
21                 consumed = buffer.Start;
22 
23                 // Examined is marked the same as consumed here, so the next call
24                 // to ReadSingleMessageAsync will process the next message if there's
25                 // one.
26                 examined = consumed;
27 
28                 return message;
29             }
30 
31             // There's no more data to be processed.
32             if (result.IsCompleted)
33             {
34                 if (buffer.Length > 0)
35                 {
36                     // The message is incomplete and there's no more data to process.
37                     throw new InvalidDataException("Incomplete message.");
38                 }
39 
40                 break;
41             }
42         }
43         finally
44         {
45             reader.AdvanceTo(consumed, examined);
46         }
47     }
48 
49     return null;
50 }

前面的代码:

  • 分析单条消息。
  • 更新已使用的 SequencePosition 并检查 SequencePosition 以指向已剪裁的输入缓冲区的开始。

通常,分析来自缓冲区的单条消息时,检查的位置应为以下位置之一:

  • 消息的结尾。
  • 如果未找到消息,则返回接收缓冲区的结尾。

PipeReader 常见问题部分。

读取多条消息

以下代码从 PipeReader 读取所有消息,并在每条消息上调用 ProcessMessageAsync

 1 async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
 2 {
 3     try
 4     {
 5         while (true)
 6         {
 7             ReadResult result = await reader.ReadAsync(cancellationToken);
 8             ReadOnlySequence<byte> buffer = result.Buffer;
 9 
10             try
11             {
12                 // Process all messages from the buffer, modifying the input buffer on each
13                 // iteration.
14                 while (TryParseMessage(ref buffer, out Message message))
15                 {
16                     await ProcessMessageAsync(message);
17                 }
18 
19                 // There's no more data to be processed.
20                 if (result.IsCompleted)
21                 {
22                     if (buffer.Length > 0)
23                     {
24                         // The message is incomplete and there's no more data to process.
25                         throw new InvalidDataException("Incomplete message.");
26                     }
27                     break;
28                 }
29             }
30             finally
31             {
32                 // Since all messages in the buffer are being processed, you can use the
33                 // remaining buffer's Start and End position to determine consumed and examined.
34                 reader.AdvanceTo(buffer.Start, buffer.End);
35             }
36         }
37     }
38     finally
39     {
40         await reader.CompleteAsync();
41     }
42 }

取消

PipeReader.ReadAsync

  • CancellationToken。
  • OperationCanceledException。
  • 这对于以非破坏性和非异常的方式停止现有的读取循环非常有用。
 1 private PipeReader reader;
 2 
 3 public MyConnection(PipeReader reader)
 4 {
 5     this.reader = reader;
 6 }
 7 
 8 public void Abort()
 9 {
10     // Cancel the pending read so the process loop ends without an exception.
11     reader.CancelPendingRead();
12 }
13 
14 public async Task ProcessMessagesAsync()
15 {
16     try
17     {
18         while (true)
19         {
20             ReadResult result = await reader.ReadAsync();
21             ReadOnlySequence<byte> buffer = result.Buffer;
22 
23             try
24             {
25                 if (result.IsCanceled)
26                 {
27                     // The read was canceled. You can quit without reading the existing data.
28                     break;
29                 }
30 
31                 // Process all messages from the buffer, modifying the input buffer on each
32                 // iteration.
33                 while (TryParseMessage(ref buffer, out Message message))
34                 {
35                     await ProcessMessageAsync(message);
36                 }
37 
38                 // There's no more data to be processed.
39                 if (result.IsCompleted)
40                 {
41                     break;
42                 }
43             }
44             finally
45             {
46                 // Since all messages in the buffer are being processed, you can use the
47                 // remaining buffer's Start and End position to determine consumed and examined.
48                 reader.AdvanceTo(buffer.Start, buffer.End);
49             }
50         }
51     }
52     finally
53     {
54         await reader.CompleteAsync();
55     }
56 }

PipeReader 常见问题

  • 将错误的值传递给 consumed 或 examined 可能会导致读取已读取的数据。

  • 传递 buffer.End 作为检查对象可能会导致以下问题:

    • 数据停止
    • 例如,当一次处理来自缓冲区的单条消息时,可能会出现 PipeReader.AdvanceTo(position, buffer.End)
  • 例如,如果 buffer.Start 没有更改,则 PipeReader.AdvanceTo(buffer.Start) 将导致在下一个对 PipeReader.ReadAsync 的调用在新数据到来之前立即返回。

  • 将错误的值传递给 consumed 或 examined 可能会导致无限缓冲(最终导致 OOM)。

  • 在调用 PipeReader.AdvanceTo 之后使用 ReadOnlySequence<byte> 可能会导致内存损坏(在释放之后使用)。

  • 未能调用 PipeReader.Complete/CompleteAsync 可能会导致内存泄漏。

  • 如果错误执行此操作,可能会导致无限循环。

有问题的代码

❌ 数据丢失

在退出读循环之前不读取该数据将导致数据丢失。

 警告

PipeReader 常见问题。

 1 Environment.FailFast("This code is terrible, don't use it!");
 2 while (true)
 3 {
 4     ReadResult result = await reader.ReadAsync(cancellationToken);
 5     ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
 6 
 7     if (result.IsCompleted)
 8     {
 9         break;
10     }
11 
12     Process(ref dataLossBuffer, out Message message);
13 
14     reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
15 }

 警告

PipeReader 常见问题。

❌ 无限循环

如果 Result.IsCompleted 是 true,则以下逻辑可能会导致无限循环,但缓冲区中永远不会有完整的消息。

 警告

PipeReader 常见问题。

 1 Environment.FailFast("This code is terrible, don't use it!");
 2 while (true)
 3 {
 4     ReadResult result = await reader.ReadAsync(cancellationToken);
 5     ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
 6     if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
 7     {
 8         break;
 9     }
10 
11     Process(ref infiniteLoopBuffer, out Message message);
12 
13     reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
14 }

由于该代码位于 else if 中,如果缓冲区中没有完整的消息,它将永远循环。

 警告

PipeReader 常见问题。

 1 Environment.FailFast("This code is terrible, don't use it!");
 2 while (true)
 3 {
 4     ReadResult result = await reader.ReadAsync(cancellationToken);
 5     ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
 6 
 7     if (!infiniteLoopBuffer.IsEmpty)
 8     {
 9         Process(ref infiniteLoopBuffer, out Message message);
10     }
11     else if (result.IsCompleted)
12     {
13         break;
14     }
15 
16     reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
17 }

❌ 意外挂起

对 PipeReader.AdvanceTo 的下次调用将在以下情况下返回:

  • 有更多数据写入管道。
  • 以及之前未检查过新数据。

 警告

PipeReader 常见问题。

 1 Environment.FailFast("This code is terrible, don't use it!");
 2 while (true)
 3 {
 4     ReadResult result = await reader.ReadAsync(cancellationToken);
 5     ReadOnlySequence<byte> hangBuffer = result.Buffer;
 6 
 7     Process(ref hangBuffer, out Message message);
 8 
 9     if (result.IsCompleted)
10     {
11         break;
12     }
13 
14     reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
15 
16     if (message != null)
17     {
18         return message;
19     }
20 }

❌ 内存不足 (OOM)

OutOfMemoryException:

  • 没有最大消息大小。
  • 例如,它不会生成完整的消息,因为另一端正在编写一条大消息(例如,一条为 4GB 的消息)。

 警告

PipeReader 常见问题。

 1 Environment.FailFast("This code is terrible, don't use it!");
 2 while (true)
 3 {
 4     ReadResult result = await reader.ReadAsync(cancellationToken);
 5     ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
 6 
 7     Process(ref thisCouldOutOfMemory, out Message message);
 8 
 9     if (result.IsCompleted)
10     {
11         break;
12     }
13 
14     reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
15 
16     if (message != null)
17     {
18         return message;
19     }
20 }

❌ 内存损坏

下面的示例将返回 Pipe 已丢弃的内存,并可能将其重新用于下一个操作(读/写)。

 警告

PipeReader 常见问题。

1 public class Message
2 {
3     public ReadOnlySequence<byte> CorruptedPayload { get; set; }
4 }
 1 Environment.FailFast("This code is terrible, don't use it!");
 2     Message message = null;
 3 
 4     while (true)
 5     {
 6         ReadResult result = await reader.ReadAsync(cancellationToken);
 7         ReadOnlySequence<byte> buffer = result.Buffer;
 8 
 9         ReadHeader(ref buffer, out int length);
10 
11         if (length <= buffer.Length)
12         {
13             message = new Message
14             {
15                 // Slice the payload from the existing buffer
16                 CorruptedPayload = buffer.Slice(0, length)
17             };
18 
19             buffer = buffer.Slice(length);
20         }
21 
22         if (result.IsCompleted)
23         {
24             break;
25         }
26 
27         reader.AdvanceTo(buffer.Start, buffer.End);
28 
29         if (message != null)
30         {
31             // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
32             // was captured.
33             break;
34         }
35     }
36 
37     return message;
38 }

PipeWriter

IBufferWriter<byte> 使得无需额外的缓冲区副本就可以访问缓冲区来执行写入操作。

 1 async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
 2 {
 3     // Request at least 5 bytes from the PipeWriter.
 4     Memory<byte> memory = writer.GetMemory(5);
 5 
 6     // Write directly into the buffer.
 7     int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
 8 
 9     // Tell the writer how many bytes were written.
10     writer.Advance(written);
11 
12     await writer.FlushAsync(cancellationToken);
13 }

之前的代码:

  • GetMemory 从 PipeWriter 请求至少 5 个字节的缓冲区。
  •  的字节写入返回的 Memory<byte>
  • Advance 以指示写入缓冲区的字节数。
  • 刷新 PipeWriter,以便将字节发送到基础设备。

PipeWriter.WriteAsync:

  • 将现有缓冲区复制到 PipeWriter
  • FlushAsync。
1 async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
2 {
3     byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
4 
5     // Write helloBytes to the writer, there's no need to call Advance here
6     // (Write does that).
7     await writer.WriteAsync(helloBytes, cancellationToken);
8 }

取消

这对于以非破坏性和非异常的方式停止暂停刷新非常有用。

PipeWriter 常见问题

  • 请勿假设确切的缓冲区大小 。
  • 无法保证连续的调用将返回相同的缓冲区或相同大小的缓冲区。
  • 不能写入先前获得的缓冲区。
  • 如果未完成对 FlushAsync 的调用,则调用 GetMemory 或 GetSpan 将不安全。
  • 如果未刷新数据,则调用 Complete 或 CompleteAsync 可能导致内存损坏。

IDuplexPipe

例如,网络连接将由 IDuplexPipe 表示。

这意味着写入 PipeWriter 的内容不会从 PipeReader 中读取。

AsStream 返回围绕 PipeReader 或 PipeWriter 的 Stream 实现。

 

System.IO.Pipelines——高性能IO(一) 

System.IO.Pipelines——高性能IO(二)   

System.IO.Pipelines——高性能IO(三)  

 


转载请标明本文来源:https://www.cnblogs.com/yswenli/p/11810317.html
更多内容欢迎Star、Fork我的的github:https://github.com/yswenli/
如果发现本文有什么问题和任何建议,也随时欢迎交流~

 

 

相关文章:

  • 2021-09-23
  • 2021-09-01
  • 2022-01-10
猜你喜欢
  • 2021-07-08
  • 2021-07-15
  • 2022-12-23
  • 2022-12-23
  • 2021-10-08
  • 2022-12-23
相关资源
相似解决方案