【问题标题】:What is the async/await equivalent of a ThreadPool server?什么是 ThreadPool 服务器的 async/await 等价物?
【发布时间】:2014-01-27 14:55:38
【问题描述】:

我正在使用同步 api 和线程池开发一个看起来像这样的 tcp 服务器:

TcpListener listener;
void Serve(){
  while(true){
    var client = listener.AcceptTcpClient();
    ThreadPool.QueueUserWorkItem(this.HandleConnection, client);
    //Or alternatively new Thread(HandleConnection).Start(client)
  }
}

假设我的目标是以最低的资源使用率处理尽可能多的并发连接,这似乎很快就会受到可用线程数量的限制。我怀疑通过使用非阻塞任务 api,我将能够用更少的资源处理更多的事情。

我的第一印象是这样的:

async Task Serve(){
  while(true){
    var client = await listener.AcceptTcpClientAsync();
    HandleConnectionAsync(client); //fire and forget?
  }
}

但令我震惊的是,这可能会导致瓶颈。也许 HandleConnectionAsync 将花费异常长的时间来达到第一个等待,并将停止主接受循环继续进行。这会永远只使用一个线程,还是运行时会神奇地在多个线程上运行它认为合适的东西?

有没有办法将这两种方法结合起来,以便我的服务器准确地使用它所需的线程数来处理正在运行的任务的数量,同时又不会在 IO 操作上不必要地阻塞线程?

在这种情况下,有没有一种惯用的方法来最大化吞吐量?

【问题讨论】:

  • async / await 在当前线程 afaik 下工作。如果您想将它们安排在任务池中,则需要显式使用任务。也就是说,你当前的实现是纯单线程的。
  • 这可能会给你所有的答案:Awaiting Socket Operations.
  • @Noseratio,这回答了一个完全不同的问题,但我不明白它如何解决我的问题。那篇文章讨论了公开不同异步模式的等待 api,就好像它们公开了基于任务的 api。 TcpListener 确实提供了基于任务的 api。
  • @captncraig,诚然,我最初不明白这个问题。更好理解,这里是my thoughs

标签: c# threadpool async-await tcplistener


【解决方案1】:

我会让框架管理线程并且不会创建任何额外的线程,除非分析测试表明我可能需要这样做。特别是如果HandleConnectionAsync 内部的调用主要是 IO 绑定的。

不管怎样,如果你想在HandleConnectionAsync开头释放调用线程(dispatcher),有一个非常简单的解决方案。 您可以使用await Yield()ThreadPool 跳转到一个新线程。 如果您的服务器在初始线程上没有安装任何同步上下文的执行环境中运行(控制台应用程序, WCF 服务),通常是 TCP 服务器的情况。

以下说明了这一点(代码最初来自here)。请注意,while 主循环不会显式创建任何线程:

using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

class Program
{
    object _lock = new Object(); // sync lock 
    List<Task> _connections = new List<Task>(); // pending connections

    // The core server task
    private async Task StartListener()
    {
        var tcpListener = TcpListener.Create(8000);
        tcpListener.Start();
        while (true)
        {
            var tcpClient = await tcpListener.AcceptTcpClientAsync();
            Console.WriteLine("[Server] Client has connected");
            var task = StartHandleConnectionAsync(tcpClient);
            // if already faulted, re-throw any error on the calling context
            if (task.IsFaulted)
                await task;
        }
    }

    // Register and handle the connection
    private async Task StartHandleConnectionAsync(TcpClient tcpClient)
    {
        // start the new connection task
        var connectionTask = HandleConnectionAsync(tcpClient);

        // add it to the list of pending task 
        lock (_lock)
            _connections.Add(connectionTask);

        // catch all errors of HandleConnectionAsync
        try
        {
            await connectionTask;
            // we may be on another thread after "await"
        }
        catch (Exception ex)
        {
            // log the error
            Console.WriteLine(ex.ToString());
        }
        finally
        {
            // remove pending task
            lock (_lock)
                _connections.Remove(connectionTask);
        }
    }

    // Handle new connection
    private async Task HandleConnectionAsync(TcpClient tcpClient)
    {
        await Task.Yield();
        // continue asynchronously on another threads

        using (var networkStream = tcpClient.GetStream())
        {
            var buffer = new byte[4096];
            Console.WriteLine("[Server] Reading from client");
            var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length);
            var request = Encoding.UTF8.GetString(buffer, 0, byteCount);
            Console.WriteLine("[Server] Client wrote {0}", request);
            var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server");
            await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length);
            Console.WriteLine("[Server] Response has been written");
        }
    }

    // The entry point of the console app
    static async Task Main(string[] args)
    {
        Console.WriteLine("Hit Ctrl-C to exit.");
        await new Program().StartListener();
    }
}

