【问题标题】:Accepting incoming requests asynchronously异步接受传入请求
【发布时间】:2024-04-14 15:45:02
【问题描述】:

我已经确定了我的 TCP 应用程序中的一个瓶颈,我为了这个问题而对其进行了简化。

我有一个MyClient 类,它代表客户端何时连接;我还有一个MyWrapper 类,它代表满足某些条件的客户端。如果MyClient满足某些条件,则它有资格成为包装器。

我想公开一个允许调用者等待MyWrapper的方法,该方法应该处理无效MyClients的协商和拒绝:

public static async Task StartAccepting(CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        var wrapper = await AcceptWrapperAsync(token);

        HandleWrapperAsync(wrapper);
    }
}

因此AcceptWrapperAsync 等待一个有效的包装器,而HandleWrapperAsync 异步处理包装器而不阻塞线程,因此AcceptWrapperAsync 可以尽快恢复工作。

该方法在内部的工作原理是这样的:

public static async Task<MyWrapper> AcceptWrapperAsync(CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        var client = await AcceptClientAsync();
        if (IsClientWrappable(client))
            return new MyWrapper(client);
    }
    return null;
}

public static async Task<MyClient> AcceptClientAsync()
{
    await Task.Delay(1000);
    return new MyClient();
}

private static Boolean IsClientWrappable(MyClient client)
{
    Thread.Sleep(500);
    return true;
}

这段代码模拟每秒有一个客户端连接,并且需要半秒来检查连接是否适合包装器。 AcceptWrapperAsync 循环直到生成有效的包装器,然后返回。

这种运作良好的方法存在缺陷。在IsClientWrappable 执行期间,不能接受更多的客户端,当大量客户端同时尝试连接时会造成瓶颈。恐怕在现实生活中,如果服务器在连接了很多客户端的情况下出现故障,那么上升不会很好,因为所有客户端都会同时尝试连接。我知道同时连接所有这些非常困难,但我想加快连接过程。

使IsClientWrappable异步,只会确保执行线程在协商完成之前不会被阻塞,但无论如何都会阻塞执行流程。

如何改进这种方法以不断接受新客户,但仍然能够等待使用 AcceptWrapperAsync 的包装器?

【问题讨论】:

  • 这似乎是 Reactive Extensions (Rx) 的一个很好的用例
  • Rx 对于这将是一个矫枉过正。看来Dataflow更有意义。
  • 使用 RX,您将数据推送到订阅者,而使用 DataFlow,您将数据推送到消费者。 :)
  • 好吧,看来RX远不止这些。数据流只是异步工作的类的集合:P

标签: c# multithreading asynchronous async-await asyncsocket


【解决方案1】:
//this loop must never be blocked
while (!token.IsCancellationRequested)
{
    var client = await AcceptClientAsync();
    HandleClientAsync(client); //must not block
}

Task HandleClientAsync(Client client) {
    if (await IsClientWrappableAsync(client)) //make async as well, don't block
        await HandleWrapperAsync(new MyWrapper(client));
}

通过这种方式,您可以将 IsClientWrappable 逻辑移出接受循环并进入后台异步工作流。

如果你不想让IsClientWrappable 非阻塞,只需用Task.Run 包裹它。 HandleClientAsync 不阻塞是很重要的,这样它的调用者也不会阻塞。

【讨论】:

  • 我不能这样做,正如我所说,我必须提供一个等待的AcceptWrapperAsync
  • 好的,在这种情况下我会研究 Rx。看起来您有一个传入客户端流,需要过滤并转换为包装客户端。一切都是异步的。 Rx 可以做到这一点,但我对此还不够熟悉。
【解决方案2】:

TPL 数据流助您一臂之力。我创建了一个带有两个队列的“生产者/消费者”对象:

  1. 接受来自“producer”的输入并将其存储在“in”队列中。
  2. 从“in”队列中读取的内部异步任务,并以给定的最大并行度并行处理输入。
  3. 之后将已处理的项目放入“out”队列中。结果或异常。
  4. 接受消费者await 一个项目。然后可以检查处理是否成功。

我已经做了一些测试,它似乎工作正常,但我想做更多的测试:

public sealed class ProcessingResult<TOut> 
    where TOut : class
{
    public TOut Result { get; internal set; }
    public Exception Error { get; internal set; }
}

public abstract class ProcessingBufferBlock<TIn,TOut> 
    where TIn:class 
    where TOut:class
{
    readonly BufferBlock<TIn> _in;
    readonly BufferBlock<ProcessingResult<TOut>> _out;
    readonly CancellationToken _cancellation;
    readonly SemaphoreSlim _semaphore;

    public ProcessingBufferBlock(Int32 boundedCapacity, Int32 degreeOfParalellism, CancellationToken cancellation)
    {
        _cancellation = cancellation;
        _semaphore = new SemaphoreSlim(degreeOfParalellism);
        var options = new DataflowBlockOptions() { BoundedCapacity = boundedCapacity, CancellationToken = cancellation };
        _in = new BufferBlock<TIn>(options);
        _out = new BufferBlock<ProcessingResult<TOut>>(options);
        StartReadingAsync();
    }

    private async Task StartReadingAsync()
    {
        await Task.Yield();
        while (!_cancellation.IsCancellationRequested)
        {
            var incoming = await _in.ReceiveAsync(_cancellation);
            ProcessThroughGateAsync(incoming);
        }
    }

    private async Task ProcessThroughGateAsync(TIn input)
    {
        _semaphore.Wait(_cancellation);
        Exception error=null;
        TOut result=null;
        try
        {
            result = await ProcessAsync(input);                   
        }
        catch (Exception ex)
        {
            error = ex;
        }
        finally
        {
            if(result!=null || error!=null)
                _out.Post(new ProcessingResult<TOut>() { Error = error, Result = result });
            _semaphore.Release(1);
        }
    }

    protected abstract Task<TOut> ProcessAsync(TIn input);

    public void Post(TIn item)
    {
        _in.Post(item);
    }

    public Task<ProcessingResult<TOut>> ReceiveAsync()
    {
        return _out.ReceiveAsync();
    }
}

所以我在 OP 上使用的示例是这样的:

public class WrapperProcessingQueue : ProcessingBufferBlock<MyClient, MyWrapper>
{
    public WrapperProcessingQueue(Int32 boundedCapacity, Int32 degreeOfParalellism, CancellationToken cancellation)
        : base(boundedCapacity, degreeOfParalellism, cancellation)
    { }

    protected override async Task<MyWrapper> ProcessAsync(MyClient input)
    {
        await Task.Delay(5000);
        if (input.Id % 3 == 0)
            return null;
        return new MyWrapper(input);
    }
}

然后我可以尽快将MyClient 对象添加到该队列中,它们将被并行处理,消费者将等待通过过滤器的对象。

正如我所说,我想做更多的测试,但我们非常欢迎任何反馈。

干杯。

【讨论】: