【发布时间】:2014-04-11 23:11:26
【问题描述】:
我正在读取和处理大量的 Sql Server 数据(输入数以百万计的行,输出数以百万计的行)。对每个源行执行的处理很重要。单线程版本没有达到预期的效果。我当前的并行处理版本在一些较小的批次(300,000 个源行,1M 输出行)上表现非常好,但是对于非常大的运行,我遇到了一些 Out Of Memory 异常。
代码受到此处提供的答案的极大启发: Is there a way to use the Task Parallel Library(TPL) with SQLDataReader?
大致思路如下:
获取源数据(数据太大而无法读入内存,所以我们将“流式传输”它)
public static IEnumerable<MyObject> ReadData()
{
using (SqlConnection con = new SqlConnection(Settings.ConnectionString))
using (SqlCommand cmd = new SqlCommand(selectionSql, con))
{
con.Open();
using (SqlDataReader dr = cmd.ExecuteReader(CommandBehavior.CloseConnection))
{
while (dr.Read())
{
// make some decisions here – 1 to n source rows are used
// to create an instance of MyObject
yield return new MyObject(some parameters);
}
}
}
}
一旦我们到达并行处理的地步,我们想要使用 SqlBulkCopy 对象来写入数据。正因为如此,我们不想并行处理单个 MyObject,因为我们想为每个线程执行批量复制。因此,我们将从上面读取另一个 IEnumerable,它返回一个“批次”的 MyObjects
class MyObjectBatch
{
public List<MyObject> Items { get; set; }
public MyObjectBatch (List<MyObject> items)
{
this.Items = items;
}
public static IEnumerable<MyObjectBatch> Read(int batchSize)
{
List<MyObject> items = new List<MyObjectBatch>();
foreach (MyObject o in DataAccessLayer.ReadData())
{
items.Add(o);
if (items.Count >= batchSize)
{
yield return new MyObjectBatch(items);
items = new List<MyObject>(); // reset
}
}
if (items.Count > 0) yield return new MyObjectBatch(items);
}
}
最后,我们开始并行处理“批次”
ObjectProcessor processor = new ObjectProcessor();
ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = Settings.MaxThreads };
Parallel.ForEach(MyObjectBatch.Read(Settings.BatchSize), options, batch =>
{
// Create a container for data processed by this thread
// the container implements IDataReader
ProcessedData targetData = new ProcessedData(some params));
// process the batch… for each MyObject in MyObjectBatch –
// results are collected in targetData
for (int index = 0; index < batch.Items.Count; index++)
{
processor.Process(batch.Item[index], targetData);
}
// bulk copy the data – this creates a SqlBulkCopy instance
// and loads the data to the target table
DataAccessLayer.BulkCopyData(targetData);
// explicitly set the batch and targetData to null to try to free resources
});
以上所有内容都已大大简化,但我相信它包含了所有重要概念。这是我看到的行为:
性能非常好——对于合理大小的数据集,我得到了非常好的结果。
但是,在处理过程中,消耗的内存会继续增长。对于较大的数据集,这会导致异常。
我已经通过日志记录证明,如果我减慢从数据库读取的速度,它会减慢批量读取的速度,进而减慢创建的并行线程(特别是如果我设置 MaxDegreeOfParallelization)。我担心我的阅读速度超过了我的处理速度,但如果我限制线程,它应该只读取每个线程可以处理的内容。
较小或较大的批处理大小对性能有一些影响,但使用的内存量会随着批处理大小的增加而持续增长。
这里哪里有机会恢复一些记忆?当我的“批次”超出范围时,是否应该恢复该内存?我可以在前两层做些什么来帮助释放一些资源吗?
回答一些问题: 1. 是否可以纯粹在 SQL 中完成 - 不,处理逻辑非常复杂(并且是动态的)。一般来说,它是在做低级的二进制解码。 2.我们已经尝试过SSIS(取得了一些成功)。问题在于源数据的定义以及输出是非常动态的。 SSIS 似乎需要非常严格的输入和输出列定义,这在这种情况下不起作用。
有人还询问了 ProcessedData 对象 - 这实际上相当简单:
class ProcessedData : IDataReader
{
private int _currentIndex = -1;
private string[] _fieldNames { get; set; }
public string TechnicalTableName { get; set; }
public List<object[]> Values { get; set; }
public ProcessedData(string schemaName, string tableName, string[] fieldNames)
{
this.TechnicalTableName = "[" + schemaName + "].[" + tableName + "]";
_fieldNames = fieldNames;
this.Values = new List<object[]>();
}
#region IDataReader Implementation
public int FieldCount
{
get { return _fieldNames.Length; }
}
public string GetName(int i)
{
return _fieldNames[i];
}
public int GetOrdinal(string name)
{
int index = -1;
for (int i = 0; i < _fieldNames.Length; i++)
{
if (_fieldNames[i] == name)
{
index = i;
break;
}
}
return index;
}
public object GetValue(int i)
{
if (i > (Values[_currentIndex].Length- 1))
{
return null;
}
else
{
return Values[_currentIndex][i];
}
}
public bool Read()
{
if ((_currentIndex + 1) < Values.Count)
{
_currentIndex++;
return true;
}
else
{
return false;
}
}
// Other IDataReader things not used by SqlBulkCopy not implemented
}
更新和结论:
我收到了大量有价值的意见,但想将其总结为一个结论。首先,我的主要问题是我是否可以做任何其他事情(使用我发布的代码)来积极回收内存。共识似乎是该方法是正确的,但我的特定问题并不完全受 CPU 限制,因此简单的 Parallel.ForEach 将无法正确管理处理。
感谢 usr 的调试建议和非常有趣的 PLINQ 建议。感谢 zmbq 帮助澄清发生了什么和没有发生什么。
最后,其他任何可能正在关注类似问题的人都可能会发现以下讨论很有帮助:
【问题讨论】:
-
正在进行的计算是什么?我问的原因是,您可能有一种方法可以完全在 SQL 中完成,这将解决您的内存问题,因为 SQL 会优化您的代码。我知道这不是问题所要问的。
-
除了 Michael 提到的之外,另一种选择是使用 Sql Server Integration Services,它真的是为做这类事情而设计的。如果需要,您可以在应用程序中启动 SSIS 作业。
-
几件事,您需要点播数据吗?如果不考虑 SSAS。另一件事,如果你想做这样的事情,就去低层次。另一件事,忽略内置的扩展方法和库。没有人可以做一些泛化并完美地满足极不可能的需求。
-
这里最大的问题是它不是真正的代码。很可能,您忽略了问题的关键所在。例如,由于我们不知道 ProcessedData 中返回的数据在传递给处理器后会发生什么,因此我们不知道它是否保留了对数据的引用,从而导致您的问题。
-
@ErikTheViking - 感谢您的评论(并理解您的观点)。但是,显示所有“真实”代码是不合理的。需要澄清的一点是 ObjectProcessor.Process() 将所有内容都保留在本地,因此,随着对象的处理完成,其内部的所有内容都应该超出范围。那么,在您看来,您所看到的应该是自行清理?
标签: c# sql-server memory-management task-parallel-library