【问题标题】:IEnumerable<T>, Parallel.ForEach and Memory ManagementIEnumerable<T>、Parallel.ForEach 和内存管理
【发布时间】:2014-04-11 23:11:26
【问题描述】:

我正在读取和处理大量的 Sql Server 数据(输入数以百万计的行,输出数以百万计的行)。对每个源行执行的处理很重要。单线程版本没有达到预期的效果。我当前的并行处理版本在一些较小的批次(300,000 个源行,1M 输出行)上表现非常好,但是对于非常大的运行,我遇到了一些 Out Of Memory 异常。

代码受到此处提供的答案的极大启发: Is there a way to use the Task Parallel Library(TPL) with SQLDataReader?

大致思路如下:

获取源数据(数据太大而无法读入内存,所以我们将“流式传输”它)

public static IEnumerable<MyObject> ReadData()
{
    using (SqlConnection con = new SqlConnection(Settings.ConnectionString)) 
       using (SqlCommand cmd = new SqlCommand(selectionSql, con))
       {
            con.Open();
            using (SqlDataReader dr = cmd.ExecuteReader(CommandBehavior.CloseConnection))
            {
            while (dr.Read())
            {
                // make some decisions here – 1 to n source rows are used
                // to create an instance of MyObject
                yield return new MyObject(some parameters);
            }
        }
    }
}

一旦我们到达并行处理的地步,我们想要使用 SqlBulkCopy 对象来写入数据。正因为如此,我们不想并行处理单个 MyObject,因为我们想为每个线程执行批量复制。因此,我们将从上面读取另一个 IEnumerable,它返回一个“批次”的 MyObjects

class MyObjectBatch 
{
    public List<MyObject> Items { get; set; }

    public MyObjectBatch (List<MyObject> items)
    {
        this.Items = items;
    }

    public static IEnumerable<MyObjectBatch> Read(int batchSize)
    {
        List<MyObject> items = new List<MyObjectBatch>();
        foreach (MyObject o in DataAccessLayer.ReadData())
        {
            items.Add(o);
            if (items.Count >= batchSize)
            {
                yield return new MyObjectBatch(items);                    
                items = new List<MyObject>(); // reset
            }
        }
        if (items.Count > 0) yield return new MyObjectBatch(items);            
    }
}

最后,我们开始并行处理“批次”

ObjectProcessor processor = new ObjectProcessor();

ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = Settings.MaxThreads };
Parallel.ForEach(MyObjectBatch.Read(Settings.BatchSize), options, batch =>
{
    // Create a container for data processed by this thread
    // the container implements IDataReader
    ProcessedData targetData = new ProcessedData(some params));

    // process the batch… for each MyObject in MyObjectBatch – 
    // results are collected in targetData
    for (int index = 0; index < batch.Items.Count; index++) 
    {
        processor.Process(batch.Item[index], targetData);
    }

    // bulk copy the data – this creates a SqlBulkCopy instance
    // and loads the data to the target table
    DataAccessLayer.BulkCopyData(targetData);

    // explicitly set the batch and targetData to null to try to free resources

});

以上所有内容都已大大简化,但我相信它包含了所有重要概念。这是我看到的行为:

性能非常好——对于合理大小的数据集,我得到了非常好的结果。

但是,在处理过程中,消耗的内存会继续增长。对于较大的数据集,这会导致异常。

我已经通过日志记录证明,如果我减慢从数据库读取的速度,它会减慢批量读取的速度,进而减慢创建的并行线程(特别是如果我设置 MaxDegreeOfParallelization)。我担心我的阅读速度超过了我的处理速度,但如果我限制线程,它应该只读取每个线程可以处理的内容。

较小或较大的批处理大小对性能有一些影响,但使用的内存量会随着批处理大小的增加而持续增长。

这里哪里有机会恢复一些记忆?当我的“批次”超出范围时,是否应该恢复该内存?我可以在前两层做些什么来帮助释放一些资源吗?

回答一些问题: 1. 是否可以纯粹在 SQL 中完成 - 不,处理逻辑非常复杂(并且是动态的)。一般来说,它是在做低级的二进制解码。 2.我们已经尝试过SSIS(取得了一些成功)。问题在于源数据的定义以及输出是非常动态的。 SSIS 似乎需要非常严格的输入和输出列定义,这在这种情况下不起作用。

有人还询问了 ProcessedData 对象 - 这实际上相当简单:

