【问题标题】:How can I make this C# loop faster?如何使这个 C# 循环更快?
【发布时间】:2011-05-12 19:42:56
【问题描述】:

执行摘要:如果您想留在 C# 中,Reed 下面的回答是最快的。如果您愿意编组到 C++(我就是),这是一个更快的解决方案。

我在 C# 中有两个 55mb 的 ushort 数组。我正在使用以下循环将它们组合起来:

float b = (float)number / 100.0f;
for (int i = 0; i < length; i++)
{
      image.DataArray[i] = 
          (ushort)(mUIHandler.image1.DataArray[i] + 
          (ushort)(b * (float)mUIHandler.image2.DataArray[i]));
}

这段代码,根据前后添加 DateTime.Now 调用,运行时间为 3.5 秒。我怎样才能让它更快?

编辑:我认为这里有一些代码可以显示问题的根源。当下面的代码在一个全新的 WPF 应用程序中运行时,我得到了这些计时结果:

Time elapsed: 00:00:00.4749156 //arrays added directly
Time elapsed: 00:00:00.5907879 //arrays contained in another class
Time elapsed: 00:00:02.8856150 //arrays accessed via accessor methods

因此,当数组直接遍历时,时间比数组位于另一个对象或容器内要快得多。这段代码表明,不知何故,我正在使用访问器方法,而不是直接访问数组。即便如此,我似乎能达到的最快速度是半秒。当我使用 icc 在 C++ 中运行第二个代码清单时,我得到:

Run time for pointer walk: 0.0743338

在这种情况下,C++ 的速度提高了 7 倍(使用 icc,不确定是否可以使用 msvc 获得相同的性能——我对那里的优化不太熟悉)。有什么方法可以让 C# 接近 C++ 的性能水平,还是应该让 C# 调用我的 C++ 例程?

清单 1,C# 代码:

public class ArrayHolder
{
    int length;
    public ushort[] output;
    public ushort[] input1;
    public ushort[] input2;
    public ArrayHolder(int inLength)
    {
        length = inLength;
        output = new ushort[length];
        input1 = new ushort[length];
        input2 = new ushort[length];
    }

    public ushort[] getOutput() { return output; }
    public ushort[] getInput1() { return input1; }
    public ushort[] getInput2() { return input2; }
}


/// <summary>
/// Interaction logic for MainWindow.xaml
/// </summary>
public partial class MainWindow : Window
{
    public MainWindow()
    {
        InitializeComponent();


        Random random = new Random();

        int length = 55 * 1024 * 1024;
        ushort[] output = new ushort[length];
        ushort[] input1 = new ushort[length];
        ushort[] input2 = new ushort[length];

        ArrayHolder theArrayHolder = new ArrayHolder(length);

        for (int i = 0; i < length; i++)
        {
            output[i] = (ushort)random.Next(0, 16384);
            input1[i] = (ushort)random.Next(0, 16384);
            input2[i] = (ushort)random.Next(0, 16384);
            theArrayHolder.getOutput()[i] = output[i];
            theArrayHolder.getInput1()[i] = input1[i];
            theArrayHolder.getInput2()[i] = input2[i];
        }

        Stopwatch stopwatch = new Stopwatch(); 
        stopwatch.Start();
        int number = 44;
        float b = (float)number / 100.0f;
        for (int i = 0; i < length; i++)
        {
            output[i] =
                (ushort)(input1[i] +
                (ushort)(b * (float)input2[i]));
        } 
        stopwatch.Stop();

        Console.WriteLine("Time elapsed: {0}",
            stopwatch.Elapsed);
        stopwatch.Reset();

        stopwatch.Start();
        for (int i = 0; i < length; i++)
        {
            theArrayHolder.output[i] =
                (ushort)(theArrayHolder.input1[i] +
                (ushort)(b * (float)theArrayHolder.input2[i]));
        }
        stopwatch.Stop();

        Console.WriteLine("Time elapsed: {0}",
            stopwatch.Elapsed);
        stopwatch.Reset();

        stopwatch.Start();
        for (int i = 0; i < length; i++)
        {
            theArrayHolder.getOutput()[i] =
                (ushort)(theArrayHolder.getInput1()[i] +
                (ushort)(b * (float)theArrayHolder.getInput2()[i]));
        }
        stopwatch.Stop();

        Console.WriteLine("Time elapsed: {0}",
            stopwatch.Elapsed);
    }
}

