【问题标题】:Threadsafe buffer wrapping Stream线程安全缓冲区包装流
【发布时间】:2026-01-03 00:35:02
【问题描述】:

我在TcpClient 之上使用SslStream。不幸的是,`SslStream` 不支持同时从多个线程写入或读取。这就是为什么我围绕它编写了自己的包装器:

private ConcurrentQueue<byte> sendQueue;

private volatile bool oSending;

private readonly object writeLock;


public async void Write(byte[] buffer, int offset, int count)
{
  if (oSending)
  {
    lock (writeLock)
    {
      foreach (var b in buffer)
      {
        sendQueue.Enqueue(b);
      }
    }
  }
  else
  {
    oSending = true;
    await stream.WriteAsync(buffer, offset, count);
    oSending = false;

    lock (writeLock)
    {
      if (sendQueue.Count > 0)
      {
        Write(sendQueue.ToArray(), 0, sendQueue.Count);
        sendQueue = new ConcurrentQueue<byte>();
      }
    }
  }
}

背后的意图如下:

  1. 如果流是空闲的,立即写入流。
  2. 如果流忙,则写入缓冲区。
  3. 如果流从发送返回,检查队列中是否有数据,递归发送。

到目前为止,我已经尝试了几种解决方案,但似乎每次都发送了太多数据。

P.S.:我知道按字节填充队列并不好,但这只是变得又快又脏。

更新:我已根据 Dirk 的评论添加了队列的删除。

【问题讨论】:

  • 您是否曾经从队列中删除过项目?

标签: c# .net multithreading concurrency buffer


【解决方案1】:

更新

使用TPL Dataflow

using System.Threading.Tasks.Dataflow;

public class DataflowStreamWriter
{
    private readonly MemoryStream _stream = new MemoryStream();
    private readonly ActionBlock<byte[]> _block;

    public DataflowStreamWriter()
    {
        _block = new ActionBlock<byte[]>(
                        bytes => _stream.Write(bytes, 0, bytes.Length));
    }

    public void Write(byte[] data)
    {
        _block.Post(data);
    }
}

这是一种更好的生产者-消费者方法。

每当有人向您的ConcurrentStreamWriter 实例写入数据时,该数据就会被添加到缓冲区中。此方法是线程安全的,多个线程可能同时写入数据。这些是你的生产者

然后,您有一个 consumer - 消耗缓冲区中的数据并将其写入流中。

BlockingCollection&lt;T&gt; 用于在生产者和消费者之间进行通信。这样,如果没有人在生产,消费者就会处于空闲状态。每当生产者启动并向缓冲区写入内容时,消费者就会醒来。

消费者被延迟初始化 - 当且仅当某些数据首次可用时才会创建。

public class ConcurrentStreamWriter : IDisposable
{
    private readonly MemoryStream _stream = new MemoryStream();
    private readonly BlockingCollection<byte> _buffer = new BlockingCollection<byte>(new ConcurrentQueue<byte>());

    private readonly object _writeBufferLock = new object();
    private Task _flusher;
    private volatile bool _disposed;

    private void FlushBuffer()
    {
        //keep writing to the stream, and block when the buffer is empty
        while (!_disposed)
            _stream.WriteByte(_buffer.Take());

        //when this instance has been disposed, flush any residue left in the ConcurrentStreamWriter and exit
        byte b;
        while (_buffer.TryTake(out b))
            _stream.WriteByte(b);
    }

    public void Write(byte[] data)
    {
        if (_disposed)
            throw new ObjectDisposedException("ConcurrentStreamWriter");

        lock (_writeBufferLock)
            foreach (var b in data)
                _buffer.Add(b);

        InitFlusher();
    }

    public void InitFlusher()
    {
        //safely create a new flusher task if one hasn't been created yet
        if (_flusher == null)
        {
            Task newFlusher = new Task(FlushBuffer);
            if (Interlocked.CompareExchange(ref _flusher, newFlusher, null) == null)
                newFlusher.Start();
        }
    }

    public void Dispose()
    {
        _disposed = true;
        if (_flusher != null)
            _flusher.Wait();

        _buffer.Dispose();
    }
}

