【问题标题】:MemoryStream have one thread write to it and another readMemoryStream 有一个线程写入,另一个读取
【发布时间】:2012-09-01 22:27:37
【问题描述】:

这就是我写入流然后使用 1 个线程从中读取的方式:

        System.IO.MemoryStream ms = new System.IO.MemoryStream();

        // write to it
        ms.Write(new byte[] { 1, 2, 3, 4, 5, 6, 7 }, 0, 7);

        // go to the begining
        ms.Seek(0, System.IO.SeekOrigin.Begin);

        // now read from it
        byte[] myBuffer = new byte[7];
        ms.Read(myBuffer, 0, 7);

现在我想知道是否可以从一个线程写入内存流并从单独的线程读取该流。

【问题讨论】:

  • 是的,很多地方都在谈论它。如果我能看到一个非常基本的例子来说明我将如何做到这一点,我将不胜感激......

标签: c# multithreading stream memorystream


【解决方案1】:

由于 Stream 状态已满,因此您不能同时从 2 个线程中使用具有搜索功能的 Stream。例如NetworkStream 有 2 个通道,一个用于读取,一个用于写入,因此不支持查找。

如果你需要寻求能力,你需要创建 2 个流,一个用于读取,一个用于写入。否则,您可以简单地创建一个新的 Stream 类型,该类型允许通过对底层流进行独占访问并恢复其写入/读取位置来从底层内存流中读取和写入。一个原始的例子是:

class ProducerConsumerStream : Stream
{
    private readonly MemoryStream innerStream;
    private long readPosition;
    private long writePosition;

    public ProducerConsumerStream()
    {
        innerStream = new MemoryStream();
    }

    public override bool CanRead { get { return true;  } }

    public override bool CanSeek { get { return false; } }

    public override bool CanWrite { get { return true; } }

    public override void Flush()
    {
        lock (innerStream)
        {
            innerStream.Flush();
        }
    }

    public override long Length
    {
        get 
        {
            lock (innerStream)
            {
                return innerStream.Length;
            }
        }
    }

    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        lock (innerStream)
        {
            innerStream.Position = readPosition;
            int red = innerStream.Read(buffer, offset, count);
            readPosition = innerStream.Position;

            return red;
        }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    public override void SetLength(long value)
    {
        throw new NotImplementedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        lock (innerStream)
        {
            innerStream.Position = writePosition;
            innerStream.Write(buffer, offset, count);
            writePosition = innerStream.Position;
        }
    }
}

【讨论】:

  • 效果很好,谢谢!我应该如何实现 seek 方法?我应该移动innerStream.Position 吗?
  • 想想吧。你想寻求做什么?在读通道中寻找还是在写通道中寻找?还是两者兼而有之?
  • @Polity 我需要什么来实现主题标题(“MemoryStream 有一个线程写入它,另一个线程读取”)但使用字符串而不是字节并且没有任何搜索?
  • 当消费者超过生产者时会发生什么?我相信这会导致消费者过早地获得流结束。
  • @NickWhaley 好点,这个实现没有考虑到这一点。当数据可用时,高阶生产者应该向高阶消费者发出信号。随意发布为您执行此操作的实现