或者,代码可能如下所示,没有await Task.Yield()。请注意,我将async lambda 传递给Task.Run,因为我仍然希望HandleConnectionAsync 中的异步API 中受益并在其中使用await

// Handle new connection
private static Task HandleConnectionAsync(TcpClient tcpClient)
{
    return Task.Run(async () =>
    {
        using (var networkStream = tcpClient.GetStream())
        {
            var buffer = new byte[4096];
            Console.WriteLine("[Server] Reading from client");
            var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length);
            var request = Encoding.UTF8.GetString(buffer, 0, byteCount);
            Console.WriteLine("[Server] Client wrote {0}", request);
            var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server");
            await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length);
            Console.WriteLine("[Server] Response has been written");
        }
    });
}

更新,基于评论:如果这将是库代码,则执行环境确实未知,并且可能具有非默认同步上下文。在这种情况下,我宁愿在池线程(没有任何同步上下文)上运行主服务器循环:

private static Task StartListener()
{
    return Task.Run(async () => 
    {
        var tcpListener = TcpListener.Create(8000);
        tcpListener.Start();
        while (true)
        {
            var tcpClient = await tcpListener.AcceptTcpClientAsync();
            Console.WriteLine("[Server] Client has connected");
            var task = StartHandleConnectionAsync(tcpClient);
            if (task.IsFaulted)
                await task;
        }
    });
}

这样,在StartListener 中创建的所有子任务都不会受到客户端代码的同步上下文的影响。所以,我不必在任何地方显式调用Task.ConfigureAwait(false)

2020年更新,刚刚有人在场外问了个好问题:

我想知道在这里使用锁的原因是什么?这不是 异常处理所必需的。我的理解是锁是 使用是因为 List 不是线程安全的,因此真正的问题 这就是为什么将任务添加到列表中(并在 加载)。

由于 Task.Run 完全能够跟踪它的任务 开始,我的想法是,在这个具体的例子中,锁是 没用,但是你把它放在那里,因为在一个真正的程序中,有 例如,列表中的任务允许我们迭代当前 如果程序接收到,则运行任务并干净地终止任务 来自操作系统的终止信号。

事实上,在现实生活场景中,我们几乎总是希望跟踪以Task.Run(或任何其他“正在运行”的Task 对象)开始的任务,原因如下:

  • 跟踪任务异常,否则 might be silently swallowed 如果在其他地方未观察到。
  • 能够异步等待所有待处理任务的完成(例如,考虑启动/停止 UI 按钮或处理启动/停止无头 Windows 服务内部的请求)。
  • 为了能够控制(和限制/限制)我们允许同时运行的任务数量。

有更好的机制来处理现实生活中的并发工作流(例如,TPL 数据流库),但我确实在此处包含了任务列表和锁定,即使在这个简单的示例中也是如此。使用即发即弃的方法可能很诱人,但这几乎从来都不是一个好主意。根据我自己的经验,当我确实想要一劳永逸时,我使用了async void 方法(检查this)。

【讨论】:

  • 非常有趣。如果我不能保证它将在其中运行的环境(这是在库中),看起来 Task.Yield 的文档说不要指望 SynchronizatonContext 正确切换回来。 Task.Run 可能是我最安全的选择。
  • @captncraig,文档的意思是不要在 UI 线程 上使用它进行长时间的处理(例如使用 await Task.Yeild() 进行紧密循环)。那是因为syn。 UI 线程的上下文使用PostMessage 表示内部深处,它可能会接管其他用户输入消息,如鼠标和键盘,并阻塞 UI。所有这些都不适用于无上下文环境,其中Task.Yield() 只使用ThreadPool.QueueUserWorkItem,这是一个方便的等待快捷方式。更多信息:stackoverflow.com/q/20319769/1768303
  • @Noseratio 您能否在您的代码中扩展 Task.Run 的使用?据我了解,这主要是等待TCPClient 接收数据的I/O 操作。为什么要为while (true) 循环触发一个新的ThreadPool 线程?
  • @YuvalItzchakov,它只是为了“摆脱困境”客户端代码在调用线程上可能拥有的任何同步上下文。是一次性的,可以换成await tcpListener.AcceptTcpClientAsync().ConfigureAwait(false)
  • 太棒了。谢谢你的解释!
【解决方案2】:

根据 Microsoft http://msdn.microsoft.com/en-AU/library/hh524395.aspx#BKMK_VoidReturnType,不应使用 void 返回类型,因为它无法捕获异常。正如您所指出的,您确实需要“触发并忘记”任务,所以我的结论是您必须始终返回 Task(正如 Microsoft 所说),但您应该使用以下方法捕获错误:

