【问题标题】:How to read continuously and asynchronously from network stream using abstract stream?如何使用抽象流从网络流中连续异步读取?
【发布时间】:2022-08-20 21:26:08
【问题描述】:

描述

我想使用它们的抽象 Stream 父类从 NetworkStreamSSLStream 异步读取。从流中异步读取有不同的方法:

  • 异步编程模型 (APM):它使用 BeginReadEndRead 操作。
  • 任务并行库 (TPL):它使用 Task 并创建任务延续。
  • 基于任务的异步模式 (TAP):操作带有后缀异步并且可以使用asyncawait 关键字。

我最感兴趣的是使用轻敲模式来实现异步读操作。下面的代码,异步读取到流的末尾并以字节数组的形式返回数据:

    internal async Task<byte[]> ReadToEndAsync(Stream stream)
    {
        byte[] buffer = new byte[4096];
        using (MemoryStream memoryStream = new MemoryStream())
        {
            int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
            while (bytesRead != 0)
            {
                // Received datas were aggregated to a memory stream.
                await memoryStream.WriteAsync(buffer, 0, bytesRead);
                bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
            }

            return memoryStream.ToArray();
        }
    }

缓冲区大小为 4096kB。如果传输的数据大于缓冲区大小,则它将继续读取直到 0(零),即流的末尾。它适用于FileStream,但它在使用NetworkStreamSslStreamReadAsync 操作中无限期挂起。这两个流的行为与其他流不同。

问题在于网络流ReadAsync 只会在Socket 通信关闭时返回0(零)。不过我愿意不是每次通过网络传输数据时都希望关闭通信。

问题

如何在不关闭Socket 通信的情况下避免ReadAsync 的阻塞调用?

  • 如果套接字打开但没有可用数据,您会期望表示与该套接字的连接的流的ReadAsync 无限期挂起,直到数据到达、套接字关闭或触发取消。使用... await ReadAsync(...) 只阻塞等待的任务,允许其他任务继续处理。
  • 没错。但我需要当前接收数据的结果。因此,我也等待 ReadToEndAsync 方法导致阻塞。
  • 考虑使用 TPL 数据流创建网格,docs.microsoft.com/en-us/dotnet/standard/parallel-programming/…。但是,您不能既等到结束又在结束前开始处理。您将需要具有中间缓冲的并发任务。
  • @Jodrell 感谢您指出 DataFlow 编程模型。事实证明,它可以与 async 和 await 运算符一起使用 docs.microsoft.com/en-us/dotnet/standard/parallel-programming/…
  • TPL 数据流类型专为使用 TAP 异步/等待而设计。它们可以使您免于编写一些有风险的管道。我的观点不是关于 Dataflow 的适用性,而是需要稍微调整一下你的观点。

标签: c# sockets asynchronous .net-standard-2.0 networkstream


【解决方案1】:

我想使用它们的抽象 Stream 父类从 NetworkStream 或 SSLStream 异步读取。

你真的需要至?正确执行 TCP/IP 套接字通信非常复杂。我强烈建议自托管 HTTP API 或类似的东西。

问题在于网络流 ReadAsync 只会在 Socket 通信关闭时返回 0(零)。

是的,您发布的代码仅在读取整个流后才返回结果。

因此,您需要的是不同的退货条件。具体来说,你需要知道什么时候单信息已从流中读取。您需要的是message framing,如我的博客中所述。

我的博客上还有一些sample code,展示了最简单的消息框架解决方案之一(长度前缀)。仔细注意文件的复杂性和长度最简单的解决方案,并考虑您是否真的要编写套接字级代码。

如果您确实想继续编写套接字应用程序,我建议您观看我的YouTube series on asynchronous TCP/IP socket programming

【讨论】:

  • 是的,我们从头开始实现了一个 TCP/IP 套接字,因此我们不能轻易切换到 HTTP。感谢您提供 youtube 视频链接,我希望我能早点看到这些视频。我一定会检查出来的。
【解决方案2】:

正如 Stephen Cleary 博客文章所指出的,message framing 有两种方法:长度前缀和分隔符。

  • 长度前缀: 消息长度已知,因为它是通过网络发送的。
  • 分隔符: 消息长度未知。转义由分隔符确定。

下面的代码使用长度前缀来为消息添加消息长度。

internal async Task SendAsync(Stream stream, byte[] message)
{
    byte[] messageSize = BitConverter.GetBytes(message.Length);
    await stream.WriteAsync(messageSize, 0, messageSize.Length).ConfigureAwait(false);
    await stream.WriteAsync(message, 0, message.Length).ConfigureAwait(false);
}

发送方首先将消息长度转换为字节数组,然后发送给服务器。 messageSize.Length 是 4,因为转换了带符号的 32 位整数。之后,客户端发送实际消息。接收端稍微复杂一点:

internal async Task<byte[]> ReceiveAsync(Stream stream)
{
    byte[] messageBuffer = null;
    byte[] lengthBuffer = new byte[4];

    try
    {
        await stream.ReadAsync(lengthBuffer, 0, lengthBuffer.Length).ConfigureAwait(false);
        int messageLength = BitConverter.ToInt32(lengthBuffer, 0);

        messageBuffer = new byte[messageLength];
        await ReadToEndAsync(stream, messageBuffer, messageLength).ConfigureAwait(false);
    }
    catch (Exception)
    {
        // Error occured during receiving.
        throw;
    }

    return dataBuffer;
}

首先,接收者将消息长度读取为长度为 4 的字节数组。然后将接收到的字节数组转换为有符号的Int32 值。一旦知道消息长度,就可以读取消息直到结束。

private async Task ReadToEndAsync(Stream stream, byte[] messageBuffer, int messageLength)
{
    int offset = 0;
    while (offset < messageLength)
    {
        int bytesRead = await stream.ReadAsync(messageBuffer, offset, messageLength).ConfigureAwait(false);
        if (bytesRead == 0)
        {
            // Socket is closed.
            break;
        }

        offset += bytesRead;
    }
}

它从流中读取消息,直到到达messageLength。如果单个读取操作还不够,那么它会根据从流中读取的字节数来提高缓冲区的偏移量。

注意考虑防止 DOS 攻击:

无论是使用长度前缀还是分隔符,都必须包含防止拒绝服务攻击的代码。长度前缀的阅读器可以被赋予巨大的消息大小;在没有分隔符的情况下,可以给定界读者提供大量数据。

【讨论】:

  • 上面的代码不能正确处理同时读取两条消息的情况。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-01-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多