【发布时间】:2009-11-19 22:39:24
【问题描述】:
我有一个小示例应用程序,我正在尝试让一些新的 .Net 4.0 Parallel Extensions 运行(它们非常好)。我遇到了 OutOfMemoryException 的(可能非常愚蠢)问题。我希望将此示例插入的主应用程序读取一些数据和大量文件,对它们进行一些处理,然后将它们写出某个地方。我遇到了一些文件变大(可能是 GB)的问题,并且担心内存问题,所以我想并行化导致我走上这条路的事情。
现在下面的代码在较小的文件上获得了 OOME,我想我只是遗漏了一些东西。它将读取 10-15 个文件并很好地并行写入它们,但随后它会在下一个文件中窒息。看起来它的读写大约有 650MB。第二双眼睛将不胜感激。
我正在从 FileStream 读取 MemorySteam,因为这是主应用程序所需要的,我只是想在某种程度上复制它。它从所有类型的地方读取数据和文件,并将其作为 MemoryStreams 处理。
这是使用 .Net 4.0 Beta 2,VS 2010。
namespace ParellelJob
{
class Program
{
BlockingCollection<FileHolder> serviceToSolutionShare;
static void Main(string[] args)
{
Program p = new Program();
p.serviceToSolutionShare = new BlockingCollection<FileHolder>();
ServiceStage svc = new ServiceStage(ref p.serviceToSolutionShare);
SolutionStage sol = new SolutionStage(ref p.serviceToSolutionShare);
var svcTask = Task.Factory.StartNew(() => svc.Execute());
var solTask = Task.Factory.StartNew(() => sol.Execute());
while (!solTask.IsCompleted)
{
}
}
}
class ServiceStage
{
BlockingCollection<FileHolder> outputCollection;
public ServiceStage(ref BlockingCollection<FileHolder> output)
{
outputCollection = output;
}
public void Execute()
{
var di = new DirectoryInfo(@"C:\temp\testfiles");
var files = di.GetFiles();
foreach (FileInfo fi in files)
{
using (var fs = new FileStream(fi.FullName, FileMode.Open, FileAccess.Read))
{
int b;
var ms = new MemoryStream();
while ((b = fs.ReadByte()) != -1)
{
ms.WriteByte((byte)b); //OutOfMemoryException Occurs Here
}
var f = new FileHolder();
f.filename = fi.Name;
f.contents = ms;
outputCollection.TryAdd(f);
}
}
outputCollection.CompleteAdding();
}
}
class SolutionStage
{
BlockingCollection<FileHolder> inputCollection;
public SolutionStage(ref BlockingCollection<FileHolder> input)
{
inputCollection = input;
}
public void Execute()
{
FileHolder current;
while (!inputCollection.IsCompleted)
{
if (inputCollection.TryTake(out current))
{
using (var fs = new FileStream(String.Format(@"c:\temp\parellel\{0}", current.filename), FileMode.OpenOrCreate, FileAccess.Write))
{
using (MemoryStream ms = (MemoryStream)current.contents)
{
ms.WriteTo(fs);
current.contents.Close();
}
}
}
}
}
}
class FileHolder
{
public string filename { get; set; }
public Stream contents { get; set; }
}
}
【问题讨论】:
标签: c# stream parallel-processing