【讨论】:

  • 感谢您的支持!基于 TPL Dataflow 的解决方案效果很好。这样说不是更好吗: _block = new ActionBlock(async bytes => await stream.WriteAsync(bytes, 0, bytes.Length));我想避免为我拥有的每个打开的连接阻塞一个线程。
  • @sqeez3r 是的 :)
【解决方案2】:
  1. 您正在锁定对 ConcurrentQueue&lt;T&gt; 的访问 - 您不需要,队列已经是线程安全的
  2. if(oSending) {} else {oSending = true} 不是线程安全的。两个线程可能将oSending 读为false,进入else 块并将其设置为true。现在您有两个线程写入流。
  3. 正如 Dirk 指出的那样,您并没有从队列中删除项目。

我的修改:

  1. 不使用布尔标志,而是使用Monitor.TryEnter 来尝试访问流。如果当前正在写入流,则调用将立即返回 - 并继续写入缓冲区。

  2. 实现IDisposable 并确保Dispose 刷新缓冲区。

  3. 仅在写入队列时锁定队列,以保持字节顺序
  4. 将签名从async void更改为async Task


private readonly ConcurrentQueue<byte> _bufferQueue = new ConcurrentQueue<byte>();

private readonly object _bufferLock = new object();
private readonly object _streamLock = new object();
private readonly MemoryStream stream = new MemoryStream();

public async Task Write(byte[] data, int offset, int count)
{
    bool streamLockTaken = false;

    try
    {
        //attempt to acquire the lock - if lock is currently taken, return immediately
        Monitor.TryEnter(_streamLock, ref streamLockTaken);

        if (streamLockTaken) //write to stream
        {
            //write data to stream and flush the buffer
            await stream.WriteAsync(data, offset, count);
            await FlushBuffer();

        }
        else //write to buffer
        {
            lock (_bufferLock)
                foreach (var b in data)
                    _bufferQueue.Enqueue(b);
        }
    }
    finally
    {
        if (streamLockTaken)
            Monitor.Exit(_streamLock);
    }
}

private async Task FlushBuffer()
{
    List<byte> bufferedData = new List<byte>();
    byte b;
    while (_bufferQueue.TryDequeue(out b))
        bufferedData.Add(b);

    await stream.WriteAsync(bufferedData.ToArray(), 0, bufferedData.Count);
}

public void Dispose()
{
    lock(_streamLock)
        FlushBuffer().Wait();
}

【讨论】:

  • 1.我使用锁是因为我不想丢失字节的顺序。一旦我一次添加整个字节范围,我就可以删除它。 2. 我该如何纠正? 3. 当我从一种可能的解决方案更改为另一种可能的解决方案时,我忘记保留这个。 (我从BeginWrite 和一个回调方法开始。
  • @sqeez3r 我已经用一个例子更新了我的帖子 - 试试看。关于锁,很公平,但你只需要锁来写入队列,而不是从中读取。
  • 感谢您更新您的帖子并感谢您的示例。我已经尝试过了,但我遇到了 SynchronizationLockException。我目前正试图了解为什么会发生这种情况。
  • @sqeez3r 异常发生在哪一行?
  • @sqeez3r 没错,这就是我添加 flush 方法的原因——以确保任何残留物都被刷新到流中。您最初的策略不会遇到这种情况,但这是一种更糟糕的方法。如果流忙,则写入队列缓冲区并进行递归调用 - 这很糟糕。 1) 你正在“旋转”等待 I/O 完成,2) 你让所有线程忙于递归调用,3) 可能会遇到 *Exception。
【解决方案3】:

你不能只锁定底层流吗?我相信它可以像这样简单:

private readonly object writeLock = new Object();

public async void Write(byte[] buffer, int offset, int count)
{
   lock (writeLock)
   {
      await stream.WriteAsync(buffer, offset, count);
   }
}

另外,通过您的队列实现,我认为写入可以排队的机器人永远不会写入流的变化。例如,在另一个线程出队之后但在该线程释放其锁之前排队。

【讨论】:

  • 简单地锁定流会导致许多线程等待锁定。该应用程序将处理多达 3000 个连接,我不能承受太多的锁。
  • @sqeez3r 在这种情况下,dcastro 的代码看起来不错。请记住,即使大多数线程会快速释放(缓冲)锁,也会有一些线程最终会在发送从其他线程排队的所有数据时付出代价。