【问题标题】:Async / await, TAP and EAP异步/等待、TAP 和 EAP
【发布时间】:2013-12-17 23:38:07
【问题描述】:

我正在尝试从有点混乱的异步套接字代码(BeginSend/EndSendBeginReceive/EndReceive 和大量内部“管理”)转移到异步TcpClient。我对 async/await 还很陌生,所以请多多包涵……

假设以下代码(不相关的代码被剥离):

public async void StartReceive()
{
    while (true)
    {
        var stream = this.MyInternalTcpClient.GetStream();
        if (stream == null) return;

        var buffer = new byte[BUFFERSIZE];
        var bytesread = await stream.ReadAsync(buffer, 0, BUFFERSIZE);
        if (bytesread == 0)
        {
            if (Closed != null)
                Closed(this, new ClosedEventArgs());
            return;
        }

        var message = this.Encoding.GetString(buffer, 0, bytesread);
        this.MyInternalStringBuilder.Append(message);
        // ... message processing here ...

        foreach (var p in parts) {
            //Raise event per message-"part"
            if (MessageReceived != null)
                MessageReceived(this, new MessageReceivedEventArgs(p));
        }
    }
}

我的班级有一个内部字符串生成器,每次接收到数据时都会附加到该字符串生成器(这是因为消息可以拆分为多个接收“事件”)。然后,当满足某些条件时,处理字符串生成器(“运行缓冲区”)并将消息拆分为消息“部分”。对于每个“部分”,都会引发一个事件。该类的许多实例可以在系统中运行。

我的问题是:

  1. 我是否正确假设/理解 MyInternalStringBuilder.Append 从未被称为“故障”?每次TcpListener 收到数据时,它都会“按顺序”添加到(内部)字符串构建器?我不需要使用锁?
  2. 由于这个StartReceive 方法使用内部(“无限”)循环并引发事件,我看不出创建StartReceive 方法async 的意义,但我必须(因为显然能够使用@987654332 @ 完全没有)。我知道我正在混合 TAP/EAP,但出于与此问题无关的原因,我必须这样做。但是,感觉“脏”,因为“async 不应该是void”是我迄今为止收集到的。也许有更好的方法来解决这个问题(除了一起迁移到 TAP)?

【问题讨论】:

  • 我建议从 channel9 观看 Async in ASP.NET,它还介绍了 C# 中使用 AMP、EAP 和 TAP 的异步的简史。可能对你有帮助
  • 我会(我一回到家,因为我这里没有音频),但我应该指出这段代码是作为(“系统”)服务运行的,而不是在 IIS 中并且什么都没有与 ASP.NET 相关(虽然我不确定视频是否专门针对此,但...)
  • 它专门针对 ASP.NET,但关于异步的一般概念是相同的,我相信它无论如何都会为您提供一些价值。

标签: c# async-await tcpclient


【解决方案1】:

是的,一切都会好起来的。因为您只发布了一个活动缓冲区*,所以您在发布下一个缓冲区之前处理它,事件顺序是确定性的(post->receive->process->post->receive...)。如您所述,如果您只在此循环中使用 SB,则无需锁定。

但是,对于MessageReceived 事件中发生的事情,显然存在很大的“如果”。假设你做了体面的事情(例如处理并发布回复),应该没问题。如果您尝试从事件中接收更多信息,所有地狱都会崩溃(例如,发送响应然后等待响应的响应,那将是糟糕的)。如果您的处理是事件驱动的状态机(状态“bar”中的“接收到的消息“foo”,以“垃圾邮件”响应并将状态更改为“bam”,返回循环等待更多事件),那么它通常应该没问题。显然,很难在没有代码的情况下做出判断,所有内容都是基于您的主张,并且根据我对您提出该主张的理解(没关系,您似乎知道您在说什么)的理解。

您描述的处理可能不是最快的(因为在您处理当前缓冲区时传入字节没有缓冲空间)但是实现高吞吐量会更加棘手,这正是因为您暗示的顺序问题.此外,如果流量是请求-响应,那真的没多大关系。

posted buffer: 向网络提供了一个缓冲区来填满它。在 .Net 中,流操作和 AFD/tcp.sys 之间大约有 100 万个抽象层,但概念的应用几乎相同。

【讨论】:

  • 流量确实是 request->response(->request->response... 起泡、冲洗、重复)。它应该是完全异步的;所以描述它的更好方法是 message->response(->message->response...) 或 message1->message2->message3->response 其中响应 可能 是对 message2 的响应但它何时发送并不重要,只要它在某个时候发送(最好是 A.S.A.P.)。我应该指出,这些“客户”中有几个(许多)可以在系统中“运行”;每个都有自己的“状态机”。
  • 如果发件人期望来自 message2 的响应,但在收到响应之前继续发送 message3,则不是请求响应。请求-响应是一种协议(而是消息交换模式),其中任何一方在“轮到”之前都不能发送(即发送方必须在发送 msg3 之前等待对 msg2 的响应)。因此,如果您允许 message3 来处理 您正在处理 message2 并且它正在制定/发送响应,那么理论上您 可以 获得更好的吞吐量。但除非必须,否则不要这样做。潘多拉的盒子。
  • 所有“发件人”所做的是“大喊”“嘿,你认识 Foo 吗?”、“嘿,你认识 Bar 吗?”、“嘿,看看 FooBar!”。有时“接收者”需要发送消息“酷,告诉 Foo 说你好!”。当“发件人”需要回应时,它会停止大喊大叫并平静地等待回应。所以,如果它发送一条消息“嘿,看看这个香蕉,让我知道你的想法”,它会等待并且服务器会“一段时间”回复“酷,它坏了,把它扔掉”在“发件人”之前哪个“发件人”将采取行动并继续大喊大叫。
  • 这是一种“聊天”系统,两端都有状态机,其中“发送者”报告事件,“接收者”可以选择告诉“发送者”有时对发生的事情采取行动(即使那是之前的一些消息)。
  • 显然这里是在黑暗中拍摄,因为都是基于香蕉广播的隐喻,但是您应该考虑为“喊叫”设置一个单独的通道(即不同的 TCP 管道,甚至是 UDP 广播或多播)。由于当一切都在一个管道上流动时发生的固有优先级反转,我在前世被类似的问题烧毁了......
【解决方案2】:

完成 Remus 的回答后,您可能希望确保 StartReceive 只能被调用一次,否则您将遇到严重问题并且需要锁定。

关于void return,我个人认为这是一种很好的情况,以StartBegin 开头的“顶级”异步方法,正确传达了这是一个火灾的意思-和-忘记方法。也就是说,如果你有这样的方法,你可能想重新设计一些东西。

为此,我将使用 TPL Dataflow 等库,其中不同的操作表示为异步块。在您的情况下,将有一个“从套接字块读取”→“进程块”→“发送响应块”,每个块都异步触发,允许您在处理时继续阅读。通过这样做,您将不会有这个大循环,并且void 返回将不再存在。不过很多事情都需要改变。

【讨论】:

  • +1 显然,如果并发调用试图运行循环,所有地狱都会崩溃。重新创建任何最初发送的消息是不可能的。我什至没有想到有人会尝试这样的事情:)
  • 它不在发布的代码中,但方法中有一个“if _running -> throw”-construct 可以防止多次“启动”它。
【解决方案3】:

我是否正确假设/理解 MyInternalStringBuilder.Append 永远不会被称为“乱序”?每次 TcpListener 接收到数据时,它都会“按顺序”添加到(内部)字符串构建器?我不需要使用锁?

没错。 async 代码虽然异步但自然是顺序的。所以,即使在幕后StartReceive 不断返回和恢复,它也不会同时恢复多次。

由于此 StartReceive 方法使用内部(“无限”)循环并引发事件,我看不出使 StartReceive 方法异步的意义,但我必须这样做(显然完全可以使用 await)。我知道我正在混合 TAP/EAP,但出于与此问题无关的原因,我必须这样做。然而,它感觉“脏”,因为“异步不应该是无效的”是我迄今为止收集到的。也许有更好的方法来解决这个问题(除了一起迁移到 TAP)?

我不是async void 的忠实粉丝。首先,我假设您在StartReceive 中有一个顶级try/catch,如果观察到任何异常,它将启动套接字关闭(如果没有,那么您需要一个)。就我个人而言,我会将其编写为 async Task 方法,并考虑将 Task 公开为属性(如果您还为错误处理设置了所有事件,那么您不需要 Task 属性)。

还有一件事需要注意; socket新手经常写的严格的只读/只写循环存在问题:在只读部分,阅读器无法检测到半开的情况;在只写部分,死锁的可能性很小,特别是对于恶意客户端。理想情况下,套接字应该始终进行读取操作(您会这样做),并且它还应该定期发送一些东西(例如,keepalives)。我有一个sockets FAQ,它更详细。

要解决此问题,我建议您将ConfigureAwait(false) 添加到所有awaits,并将您的EAP 事件发布到构造函数中捕获的SynchronizationContext(如果没有,则使用new SynchronizationContext())。然后,您将需要另一个独立的循环,它要么定期发送 keepalive 消息(如果协议允许),要么在一段时间内没有读取任何内容时终止套接字。

【讨论】:

  • "首先,我假设您在 StartReceive 中有一个顶级 try/catch,如果观察到任何异常,它将启动套接字关闭" -> 留待清晰度,但它是“围绕”/“包装”循环(例如try { internal-loop } catch {}。感谢常见问题解答和其他有趣的观察;该协议不支持“保持活力”(或“心跳”),这确实有点糟糕,我需要跟踪某种内部计时器/超时来终止“非活动”连接......
【解决方案4】:

如果你在多个线程之间共享 StringBuilder,你肯定需要使用锁。

另外,我不明白您所说的按顺序是什么意思。 Append 将按 顺序调用。它可能不是您想要的顺序,但是这段代码中没有任何东西可以确保任何特定的顺序。

我可以说,对 Appendinside 的调用将按照代码执行它们的顺序被调用,其行为应该与您对代码的期望完全一样,但同样,如果有多个线程同时调用这个,线程之间的顺序没有给出。

至于你的第二个问题,为什么不直接写整个东西而不使用async 关键字,而是在任务或线程中调用它?

【讨论】:

  • 这个想法是为每个“客户”实例化我的类的一个实例。 “按顺序”是指 TcpClient 接收数据的顺序(假设 TcpClient 处理无序的 TCP 数据包等,它确实如此)。至于在任务/线程中调用它:这听起来像是一个想法......我不会用完“等待”将更有效地处理不同等待的线程吗?
  • 是的,但您仍将拥有原样的代码。老实说,现在代码的结构方式没有任何问题,但我个人不喜欢无法等待它们完成或取消它们的即发即弃的方法。
  • 我可以通过关闭 TcpClient 来完成/取消,这将触发 if (bytesread == 0) -> return 这将“打破”无限循环。
猜你喜欢
  • 1970-01-01
  • 2017-04-17
  • 2018-11-29
  • 2020-12-05
  • 1970-01-01
  • 1970-01-01
  • 2023-03-12
  • 2016-07-07
  • 2016-03-25
相关资源
最近更新 更多