TaskInstance.ContinueWith(i => { /* exception handler */ }, TaskContinuationOptions.OnlyOnFaulted);

我用作证明的例子如下:

public static void Main()
{
    Awaitable()
        .ContinueWith(
            i =>
                {
                    foreach (var exception in i.Exception.InnerExceptions)
                    {
                        Console.WriteLine(exception.Message);
                    }
                },
            TaskContinuationOptions.OnlyOnFaulted);
    Console.WriteLine("This needs to come out before my exception");
    Console.ReadLine();
}

public static async Task Awaitable()
{
    await Task.Delay(3000);
    throw new Exception("Hey I can catch these pesky things");
}

【讨论】:

    【解决方案3】:

    您是否有任何理由需要接受异步连接?我的意思是,等待任何客户端连接会给您带来任何价值吗?这样做的唯一原因是因为在等待连接时服务器中正在进行一些其他工作。如果有,您可能可以这样做:

        public async void Serve()
        {
            while (true)
            {
                var client = await _listener.AcceptTcpClientAsync();
                Task.Factory.StartNew(() => HandleClient(client), TaskCreationOptions.LongRunning);
            }
        }
    

    这样,accepting 将释放当前线程离开选项以完成其他事情,并且处理在新线程上运行。唯一的开销是在客户端直接返回接受新连接之前生成一个新线程来处理客户端。

    编辑: 刚刚意识到这与您编写的代码几乎相同。认为我需要再次阅读您的问题以更好地了解您的实际要求:S

    编辑2:

    有没有办法将这两种方法结合起来,以便我的服务器完全使用 它需要的线程数来处理正在运行的任务的数量,但是这样它就会 不要在 IO 操作上不必要地阻塞线程?

    认为我的解决方案实际上回答了这个问题。真的有必要吗?

    编辑3: 使 Task.Factory.StartNew() 实际上创建了一个新线程。

    【讨论】:

    • 异步接受连接确实增加了价值,因为它不会浪费线程只做等待连接。在大多数情况下,等待连接比实际处理请求花费的时间要长得多。此外,Task.StartNew 生成一个新线程,而是使用 ThreadPool 中的一个线程
    • 当然,如果还有其他工作要做,它会增加价值,因此我在原始答案中提出了问题。如果没有其他工作要做,我真的看不到收益,除非阻塞线程和挂起线程之间存在巨大差异。提示 TaskCreationOptions.LongRunning 应该在一个新线程上启动它,但你是对的。
    【解决方案4】:

    现有答案已正确建议使用Task.Run(() =&gt; HandleConnection(client));,但未解释原因。

    原因如下:您担心HandleConnectionAsync 可能需要一些时间才能完成第一个等待。如果你坚持使用异步 IO(在这种情况下你应该这样做),这意味着 HandleConnectionAsync 正在执行 CPU 密集型工作而没有任何阻塞。这是线程池的完美案例。它用于运行短时间、非阻塞 CPU 工作。

    你是对的,接受循环会被HandleConnectionAsync 限制,需要很长时间才能返回(可能是因为其中有大量的 CPU 密集型工作)。如果您需要高频率的新连接,则应避免这种情况。

    如果您确定没有重大工作限制循环,您可以保存额外的线程池 Task 而不要这样做。

    或者,您可以同时运行多个接受。将await Serve(); 替换为(例如):

    var serverTasks =
        Enumerable.Range(0, Environment.ProcessorCount)
        .Select(_ => Serve());
    await Task.WhenAll(serverTasks);
    

    这消除了可扩展性问题。 Note, that await will swallow all but one error here.

    【讨论】:

      【解决方案5】:

      试试

      TcpListener listener;
      void Serve(){
        while(true){
          var client = listener.AcceptTcpClient();
          Task.Run(() => this.HandleConnection(client));
          //Or alternatively new Thread(HandleConnection).Start(client)
        }
      }
      

      【讨论】:

      • 这如何回答最初的问题?
      • 这不是本质上为每个连接生成一个新线程,就像我最初的实现一样吗?
      • 我以为您想使用任务来执行此操作...就目前而言,您无法使用“线程池服务器”来限制线程数。您需要为每个 IO 调用使用非阻塞 API。
      • “这实际上不是为每个连接生成一个新线程吗”不。它正在生成一个新 Task,但是一个 Task 不等于一个 Thread。一个线程可以处理多个任务,并按需生成新线程。
      • Task.Run 绝对 与启动一个新线程相同。 Task.Run 将使用 ThreadPool 中的现有线程,而 Thread.Start 将启动一个永远不会被重用的新线程
      猜你喜欢
      • 1970-01-01
      • 2017-05-01
      • 2013-06-16
      • 1970-01-01
      • 2013-09-07
      • 2022-07-27
      • 2013-05-08
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多