class ProcessedData : IDataReader 
{
    private int _currentIndex = -1;
    private string[] _fieldNames { get; set; }

    public string TechnicalTableName { get; set; }        
    public List<object[]> Values { get; set; }

    public ProcessedData(string schemaName, string tableName, string[] fieldNames)
    {            
        this.TechnicalTableName = "[" + schemaName + "].[" + tableName + "]";
        _fieldNames = fieldNames;            
        this.Values = new List<object[]>();
    }

    #region IDataReader Implementation

    public int FieldCount
    {
        get { return _fieldNames.Length; }
    }

    public string GetName(int i)
    {
        return _fieldNames[i];
    }

    public int GetOrdinal(string name)
    {
        int index = -1;
        for (int i = 0; i < _fieldNames.Length; i++)
        {
            if (_fieldNames[i] == name)
            {
                index = i;
                break;
            }
        }
        return index;
    }

    public object GetValue(int i)
    {
        if (i > (Values[_currentIndex].Length- 1))
        {
            return null;
        }
        else
        {
            return Values[_currentIndex][i];
        }
    }

    public bool Read()
    {
        if ((_currentIndex + 1) < Values.Count)
        {
            _currentIndex++;
            return true;
        }
        else
        {
            return false;
        }
    }

    // Other IDataReader things not used by SqlBulkCopy not implemented
}

更新和结论:

我收到了大量有价值的意见,但想将其总结为一个结论。首先,我的主要问题是我是否可以做任何其他事情(使用我发布的代码)来积极回收内存。共识似乎是该方法是正确的,但我的特定问题并不完全受 CPU 限制,因此简单的 Parallel.ForEach 将无法正确管理处理。

感谢 usr 的调试建议和非常有趣的 PLINQ 建议。感谢 zmbq 帮助澄清发生了什么和没有发生什么。

最后,其他任何可能正在关注类似问题的人都可能会发现以下讨论很有帮助:

Parallel.ForEach can cause a "Out Of Memory" exception if working with a enumerable with a large object

Parallel Operation Batching

【问题讨论】:

  • 正在进行的计算是什么?我问的原因是,您可能有一种方法可以完全在 SQL 中完成,这将解决您的内存问题,因为 SQL 会优化您的代码。我知道这不是问题所要问的。
  • 除了 Michael 提到的之外,另一种选择是使用 Sql Server Integration Services,它真的是为做这类事情而设计的。如果需要,您可以在应用程序中启动 SSIS 作业。
  • 几件事,您需要点播数据吗?如果不考虑 SSAS。另一件事,如果你想做这样的事情,就去低层次。另一件事,忽略内置的扩展方法和库。没有人可以做一些泛化并完美地满足极不可能的需求。
  • 这里最大的问题是它不是真正的代码。很可能,您忽略了问题的关键所在。例如,由于我们不知道 ProcessedData 中返回的数据在传递给处理器后会发生什么,因此我们不知道它是否保留了对数据的引用,从而导致您的问题。
  • @ErikTheViking - 感谢您的评论(并理解您的观点)。但是,显示所有“真实”代码是不合理的。需要澄清的一点是 ObjectProcessor.Process() 将所有内容都保留在本地,因此,随着对象的处理完成,其内部的所有内容都应该超出范围。那么,在您看来,您所看到的应该是自行清理?

标签: c# sql-server memory-management task-parallel-library


【解决方案1】:

我不完全理解Parallel.ForEach 是如何拉取项目的,但我认为默认情况下它拉取多个项目以节省锁定开销。这意味着多个项目可能在Parallel.ForEach 内部排队。这可能会很快导致 OOM,因为您的项目单独很大。

您可以尝试给它一个Partitioner that returns single items

如果这没有帮助,我们需要深入挖掘。使用 Parallel 和 PLINQ 调试内存问题很麻烦。其中一个存在错误,例如,导致旧项目无法快速发布。

作为一种解决方法,您可以在处理后清除列表。这将至少允许在处理完成后确定性地回收所有项目。

关于您发布的代码:它干净、高质量,并且您遵守高标准的资源管理。我不会怀疑您有严重的内存或资源泄漏。这仍然不是不可能的。您可以通过注释掉Parallel.ForEach 中的代码并将其替换为Thread.Sleep(1000 * 60) 来测试这一点。如果泄漏仍然存在,您没有错。

