【发布时间】:2019-05-14 01:18:46
【问题描述】:
我注意到新的 System.IO.Pipelines 并尝试将现有的基于流的代码移植到它。流的问题很容易理解,但同时它具有相关类的丰富回声系统。
从这里提供的示例中,有一个小型 tcp echo 服务器。 https://blogs.msdn.microsoft.com/dotnet/2018/07/09/system-io-pipelines-high-performance-io-in-net/
这里附上一段sn-p的代码:
private static async Task ProcessLinesAsync(Socket socket)
{
Console.WriteLine($"[{socket.RemoteEndPoint}]: connected");
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(socket, pipe.Reader);
await Task.WhenAll(reading, writing);
Console.WriteLine($"[{socket.RemoteEndPoint}]: disconnected");
}
private static async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
try
{
// Request a minimum of 512 bytes from the PipeWriter
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read
writer.Advance(bytesRead);
}
catch
{
break;
}
// Make the data available to the PipeReader
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// Signal to the reader that we're done writing
writer.Complete();
}
private static async Task ReadPipeAsync(Socket socket, PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
SequencePosition? position = null;
do
{
// Find the EOL
position = buffer.PositionOf((byte)'\n');
if (position != null)
{
var line = buffer.Slice(0, position.Value);
ProcessLine(socket, line);
// This is equivalent to position + 1
var next = buffer.GetPosition(1, position.Value);
// Skip what we've already processed including \n
buffer = buffer.Slice(next);
}
}
while (position != null);
// We sliced the buffer until no more data could be processed
// Tell the PipeReader how much we consumed and how much we left to process
reader.AdvanceTo(buffer.Start, buffer.End);
if (result.IsCompleted)
{
break;
}
}
reader.Complete();
}
private static void ProcessLine(Socket socket, in ReadOnlySequence<byte> buffer)
{
if (_echo)
{
Console.Write($"[{socket.RemoteEndPoint}]: ");
foreach (var segment in buffer)
{
Console.Write(Encoding.UTF8.GetString(segment.Span));
}
Console.WriteLine();
}
}
使用流时,您可以轻松地将 SSL/TLS 添加到您的代码中,只需将其包装在 SslStream 中即可。 Pipelines 打算如何解决这个问题?
【问题讨论】:
-
乍一看,Kestrel 似乎使用流(包括 SslStream)来为其管道提供数据。 Mark Gravell's blog on Pipes 中大致介绍了这一点 - 请参阅“抽水管”部分和明确提到 TLS 的“这是我之前制作的”。
-
有趣,但考虑到支持 Pipelines 的论点对 Stream 有点负面,它不是违背了它的目的吗?
-
有几点需要注意。 1. System.IO.Pipelines 只是整体开发的第一步,因此没有 API 可用作 System.IO.Pipelines 的端点(例如直接接受数据和泵的 dotnetty、Kestrel 或 .NET 套接字它在那里)。虽然它们是为未来计划的,但 .NET Core 团队首先需要围绕它设计一个干净的 api,并找到时间来实现这些,
-
2.管道用于解决接收需要以最有效和易于使用的方式解析的分块数据的特定问题,此 api 的用户不必担心如何合并连续缓冲区或将块粘合在一起以及它背后的内存管理。 3. 允许后台进程处理来自网络的数据(即,一旦我们得到第一个字节就开始解析头部)