【问题标题】:Parallel.ForEach Source List with Where Condition带有 Where 条件的 Parallel.ForEach 源列表
【发布时间】:2016-09-20 13:41:56
【问题描述】:

我有一个代码块,它处理 StoreProducts 然后在每个循环中在数据库中添加或更新它们。但这很慢。当我转换代码 Parallel.ForEach 块时,相同的产品会同时添加和更新。我不知道如何安全地使用以下功能,我们将不胜感激。

var validProducts = storeProducts.Where(p => p.Price2 > 0
                                                     && !string.IsNullOrEmpty(p.ProductAtt08Desc.Trim())
                                                     && !string.IsNullOrEmpty(p.Barcode.Trim()) 
            ).ToList();

var processedProductCodes = new List<string>();

var po = new ParallelOptions()
        {
            MaxDegreeOfParallelism = 4
        };

Parallel.ForEach(validProducts.Where(p => !processedProductCodes.Contains(p.ProductCode)), po,
            (product) =>
{
            lock (_lockThis)
            {
                processedProductCodes.Add(product.ProductCode);
            }

    // Check if Product Exists in Db

    // if product is not in Db Add to Db

    // if product is in Db Update product in Db

}

这里的事情是,validProducts 列表可能有多个相同的 ProductCode,因此它们是变体,我必须管理即使其中一个正在处理,也不应该再次处理。

因此,在并行 foreach 'validProducts.Where(p => !processedProductCodes.Contains(p.ProductCode)' 中找到的条件无法正常工作。

【问题讨论】:

    标签: c# concurrency parallel-processing race-condition parallel.foreach


    【解决方案1】:

    我的大部分答案都不是对您的问题的回答,而是更多的一些指导 - 如果您要提供更多技术细节,我可能能够更准确地提供帮助。

    Parallel.ForEach 可能不是最好的解决方案——尤其是当您有一个共享列表或一个繁忙的服务器时。

    您正在锁定以写入但不能从该共享列表中读取。所以我很惊讶它在 Where 期间没有扔。将List&lt;string&gt; 转换为ConcurrentDictionary&lt;string, bool&gt;(只是为了创建一个简单的并发哈希表),然后您将获得更好的写入吞吐量,并且它不会在读取期间抛出。

    但您将遇到数据库争用问题(如果使用多个连接),因为您的插入可能仍需要锁定。即使您只是简单地拆分工作负载,您也会遇到这种情况。这种数据库锁定可能会导致阻塞/死锁,因此它最终可能比原来的要慢。如果使用一个连接,您通常无法并行化命令。

    我会尝试将大部分插入包装在一个 transaction 中,其中包含 batches,例如 1000 个插入,或者将整个工作负载放入一个批量插入中。然后数据库会将数据保存在内存中,并在完成后将整个数据提交到磁盘(而不是一次一条记录)。

    根据您的典型工作负载,您可能需要尝试不同的存储解决方案。数据库通常不适合插入大量记录...您可能会看到使用替代解决方案(例如键值存储)获得更好的性能。或者把数据放到Redis之类的东西里,在后台慢慢持久化到数据库中。

    【讨论】:

    • 感谢您的宝贵意见。因为我不是在修改源列表而是我正在比较的列表,所以没有发生集合修改错误。另一方面,我无法按照您的建议批量更新 db,因为我正在使用带有 efcache 模块的实体框架,所以外部 ef 事务是没有问题的。
    【解决方案2】:

    Parallel.ForEach 在内部为每个线程缓冲项目,您可以做的一个选择是切换到不使用缓冲的分区器

    var pat = Partitioner.Create(validProducts.Where(p => !processedProductCodes.Contains(p.ProductCode))
                                ,EnumerablePartitionerOptions.NoBuffering);
    
    Parallel.ForEach(pat, po, (product) => ...
    

    这会让你更接近,但你仍然会有一个竞争条件,其中两个相同的对象可以被处理,因为如果你发现重复,你不会跳出循环。

    更好的选择是将processedProductCodes 切换为HashSet&lt;string&gt; 并将您的代码更改为

    var processedProductCodes = new HashSet<string>();
    
    var po = new ParallelOptions()
            {
                MaxDegreeOfParallelism = 4
            };
    
    Parallel.ForEach(validProducts, po,
                (product) =>
    {
                //You can safely lock on processedProductCodes
                lock (processedProductCodes)
                {
                    if(!processedProductCodes.Add(product.ProductCode))
                    {
                        //Add returns false if the code is already in the collection.
                        return;
                    }
                }
    
        // Check if Product Exists in Db
    
        // if product is not in Db Add to Db
    
        // if product is in Db Update product in Db
    
    }
    

    HashSet 具有更快的查找速度,并且内置于 Add 函数中。

    【讨论】:

    • 您可能还想查看执行“插入或更新”查询,而不是检查您的内存列表。在不直接支持这一点的数据库中,您通常可以执行更新、检查受影响记录的数量,并在单个查询中插入 if 0 all。但我会非常担心这个过程中的数据库阻塞/死锁(见我上面的回答),尤其是在其他工作负载正在进行的情况下。
    • @Scott Chamberlain 感谢您的时间和回答,我使用了分区器、哈希集和返回(正如您所指出的那样,我认为这成功了)一切都很顺利。
    猜你喜欢
    • 1970-01-01
    • 2015-04-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-10-06
    相关资源
    最近更新 更多