根据我的经验,PLINQ 更容易获得精确程度的并行性(因为当前版本使用您指定的确切 DOP,永远不会少)。像这样:

GetRows()
.AsBatches(10000)    
.AsParallel().WithDegreeOfParallelism(8)
.Select(TransformItems) //generate rows to write
.AsEnumerable() //leave PLINQ
.SelectMany(x => x) //flatten batches
.AsBatches(1000000) //create new batches with different size
.AsParallel().WithDegreeOfParallelism(2) //PLINQ with different DOP
.ForEach(WriteBatchToDB); //write to DB

这将为您提供一个简单的管道,该管道从数据库中提取数据,使用针对 CPU 优化的特定 DOP 执行受 CPU 限制的工作,并以更大的批次和更少的 DOP 写入数据库。

这很简单,它应该使用各自的 DOP 独立地最大化 CPU 和磁盘。玩弄 DOP 号码。

【讨论】:

  • 虽然我对 Thread.Sleep(1000 * 60) 没有耐心,但我做了你提到的 15 秒(这是给定线程的一个很好的近似值)。使用的内存确实随着运行而增长,但似乎更合理。我不确定我学到了太多东西,但这是一个很好的提示,可以帮助我进行更深入的调试 - 谢谢。
  • 我刚刚添加了一些可能对您也有帮助的内容。您说 5k 的批量复制大小最适合您。这是不太可能的。您可能应该有独立的 DOP 用于 CPU 工作和批量复制工作。然后,您应该会看到非常大的批次(例如 100k)的吞吐量更好。
  • PLINQ 示例看起来很有趣。当然,我并没有完全遵循它。我假设 GetRows() 是我的 IEnumerable 函数。但是,我似乎没有选择 .AsBatches(x) - 我可以从 .AsParallel() 开始,做一些你有的事情。我的主要问题是如何设计 TransFormItems 和 WriteBatchToDB 方法......以及,它们如何共享数据?
  • GetRows 是您的 ReadData。 AsBatches 是您的自定义批处理逻辑。 TransformItems 是您在插入之前可能想要进行的一些计算。 WriteBatchToDB 是批量复制的东西。这些只是占位符,你可以随意填写。
【解决方案2】:

您在内存中保留了两件事 - 您的输入数据和输出数据。您已尝试并行读取和处理该数据,但您并没有减少总体内存占用 - 您最终仍将大部分数据保留在内存中 - 您拥有的线程越多,您保留在内存中的数据就越多。

我猜大部分内存都被您的输出数据占用了,因为您创建的输出记录是输入记录的 10 倍。所以你有几个(10?30?50)SqlBulkCopy 操作。

这实在是太多了。通过批量写入 100,000 条记录,您可以获得很多的速度。你应该做的是拆分你的工作——读取 10,000-20,000 条记录,创建输出记录,SqlBulkCopy 到数据库,然后重复。您的内存消耗将大大减少。

当然,您可以并行处理 - 并行处理数个 10,000 个记录批次。

请记住,Parallel.ForEach 和线程池通常旨在优化 CPU 使用率。限制您的可能是数据库服务器上的 I/O。虽然数据库可以很好地处理并发,但它们的限制并不取决于您客户端计算机上的内核数量,因此您最好使用并发线程的数量,看看什么是最快的。

【讨论】:

  • MyObjectBatch 对象正在执行您提到的操作。我尝试了不同大小的批次和线程数来尝试找到最佳位置。似乎效果最好的是大约 5000 个的批量副本(任何更大的副本似乎都会拖累它)。所以,一个大问题,你有没有一种方法可以在我完成时释放输入和/或输出数据?我希望它会超出范围并得到清理,但这似乎没有发生。
  • 垃圾收集器应该为你处理好这些。它只会在需要的时候工作,所以你会看到内存消耗不断上升,然后突然下降。您不应该看到 OutOfMemoryExceptions。即使一次处理 5000 条记录,您是否也看到了这些?
  • 我确实看到偶尔会减少内存消耗,但总体而言它会继续增加。因此,对于较小的批次,它会完成并清理(在 OOM 之前)。但是,即使在限制线程或批量大小时,大批量总是会 OOM。
猜你喜欢
  • 2011-04-16
  • 1970-01-01
  • 1970-01-01
  • 2012-05-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-10-12
  • 1970-01-01
相关资源
最近更新 更多