【问题标题】:How to multithread a process method如何多线程处理方法
【发布时间】:2013-06-03 21:54:16
【问题描述】:

我正在尝试使用多线程来更快地处理结果列表。我尝试为每个使用并行,但是当运行过程方法时,我没有收到正确的结果。

private IEnumerable<BulkProcessorResult> GetProccessResults(List<Foo> Foos)
{
    var listOfFooLists = CreateListOfFooLists(Foos);

    var bulkProcessorResults = new List<BulkProcessorResult>();
    Parallel.ForEach(listOfFooLists, FooList =>
    {
        foreach (var Foo in FooList)
        {
            var processClaimResult = _processor.Process(Foo);
            var bulkProcessorResult = new BulkProcessorResult()
            {
                ClaimStatusId = (int) processClaimResult.ClaimStatusEnum,
                Property1 = Foo.Property1
            };
            bulkProcessorResults.Add(bulkProcessorResult);
        }
    }); 

    return bulkProcessorResults;
}

如果我使用普通的 forEach,我会得到正确的输出。如果我使用上面的代码,当应该有三个状态为 1 和一个状态为 3 时,我会得到所有 2 的状态。

我对线程非常陌生,所以任何帮助都会很棒。

【问题讨论】:

  • 语言是什么?您应该将其添加到标签中。
  • 你可能想看看例如ConcurrentQueue&lt;T&gt; 作为中间存储。从多个线程操作List&lt;T&gt; 是不安全的(例如通过调用Add)。任何事情都可能(并且可能会)发生。
  • 我看到的第一个问题是您有多个并发线程将项目添加到buldProcessResults。这会导致问题,因为List.Add 并非设计用于处理多个并发更新。你需要用锁来保护它,或者使用某种类型的并发数据结构。
  • @Damien_The_Unbeliever 补充说,当我使用 ConcurrentQueue 时,它可以解决我的所有问题。谢谢。

标签: c# multithreading task-parallel-library


【解决方案1】:

最明显的问题是您正在使用多个线程(好吧,这通过调用 Parallel.ForEach 有点隐藏,但您应该知道它通过使用多个线程/任务来实现并行性)但是您正在使用List&lt;T&gt;,它不是线程安全的集合类:

只要不修改集合,List&lt;T&gt; 可以同时支持多个读取器。通过集合枚举本质上不是线程安全的过程。在枚举与一个或多个写访问竞争的极少数情况下,确保线程安全的唯一方法是在整个枚举期间锁定集合。要允许集合被多个线程访问进行读写,必须实现自己的同步

不过,与其实现您自己的同步,并且不会对代码中的其他内容进行太多改动,我会改用ConcurrentQueue&lt;T&gt;

private IEnumerable<BulkProcessorResult> GetProccessResults(List<Foo> Foos)
{
    var listOfFooLists = CreateListOfFooLists(Foos);

    var bulkProcessorResults = new ConcurrentQueue<BulkProcessorResult>();
    Parallel.ForEach(listOfFooLists, FooList =>
    {
        foreach (var Foo in FooList)
        {
            var processClaimResult = _processor.Process(Foo);
            var bulkProcessorResult = new BulkProcessorResult()
            {
                ClaimStatusId = (int) processClaimResult.ClaimStatusEnum,
                Property1 = Foo.Property1
            };
            bulkProcessorResults.Enqueue(bulkProcessorResult);
        }
    }); 

    return bulkProcessorResults;
}

【讨论】:

  • 你也可以使用ConcurrentBag&lt;T&gt;
  • @JimMischel 您是否有任何经验表明ConcurrentBag 在这种情况下会更有效率?据我所知,ConcurrentBag 在每个线程既是生产者和消费者时效果最好,但这里不是这种情况。
  • @svick:我没有关于ConcurrentBag 相对于ConcurrentQueue 的速度的信息。我只是提到了另一种选择。
【解决方案2】:

把整个事情当作一个 Parallel Linq 查询怎么样?

private IEnumerable<BulkProcessorResult> GetProccessResults(List<Foo> Foos)
{
  var listOfFooLists = CreateListOfFooLists(Foos);
  return listOfFooLists.AsParallel()
                       .SelectMany(FooList => FooList)
                       .Select(Foo =>
                            new BulProcessorResult {
                               ClaimStatusId = (int)_processor.Process(Foo),
                               Property1 = Foo.Property1
                            }).ToList();
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2023-01-02
    • 2021-10-27
    • 2017-04-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多