清单 2,C++ 等效项: // looptiming.cpp : 定义控制台应用程序的入口点。 //

#include "stdafx.h"
#include <stdlib.h>
#include <windows.h>
#include <stdio.h>
#include <iostream>


int _tmain(int argc, _TCHAR* argv[])
{

    int length = 55*1024*1024;
    unsigned short* output = new unsigned short[length];
    unsigned short* input1 = new unsigned short[length];
    unsigned short* input2 = new unsigned short[length];
    unsigned short* outPtr = output;
    unsigned short* in1Ptr = input1;
    unsigned short* in2Ptr = input2;
    int i;
    const int max = 16384;
    for (i = 0; i < length; ++i, ++outPtr, ++in1Ptr, ++in2Ptr){
        *outPtr = rand()%max;
        *in1Ptr = rand()%max;
        *in2Ptr = rand()%max;
    }

    LARGE_INTEGER ticksPerSecond;
    LARGE_INTEGER tick1, tick2;   // A point in time
    LARGE_INTEGER time;   // For converting tick into real time


    QueryPerformanceCounter(&tick1);

    outPtr = output;
    in1Ptr = input1;
    in2Ptr = input2;
    int number = 44;
    float b = (float)number/100.0f;


    for (i = 0; i < length; ++i, ++outPtr, ++in1Ptr, ++in2Ptr){
        *outPtr = *in1Ptr + (unsigned short)((float)*in2Ptr * b);
    }
    QueryPerformanceCounter(&tick2);
    QueryPerformanceFrequency(&ticksPerSecond);

    time.QuadPart = tick2.QuadPart - tick1.QuadPart;

    std::cout << "Run time for pointer walk: " << (double)time.QuadPart/(double)ticksPerSecond.QuadPart << std::endl;

    return 0;
}

编辑 2: 在第二个示例中启用 /QxHost 将时间降低到 0.0662714 秒。按照@Reed 的建议修改第一个循环让我明白

经过的时间:00:00:00.3835017

所以,对于滑块来说,速度仍然不够快。那个时间是通过代码:

        stopwatch.Start();
        Parallel.ForEach(Partitioner.Create(0, length),
         (range) =>
         {
             for (int i = range.Item1; i < range.Item2; i++)
             {
                 output[i] =
                     (ushort)(input1[i] +
                     (ushort)(b * (float)input2[i]));
             }
         });

        stopwatch.Stop();

编辑 3 根据@Eric Lippert 的建议,我在发布时重新运行了 C# 中的代码,而不是使用附加的调试器,只需将结果打印到对话框。它们是:

  • 简单数组:~0.273s
  • 包含的数组:~0.330s
  • 存取器数组:~0.345s
  • 并行数组:~0.190s

(这些数字来自 5 次运行的平均值)

所以并行解决方案肯定比我之前得到的 3.5 秒快,但仍然比使用非 icc 处理器可达到的 0.074 秒略低。因此,最快的解决方案似乎是在 release 中编译,然后编组为 icc 编译的 C++ 可执行文件,这使得在此处使用滑块成为可能。

编辑 4:@Eric Lippert 的另外三个建议:将 for 循环的内部从 length 更改为 array.length,使用双精度,并尝试不安全的代码。

对于这三个,现在是时候了:

  • 长度:~0.274s
  • 双打,不是浮动:~0.290s
  • 不安全:~0.376s

到目前为止,并行解决方案是最大的赢家。虽然如果我可以通过着色器添加这些,也许我可以在那里看到某种加速......

这是附加代码:

        stopwatch.Reset();

        stopwatch.Start();

        double b2 = ((double)number) / 100.0;
        for (int i = 0; i < output.Length; ++i)
        {
            output[i] =
                (ushort)(input1[i] +
                (ushort)(b2 * (double)input2[i]));
        }

        stopwatch.Stop();
        DoubleArrayLabel.Content += "\t" + stopwatch.Elapsed.Seconds + "." + stopwatch.Elapsed.Milliseconds;
        stopwatch.Reset();

        stopwatch.Start();

        for (int i = 0; i < output.Length; ++i)
        {
            output[i] =
                (ushort)(input1[i] +
                (ushort)(b * input2[i]));
        }

        stopwatch.Stop();
        LengthArrayLabel.Content += "\t" + stopwatch.Elapsed.Seconds + "." + stopwatch.Elapsed.Milliseconds;
        Console.WriteLine("Time elapsed: {0}",
            stopwatch.Elapsed);
        stopwatch.Reset();

        stopwatch.Start();
        unsafe
        {
            fixed (ushort* outPtr = output, in1Ptr = input1, in2Ptr = input2){
                ushort* outP = outPtr;
                ushort* in1P = in1Ptr;
                ushort* in2P = in2Ptr;
                for (int i = 0; i < output.Length; ++i, ++outP, ++in1P, ++in2P)
                {
                    *outP = (ushort)(*in1P + b * (float)*in2P);
                }
            }
        }

        stopwatch.Stop();
        UnsafeArrayLabel.Content += "\t" + stopwatch.Elapsed.Seconds + "." + stopwatch.Elapsed.Milliseconds;
        Console.WriteLine("Time elapsed: {0}",
            stopwatch.Elapsed);

