【发布时间】:2015-11-09 05:18:50
【问题描述】:
我刚刚开始使用线程, 我想写一个简单的文件压缩器。它应该创建两个后台线程——一个用于读取,另一个用于写入。第一个应该按小块读取文件并将它们放入队列中,其中int - 是chunkId。第二个线程应该将块出列并按顺序(使用chunkId)将它们写入输出流(文件,该线程在开始时创建的文件)。
我做到了。但我不明白为什么在我的程序结束并打开我的 gzip 文件后 - 我看到,我的块混合了,并且文件没有以前的顺序。
public static class Reader
{
private static readonly object Locker = new object();
private const int ChunkSize = 1024*1024;
private static readonly int MaxThreads;
private static readonly Queue<KeyValuePair<int, byte[]>> ChunksQueue;
private static int _chunksComplete;
static Reader()
{
MaxThreads = Environment.ProcessorCount;
ChunksQueue = new Queue<KeyValuePair<int,byte[]>>(MaxThreads);
}
public static void Read(string filename)
{
_chunksComplete = 0;
var tRead = new Thread(Reading) { IsBackground = true };
var tWrite = new Thread(Writing) { IsBackground = true };
tRead.Start(filename);
tWrite.Start(filename);
tRead.Join();
tWrite.Join();
Console.WriteLine("Finished");
}
private static void Writing(object threadContext)
{
var filename = (string) threadContext;
using (var s = File.Create(filename + ".gz"))
{
while (true)
{
var dataPair = DequeueSafe();
if (dataPair.Value == null)
return;
while (dataPair.Key != _chunksComplete)
{
Thread.Sleep(1);
}
Console.WriteLine("write chunk {0}", dataPair.Key);
using (var gz = new GZipStream(s, CompressionMode.Compress, true))
{
gz.Write(dataPair.Value, 0, dataPair.Value.Length);
}
_chunksComplete++;
}
}
}
private static void Reading(object threadContext)
{
var filename = (string) threadContext;
using (var s = File.OpenRead(filename))
{
var counter = 0;
var buffer = new byte[ChunkSize];
while (s.Read(buffer, 0, buffer.Length) != 0)
{
while (ChunksQueue.Count == MaxThreads)
{
Thread.Sleep(1);
}
Console.WriteLine("read chunk {0}", counter);
var dataPair = new KeyValuePair<int, byte[]>(counter, buffer);
EnqueueSafe(dataPair);
counter++;
}
EnqueueSafe(new KeyValuePair<int, byte[]>(0, null));
}
}
private static void EnqueueSafe(KeyValuePair<int, byte[]> dataPair)
{
lock (ChunksQueue)
{
ChunksQueue.Enqueue(dataPair);
}
}
private static KeyValuePair<int, byte[]> DequeueSafe()
{
while (true)
{
lock (ChunksQueue)
{
if (ChunksQueue.Count > 0)
{
return ChunksQueue.Dequeue();
}
}
Thread.Sleep(1);
}
}
}
更新: 我只能使用 .NET 3.5
【问题讨论】:
-
您应该切换到
BlockingCollection,并将ConcurrentQueue作为基础集合。您不再需要锁定,也不再需要Thread.Sleep(1),因为如果没有可用数据,它将等待数据到达。 -
我忘了说我只能使用 .net 3.5!
-
为什么不简单地读取一个缓冲区,然后在压缩+写入该缓冲区时读取下一个?只需要一个线程,您使用异步 I/O,如果 compress+write 线程比读取线程花费更多时间来完成其工作,则不要冒险用您已读取的缓冲区填充队列。
-
这只是一种练习。这个例子与现实生活中的例子没有任何联系。但是谢谢你的想法!
标签: c# multithreading .net-3.5