【问题标题】:Parallel.ForEach while retaining orderParallel.ForEach 同时保持顺序
【发布时间】:2012-06-22 07:45:58
【问题描述】:

我有一个 List<byte[]>,我喜欢将每个 byte[] 反序列化为 Foo。 List 是有序的,我喜欢编写一个并行循环,其中生成的List<Foo> 包含所有 Foo,其顺序与原始byte[] 相同。该列表非常大,以使并行操作值得。有没有内置的方法来实现这一点?

如果没有,有什么想法可以通过同步运行来实现加速?

谢谢

【问题讨论】:

  • 请发布一些示例代码,并且为了记录,使并行化的东西不一定会更快更高效...想知道 Parallel.For 是否会解决您的问题。
  • 这是一个副本:stackoverflow.com/questions/3639768/… 所以要回答,您可以使用 PLINQ (AsOrdered, AsParallel) 来完成工作。
  • 最好有SelectMap的并行等价物来保留输入顺序。
  • @adt:您给出了答案:Parallel.For 解决了问题。将此作为答案发布。我会投票 +1。
  • @adt,如果我有代码,我就不需要问了 ;-) 如果这对你有帮助,我可以添加一个简单的 for 循环?

标签: c# collections asynchronous concurrency parallel-processing


【解决方案1】:

根据您提供的信息,我知道您想要一个 Foo 的输出数组,其大小等于输入的字节数组?它是否正确?

如果是这样,是的,操作很简单。不要为锁定或同步结构而烦恼,这些会侵蚀并行化为您提供的所有速度。

相反,如果你遵守这个简单的规则,任何算法都可以在没有锁定或同步的情况下并行化:

对于每个处理的输入元素 X[i],您可以从任何输入元素 X[j] 中读取,但只能写入输出元素 Y[i]

查找 Scatter/Gather,这种类型的操作称为聚集,因为只有一个输出元素被写入。

如果您可以使用上述原则,那么您希望预先创建输出数组 Foo[],并在输入数组上使用 Parallel.For 而不是 ForEach。

例如

        List<byte[]> inputArray = new List<byte[]>();
        int[] outputArray = new int[inputArray.Count];

        var waitHandle = new ManualResetEvent(false);
        int counter = 0;

        Parallel.For(0, inputArray.Count, index =>
            {
                // Pass index to for loop, do long running operation 
                // on input items
                // writing to only a single output item
                outputArray[index] = DoOperation(inputArray[index]);

                if(Interlocked.Increment(ref counter) == inputArray.Count -1)
                {
                    waitHandle.Set();
                }
            });

        waitHandler.WaitOne();

        // Optional conversion back to list if you wanted this
        var outputList = outputArray.ToList();

【讨论】:

  • 将您的答案标记为所需的解决方案,尽管最后我通过 svick 的帮助使用了自定义 IPropagatorBlock 和 TPLDataflow。尽管从表面上看,这个问题和数据流似乎是两个非常不同的概念,但就我的具体问题而言,IPropagatorBlock 中数据块的并行处理大大优于任何 Parallel.For 循环。我发现映射和完成通知的同步开销非常昂贵,并且 TPL 数据流针对的是我的场景。但这绝不会影响您对我的具体问题的解决方案......
  • ...不包括同步和完成通知的问题。这是关于 Parallel.Foreach 在保留订单的同时,您提出的解决方案符合要求。非常感谢。
  • 好点!任务并行库是一个优越的解决方案。我很惊讶它的表现优于它。也许它限制了每个线程执行多个序列化的任务?对于上述情况,您可以包含带有整数计数器和Interlocked.Increment(ref counter) 的完整通知,以测试是否所有元素都已被处理。然后设置等待句柄以继续。
【解决方案2】:

您可以使用带有索引 int 键的线程安全字典来存储来自 foo 的结果 所以最后你将拥有字典中的所有数据排序器

【讨论】:

  • 谢谢,我知道,但这听起来非常复杂。我基本上需要另一个完整的过程来按顺序等待下一个传入的 Foo,检查同步性,并以正确的顺序将项目添加到结果列表中。看起来相当多的开销可能会破坏并行运行的整个概念。
猜你喜欢
  • 1970-01-01
  • 2021-11-22
  • 2020-11-01
  • 1970-01-01
  • 2012-04-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多