【问题讨论】:

  • 顺便说一句 - 如果您要测量运行方法所需的时间,最好使用 Stopwatch 类。它比 DateTime 准确得多。
  • @RichK-- 注意到。但在 3.5 秒时,我并不太关心毫秒精度。
  • 你能给我们举个例子吗?
  • 你只能这么快地处理大量数据。
  • 还有一些事情要尝试。首先,如果你用双精度数而不是浮点数来做数学呢?处理器针对双打进行了优化。除非您想节省空间,否则没有理由使用浮点数;它们节省空间,但通常不节省时间。其次,如果你把循环写成“for(int i = 0; i 抖动可以识别这种常见模式并优化循环。第三,知道使用原始指针是否会赢会很有趣。

标签: c# performance


【解决方案1】:

这应该是完全可并行化的。但是,鉴于每个元素所做的工作量很小,您需要格外小心地处理这个问题。

执行此操作的正确方法(在 .NET 4 中)是将Parallel.ForEach 与 Partitioner 结合使用:

float b = (float)number / 100.0f;
Parallel.ForEach(Partitioner.Create(0, length), 
(range) =>
{
   for (int i = range.Item1; i < range.Item2; i++)
   {
      image.DataArray[i] = 
          (ushort)(mUIHandler.image1.DataArray[i] + 
          (ushort)(b * (float)mUIHandler.image2.DataArray[i]));
   }
});

这将有效地将工作分配给系统中可用的处理核心,并且如果您有多个核心,应该会提供不错的加速。

话虽如此,这充其量只会通过系统中的内核数量来加速此操作。如果您需要加快速度,您可能需要恢复到并行化和不安全代码的混合。到那时,可能值得考虑尝试实时呈现这一点的替代方案。

【讨论】:

  • @Reed--感谢这个。如果我要尝试不安全的路线,我会从哪里开始?如果我这样做了,是否有可能混合使用不安全代码和这种自动并行化?
  • @mmr:是的。您可以通过使用指针操作来更快地完成此操作。您将使用与上述相同的技术,但随后将数组转换为指针,并直接进行操作。这是对 for 循环的一个小改进(通常),但考虑到数量,它可能有助于加快速度。不过,我会对其进行分析,并且只有在确实有帮助时才这样做......
  • @Reed——这样做可以节省一秒钟的时间。这让我有点吃惊,因为 C++ 可以在几毫秒内处理这个问题。
  • @mmr:如果你有更快的 C++ 代码,它不会做同样的事情。 C# 和 C++ 之间的性能差异并不大。我使用不安全模式和指针访问数据进行了测试,完全没有性能差异。
  • @Guffa:这也是我的经验。切换到不安全,C# 经常会在这样的操作中胜过 C++(前提是你没有利用 SSE 等)
【解决方案2】:

假设您有很多这样的人,您可以尝试并行化操作(并且您使用的是 .NET 4):

Parallel.For(0, length, i=>
   {
       image.DataArray[i] = 
      (ushort)(mUIHandler.image1.DataArray[i] + 
      (ushort)(b * (float)mUIHandler.image2.DataArray[i]));
   });

当然,这一切都将取决于并行化是否值得。该语句在计算上看起来相当短;按数字访问索引的速度非常快。您可能会有所收获,因为这个循环使用这么多数据运行了很多次。

【讨论】:

  • 这在实践中不会很有效,因为循环体中的工作量很小。请参阅我的答案以获得更有效的替代方案。
猜你喜欢
  • 1970-01-01
  • 2019-07-25
  • 1970-01-01
  • 2018-12-25
  • 2021-12-20
  • 1970-01-01
  • 2011-02-09
  • 2020-06-07
  • 1970-01-01
相关资源
最近更新 更多