【问题标题】:Going from Parallel.ForEach to Multithreading从 Parallel.ForEach 到多线程
【发布时间】:2016-05-31 02:08:02
【问题描述】:

所以我将递归函数转换为迭代函数,然后使用 Parallel.ForEach,但是当我通过 VTune 运行它时,它的大部分运行时间实际上只使用了 2 个逻辑内核。

我决定尝试改用托管线程,并转换了这段代码:

for (int N = 2; N <= length; N <<= 1)
{
    int maxThreads = 4;
    var workGroup = Enumerable.Range(0, maxThreads);

    Parallel.ForEach(workGroup, i =>
    {
        for (int j = ((i / maxThreads) * length); j < (((i + 1) / maxThreads) * length); j += N)
        {
            for (int k = 0; k < N / 2; k++)
            {
                int evenIndex = j + k;
                int oddIndex = j + k + (N / 2);

                var even = output[evenIndex];
                var odd = output[oddIndex];

                output[evenIndex] = even + odd * twiddles[k * (length / N)];
                output[oddIndex] = even + odd * twiddles[(k + (N / 2)) * (length / N)];
            }
        }
    });
}

进入这个:

for (int N = 2; N <= length; N <<= 1)
{
    int maxThreads = 4;

    Thread one = new Thread(() => calculateChunk(0, maxThreads, length, N, output));
    Thread two = new Thread(() => calculateChunk(1, maxThreads, length, N, output));
    Thread three = new Thread(() => calculateChunk(2, maxThreads, length, N, output));
    Thread four = new Thread(() => calculateChunk(3, maxThreads, length, N, output));

    one.Start();
    two.Start();
    three.Start();
    four.Start();
}

public void calculateChunk(int i, int maxThreads, int length, int N, Complex[] output)
{
    for (int j = ((i / maxThreads) * length); j < (((i + 1) / maxThreads) * length); j += N)
    {
        for (int k = 0; k < N / 2; k++)
        {
            int evenIndex = j + k;
            int oddIndex = j + k + (N / 2);
            var even = output[evenIndex];
            var odd = output[oddIndex];

            output[evenIndex] = even + odd * twiddles[k * (length / N)];
            output[oddIndex] = even + odd * twiddles[(k + (N / 2)) * (length / N)];
        }
    }
}

问题出在 N 循环的最后一次迭代的第四个线程中,我得到一个输出数组的索引越界异常,其中索引尝试访问 length 的等效项。

我无法通过调试来查明原因,但我相信这与线程有关,我在没有线程的情况下运行代码并且它按预期工作。

如果有任何代码需要更改,请告诉我,我通常会有一些人建议修改。感谢您的帮助,我已尝试自己对其进行排序,并且相当确定问题出现在我的线程中,但我看不出是如何发生的。

PS:预期目的是并行化这段代码。

【问题讨论】:

  • Parallel.ForEach 有很多重载,可以通过选项控制并行度。
  • 您可以通过将 Parallel.ForEach 从原始 sn-p 的内循环移动到外循环来潜在地实现所需的结果(如果您希望它的元素比 workGroup 的更多课程)。这将减少Parallel.ForEach 设置和拆卸成本,并使负载均衡器能够更好地完成工作,我希望它可以扩展到 N 个内核。不过,如果您坚持使用线程,我希望在某处看到 Joins - 否则您会在前一批有机会完成之前在每次循环迭代中启动越来越多的线程。
  • 你确定你的算法是正确的吗?据我了解,在循环for (int j = ((i / maxThreads) * length); j &lt; (((i + 1) / maxThreads) * length); j += N) 中,初始值int j = ((i / maxThreads) * length 在[0,maxThreads-1] 范围内的i 将始终为0(这是整数除法!)。对于除最后一个值之外的所有i 值,循环条件`j false。所以最后,无论你使用多少线程,你的内循环只进入一次。
  • 没关系Thread。你刚刚跨过绊线,直接进入了雷区。 TPL 比您认为的要聪明得多——您不必手动将您交给Parallel.ForEach 的项目数量限制为您拥有的核心数量。事实上,它永远不会达到这个数字,因为您不允许线程池扩展到您的实际负载。
  • @qbik 你说得对,我在发布后不久就发现了这一点,我已经纠正了这个问题,但又出现了另一个问题,即拆分工作负载是导致索引超出范围的原因。跨度>

标签: c# .net multithreading parallel-processing


【解决方案1】:

观察到的行为几乎可以肯定是由于使用了捕获的循环迭代变量N。我可以通过一个简单的测试重现你的情况:

ConcurrentBag<int> numbers = new ConcurrentBag<int>();

for (int i = 0; i < 10000; i++)
{
    Thread t = new Thread(() => numbers.Add(i));

    t.Start();
    //t.Join(); // Uncomment this to get expected behaviour.
}

// You'd not expect this assert to be true, but most of the time it will be.
Assert.True(numbers.Contains(10000));

简而言之,您的for 循环正在竞相递增N,然后执行calculateChunk 调用的委托可以复制N 的值。结果calculateChunk 看到N 的几乎随机值上升到(包括)length &lt;&lt;= 1 - 这就是导致您的IndexOutOfRangeException 的原因。

您将获得的输出值也将是垃圾,因为您永远不能依赖 N 的值是正确的。

如果您想安全地重写原始代码以利用更多内核,请将Parallel.ForEach 从内循环移动到外循环。如果外部循环迭代的次数很高,负载均衡器将能够正常工作(在您当前的 workGroup 计数为 4 时它无法正常工作 - 元素数量太少了)。

【讨论】:

  • 只是为了强调安全。这段代码应该很容易放在 TPL (Parallel.ForEach) 上。跑到原始的Threads 是危险的 NIH。
  • N 的每个增量都使用上一次迭代的计算数据,所以我不能并行化它,它是一个数据依赖关系:(
  • @OliverGiess,我现在明白了。我还认为您过早地标记了我的答案。由于需要扫描以前的结果,这是一个有趣的并行化问题。我可以看到如何使用生产者-消费者方法和障碍来解决这个问题,但我不相信并行化带来的性能提升会超过同步开销。
  • 是的,我不介意我想我将采用使用线程池的方法,并在每个线程打开时将一系列 k 循环排队。我已经打开了另一个问题,该问题更具体地概述了我发现的问题,最初我认为这与我对线程的使用有关,后来我意识到这与封闭循环的逆性质有关。但我的另一个问题更尖锐地涵盖了这个话题。 stackoverflow.com/questions/37539345/…
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多