这已经过测试,对于同时异步读写来说是线程安全的。
使用系统;
使用 System.IO;
使用 System.Threading.Tasks;
使用 System.Threading;
使用 System.Collections.Concurrent;
公共类 EchoStream : 流
{
公共覆盖布尔 CanTimeout { 获取; } = 真;
公共覆盖 int ReadTimeout { get;放; } = 超时。无限;
公共覆盖 int WriteTimeout { get;放; } = 超时。无限;
公共覆盖 bool CanRead { get; } = 真;
公共覆盖 bool CanSeek { get; } = 假;
公共覆盖 bool CanWrite { get; } = 真;
public bool CopyBufferOnWrite { get; set; } = false;
private readonly object _lock = new object();
// Default underlying mechanism for BlockingCollection is ConcurrentQueue<T>, which is what we want
private readonly BlockingCollection<byte[]> _Buffers;
private int _maxQueueDepth = 10;
private byte[] m_buffer = null;
private int m_offset = 0;
private int m_count = 0;
private bool m_Closed = false;
private bool m_FinalZero = false; //after the stream is closed, set to true after returning a 0 for read()
public override void Close()
{
m_Closed = true;
// release any waiting writes
_Buffers.CompleteAdding();
}
public bool DataAvailable
{
get
{
return _Buffers.Count > 0;
}
}
private long _Length = 0L;
public override long Length
{
get
{
return _Length;
}
}
private long _Position = 0L;
public override long Position
{
get
{
return _Position;
}
set
{
throw new NotImplementedException();
}
}
public EchoStream() : this(10)
{
}
public EchoStream(int maxQueueDepth)
{
_maxQueueDepth = maxQueueDepth;
_Buffers = new BlockingCollection<byte[]>(_maxQueueDepth);
}
// we override the xxxxAsync functions because the default base class shares state between ReadAsync and WriteAsync, which causes a hang if both are called at once
public new Task WriteAsync(byte[] buffer, int offset, int count)
{
return Task.Run(() => Write(buffer, offset, count));
}
// we override the xxxxAsync functions because the default base class shares state between ReadAsync and WriteAsync, which causes a hang if both are called at once
public new Task<int> ReadAsync(byte[] buffer, int offset, int count)
{
return Task.Run(() =>
{
return Read(buffer, offset, count);
});
}
public override void Write(byte[] buffer, int offset, int count)
{
if (m_Closed || buffer.Length - offset < count || count <= 0)
return;
byte[] newBuffer;
if (!CopyBufferOnWrite && offset == 0 && count == buffer.Length)
newBuffer = buffer;
else
{
newBuffer = new byte[count];
System.Buffer.BlockCopy(buffer, offset, newBuffer, 0, count);
}
if (!_Buffers.TryAdd(newBuffer, WriteTimeout))
throw new TimeoutException("EchoStream Write() Timeout");
_Length += count;
}
public override int Read(byte[] buffer, int offset, int count)
{
if (count == 0)
return 0;
lock (_lock)
{
if (m_count == 0 && _Buffers.Count == 0)
{
if (m_Closed)
{
if (!m_FinalZero)
{
m_FinalZero = true;
return 0;
}
else
{
return -1;
}
}
if (_Buffers.TryTake(out m_buffer, ReadTimeout))
{
m_offset = 0;
m_count = m_buffer.Length;
}
else
{
if (m_Closed)
{
if (!m_FinalZero)
{
m_finalZero = true;
return 0;
}
else
{
return -1;
}
}
else
{
return 0;
}
}
}
int returnBytes = 0;
while (count > 0)
{
if (m_count == 0)
{
if (_Buffers.TryTake(out m_buffer, 0))
{
m_offset = 0;
m_count = m_buffer.Length;
}
else
break;
}
var bytesToCopy = (count < m_count) ? count : m_count;
System.Buffer.BlockCopy(m_buffer, m_offset, buffer, offset, bytesToCopy);
m_offset += bytesToCopy;
m_count -= bytesToCopy;
offset += bytesToCopy;
count -= bytesToCopy;
returnBytes += bytesToCopy;
}
_Position += returnBytes;
return returnBytes;
}
}
public override int ReadByte()
{
byte[] returnValue = new byte[1];
return (Read(returnValue, 0, 1) <= 0 ? -1 : (int)returnValue[0]);
}
public override void Flush()
{
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotImplementedException();
}
public override void SetLength(long value)
{
throw new NotImplementedException();
}
}