【问题标题】:How to quickly subtract one ushort array from another in C#?如何在 C# 中快速从另一个数组中减去一个 ushort 数组?
【发布时间】:2014-01-16 00:55:16
【问题描述】:

我需要快速从 ushort arrayB 中具有相同长度的相应索引值中减去 ushort arrayA 中的每个值。

另外,如果差为负,我需要存储一个零,而不是负差。

(确切地说,长度 = 327680,因为我要从另一个相同大小的图像中减去 640x512 的图像)。

下面的代码目前需要约 20 毫秒,如果可能的话,我希望将其降低到约 5 毫秒。不安全的代码是可以的,但是请提供一个例子,因为我在编写不安全的代码方面并不是很熟练。

谢谢!

public ushort[] Buffer { get; set; }

public void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
{
    System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
    sw.Start();

    int bufferLength = Buffer.Length;

    for (int index = 0; index < bufferLength; index++)
    {
        int difference = Buffer[index] - backgroundBuffer[index];

        if (difference >= 0)
            Buffer[index] = (ushort)difference;
        else
            Buffer[index] = 0;
    }

    Debug.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));
}

更新:虽然它不是严格意义上的 C#,但为了其他阅读本文的人的利益,我最终使用以下代码将 C++ CLR 类库添加到我的解决方案中。它的运行时间约为 3.1 毫秒。如果使用非托管 C++ 库,它会在 ~2.2ms 内运行。由于时差很小,我决定使用托管库。

// SpeedCode.h
#pragma once
using namespace System;

namespace SpeedCode
{
    public ref class SpeedClass
    {
        public:
            static void SpeedSubtractBackgroundFromBuffer(array<UInt16> ^ buffer, array<UInt16> ^ backgroundBuffer, int bufferLength);
    };
}

// SpeedCode.cpp
// This is the main DLL file.
#include "stdafx.h"
#include "SpeedCode.h"

namespace SpeedCode
{
    void SpeedClass::SpeedSubtractBackgroundFromBuffer(array<UInt16> ^ buffer, array<UInt16> ^ backgroundBuffer, int bufferLength)
    {
        for (int index = 0; index < bufferLength; index++)
        {
            buffer[index] = (UInt16)((buffer[index] - backgroundBuffer[index]) * (buffer[index] > backgroundBuffer[index]));
        }
    }
}

那我这样称呼它:

    public void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
    {
        System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
        sw.Start();

        SpeedCode.SpeedClass.SpeedSubtractBackgroundFromBuffer(Buffer, backgroundBuffer, Buffer.Length);

        Debug.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));
    }

【问题讨论】:

  • ~20ms 听起来相当慢(也许你的机器规格低?)。 以防万一,您是否在没有调试的情况下运行发布版本?
  • p/invoke 并使用 PSUBW?
  • 不感兴趣,但您是在灰度图像上操作吗?
  • @GeoffBattye:我的机器是 Win7 64 位 i5。我正在运行调试版本。
  • @nick_w:是的,我正在处理灰度图像。

标签: c# arrays performance subtraction ushort


【解决方案1】:

一些基准测试。

  1. SubtractBackgroundFromBuffer: 这是问题中的原始方法。
  2. SubtractBackgroundFromBufferWithCalcOpt:这是原始方法增加了TTat提高计算速度的想法。
  3. SubtractBackgroundFromBufferParallelFor: Selman22 回答中的解决方案。
  4. SubtractBackgroundFromBufferBlockParallelFor:我的回答。与 3. 类似,但将处理分解为 4096 个值的块。
  5. SubtractBackgroundFromBufferPartitionedParallelForEach:Geoff 的第一个答案。
  6. SubtractBackgroundFromBufferPartitionedParallelForEachHack:Geoff 的第二个回答。

更新

有趣的是,我可以通过使用 SubtractBackgroundFromBufferBlockParallelFor 获得小幅度的速度提升 (~6%)(如 Bruno Costa 建议的那样)

Buffer[i] = (ushort)Math.Max(difference, 0);

而不是

if (difference >= 0)
    Buffer[i] = (ushort)difference;
else
    Buffer[i] = 0;

结果

请注意,这是每次运行 1000 次迭代的总时间。

SubtractBackgroundFromBuffer(ms):                                 2,062.23 
SubtractBackgroundFromBufferWithCalcOpt(ms):                      2,245.42
SubtractBackgroundFromBufferParallelFor(ms):                      4,021.58
SubtractBackgroundFromBufferBlockParallelFor(ms):                   769.74
SubtractBackgroundFromBufferPartitionedParallelForEach(ms):         827.48
SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms):     539.60

因此,从这些结果看来,最好的方法结合了计算优化以获得小增益,并且利用Parallel.For 对图像块进行操作。您的里程当然会有所不同,并且并行代码的性能对您正在运行的 CPU 很敏感。

测试工具

我在发布模式下为每个方法运行了这个。我正在以这种方式启动和停止Stopwatch,以确保仅测量处理时间。

System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
ushort[] bgImg = GenerateRandomBuffer(327680, 818687447);

for (int i = 0; i < 1000; i++)
{
    Buffer = GenerateRandomBuffer(327680, 128011992);                

    sw.Start();
    SubtractBackgroundFromBuffer(bgImg);
    sw.Stop();
}

Console.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));


public static ushort[] GenerateRandomBuffer(int size, int randomSeed)
{
    ushort[] buffer = new ushort[327680];
    Random random = new Random(randomSeed);

    for (int i = 0; i < size; i++)
    {
        buffer[i] = (ushort)random.Next(ushort.MinValue, ushort.MaxValue);
    }

    return buffer;
}

方法

public static void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
{
    int bufferLength = Buffer.Length;

    for (int index = 0; index < bufferLength; index++)
    {
        int difference = Buffer[index] - backgroundBuffer[index];

        if (difference >= 0)
            Buffer[index] = (ushort)difference;
        else
            Buffer[index] = 0;
    }
}

public static void SubtractBackgroundFromBufferWithCalcOpt(ushort[] backgroundBuffer)
{
    int bufferLength = Buffer.Length;

    for (int index = 0; index < bufferLength; index++)
    {
        if (Buffer[index] < backgroundBuffer[index])
        {
            Buffer[index] = 0;
        }
        else
        {
            Buffer[index] -= backgroundBuffer[index];
        }
    }
}

public static void SubtractBackgroundFromBufferParallelFor(ushort[] backgroundBuffer)
{
    Parallel.For(0, Buffer.Length, (i) =>
    {
        int difference = Buffer[i] - backgroundBuffer[i];
        if (difference >= 0)
            Buffer[i] = (ushort)difference;
        else
            Buffer[i] = 0;
    });
}        

public static void SubtractBackgroundFromBufferBlockParallelFor(ushort[] backgroundBuffer)
{
    int blockSize = 4096;

    Parallel.For(0, (int)Math.Ceiling(Buffer.Length / (double)blockSize), (j) =>
    {
        for (int i = j * blockSize; i < (j + 1) * blockSize; i++)
        {
            int difference = Buffer[i] - backgroundBuffer[i];

            Buffer[i] = (ushort)Math.Max(difference, 0);                    
        }
    });
}

public static void SubtractBackgroundFromBufferPartitionedParallelForEach(ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
        {
            for (int i = range.Item1; i < range.Item2; ++i)
            {
                if (Buffer[i] < backgroundBuffer[i])
                {
                    Buffer[i] = 0;
                }
                else
                {
                    Buffer[i] -= backgroundBuffer[i];
                }
            }
        });
}

public static void SubtractBackgroundFromBufferPartitionedParallelForEachHack(ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
    {
        for (int i = range.Item1; i < range.Item2; ++i)
        {
            unsafe
            {
                var nonNegative = Buffer[i] > backgroundBuffer[i];
                Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) *
                    *((int*)(&nonNegative)));
            }
        }
    });
}

【讨论】:

  • @BrunoCosta 1. 我不确定我明白你的意思。 “再次分区”是什么意思? 2. 是什么让您认为这不是在整个阵列上运行的? Blocksize 是一个有点武断的选择,也许值得进一步进行基准测试。
  • 我完全错过了发现代码......但我仍然相信 Parallel.Foreach 会自己创建一个分区。通过分区,我的意思是可以分配许多线程来处理您的 4096 块。但也许我错了..
  • @BrunoCosta 查看文档,您传递给Parallel.For 的代理每次迭代都会执行一次。我认为由此我们可以确信每次迭代都将在单个线程中运行。
  • 您还应该尝试在局部变量中存储对Buffer 的引用(在并行版本中,在委托中执行此操作)。 AFAIK 优化器不会为您执行此操作,因为 Buffer 的值可能被另一个线程更改。此外,如果 JIT 不确定数组实例不会改变,它就不能消除多余的边界检查。
  • @Daniel 我刚试过这个,即将代码 var localBuffer = Buffer; 放在 Parallel.For 委托中,无论出于何种原因,代码运行速度都相当慢。奇怪。
【解决方案2】:

这是一个有趣的问题。

仅在测试结果不会为负(如 TTat 和 Maximum Cookie 所建议的那样)后执行减法的影响可以忽略不计,因为 JIT 编译器已经可以执行此优化。

并行化任务(如Selman22 所建议)是一个好主意,但是当循环在这种情况下尽可能快时,开销最终会超过收益,所以Selman22's implementation 实际上在我的测试。我怀疑nick_w's benchmarks 是在附加调试器的情况下产生的,隐藏了这个事实。

以更大的块并行化任务(如nick_w 所建议的那样)处理开销问题,实际上可以产生更快的性能,但您不必自己计算块 - 您可以使用Partitioner 来做这是给你的:

public static void SubtractBackgroundFromBufferPartitionedParallelForEach(
    ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
        {
            for (int i = range.Item1; i < range.Item2; ++i)
            {
                if (Buffer[i] < backgroundBuffer[i])
                {
                    Buffer[i] = 0;
                }
                else
                {
                    Buffer[i] -= backgroundBuffer[i];
                }
            }
        });
}

在我的测试中,上述方法始终优于 nick_w's 手动分块。

但是等等!不仅如此。

减慢代码的真正罪魁祸首不是赋值或算术。这是if 声明。它如何影响性能将主要受您正在处理的数据的性质的影响。

nick_w's 基准测试为两个缓冲区生成相同大小的随机数据。但是,我怀疑后台缓冲区中实际上很可能具有较低的平均幅度数据。由于分支预测(如this classic SO answer 中所述),这一细节可能很重要。

当后台缓冲区中的值通常小于缓冲区中的值时,JIT 编译器会注意到这一点,并相应地针对该分支进行优化。当每个缓冲区中的数据来自同一个随机群体时,无法以超过 50% 的准确度猜测 if 语句的结果。 nick_w 正是在后一种情况下进行基准测试,在这种情况下,我们可能会通过使用不安全代码将布尔值转换为整数并完全避免分支来进一步优化您的方法。 (请注意,以下代码依赖于 bool 在内存中的表示方式的实现细节,虽然它适用于 .NET 4.5 中的场景,但它不一定是一个好主意,此处显示是为了说明目的。)

public static void SubtractBackgroundFromBufferPartitionedParallelForEachHack(
    ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
        {
            for (int i = range.Item1; i < range.Item2; ++i)
            {
                unsafe
                {
                    var nonNegative = Buffer[i] > backgroundBuffer[i];
                    Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) *
                        *((int*)(&nonNegative)));
                }
            }
        });
}

如果您真的想节省更多时间,那么您可以通过将语言切换到 C++/CLI 以更安全的方式采用这种方法,因为这样您就可以在算术表达式中使用布尔值而无需诉诸不安全代码:

UInt16 MyCppLib::Maths::SafeSubtraction(UInt16 minuend, UInt16 subtrahend)
{
    return (UInt16)((minuend - subtrahend) * (minuend > subtrahend));
}

您可以使用公开上述静态方法的 C++/CLI 创建一个纯托管的 DLL,然后在您的 C# 代码中使用它:

public static void SubtractBackgroundFromBufferPartitionedParallelForEachCpp(
    ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
    {
        for (int i = range.Item1; i < range.Item2; ++i)
        {
            Buffer[i] = 
                MyCppLib.Maths.SafeSubtraction(Buffer[i], backgroundBuffer[i]);
        }
    });
}

这胜过上面那些骇人听闻的不安全 C# 代码。事实上,它是如此之快,以至于您可以使用 C++/CLI 编写整个方法而无需考虑并行化,而且它的性能仍然优于其他技术。

使用nick_w's test harness,上述方法将优于迄今为止在此发布的任何其他建议。以下是我得到的结果(1-4 是他尝试的案例,5-7 是此答案中概述的案例):

1. SubtractBackgroundFromBuffer(ms):                               2,021.37
2. SubtractBackgroundFromBufferWithCalcOpt(ms):                    2,125.80
3. SubtractBackgroundFromBufferParallelFor(ms):                    3,431.58
4. SubtractBackgroundFromBufferBlockParallelFor(ms):               1,401.36
5. SubtractBackgroundFromBufferPartitionedParallelForEach(ms):     1,197.76
6. SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms):   742.72
7. SubtractBackgroundFromBufferPartitionedParallelForEachCpp(ms):    499.27

但是,在我希望您实际拥有的场景中,背景值通常较小,成功的分支预测可以全面改善结果,并且避免 if 语句的“hack”是实际上更慢:

当我将后台缓冲区中的值限制在0-6500 范围内(大约是缓冲区的 10%)时,我使用 nick_w's test harness 得到的结果如下:

1. SubtractBackgroundFromBuffer(ms):                                 773.50
2. SubtractBackgroundFromBufferWithCalcOpt(ms):                      915.91
3. SubtractBackgroundFromBufferParallelFor(ms):                    2,458.36
4. SubtractBackgroundFromBufferBlockParallelFor(ms):                 663.76
5. SubtractBackgroundFromBufferPartitionedParallelForEach(ms):       658.05
6. SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms):   762.11
7. SubtractBackgroundFromBufferPartitionedParallelForEachCpp(ms):    494.12

您可以看到结果 1-5 都得到了显着改善,因为它们现在受益于更好的分支预测。结果 6 和 7 没有太大变化,因为它们避免了分支。

这种数据变化彻底改变了事情。在这种情况下,即使是最快的全 C# 解决方案现在也只比您的原始代码快 15%。

底线:请务必使用具有代表性的数据测试您选择的任何方法,否则您的结果将毫无意义。

【讨论】:

  • 您正在将 bool* 转换为 int*(无效),并假设有关真正 bool 的数值(无效 - 不保证为 1)。不过,我喜欢分支避免的总体思路。
  • @usr 是的,该代码确实有效,但您说得对,依赖此实现细节不是一个好主意 - 我会澄清这一点。正如我在答案中所说,我怀疑这种 hack 对 OP 的数据实际上会变慢。
  • @GeoffBattye:感谢您的基准测试和出色的 cmets!我希望我也可以将您的答案标记为解决方案!顺便说一句,backgroundBuffer 的值几乎总是比 Buffer 小,所以希望 JIT 编译器会注意到并进行适当的优化,如您所说。
  • @usr 我现在使用 C++/CLI 添加了一个安全版本的分支避免技术。
  • 有趣的结果。这仍然是管理和 JIT 的,对吧?我想知道发出了什么指令让这变得如此之快。
【解决方案3】:

在实际执行减法之前先检查结果是否为负,您可能会获得轻微的性能提升。这样,如果结果为负,则无需执行减法。示例:

if (Buffer[index] > backgroundBuffer[index])
    Buffer[index] = (ushort)(Buffer[index] - backgroundBuffer[index]);
else
    Buffer[index] = 0;

【讨论】:

  • 这取决于jitter如何将il代码编译成汇编。即使它节省了速度,也不会超过几微秒。
【解决方案4】:

你可以试试Parallel.For

Parallel.For(0, Buffer.Length, (i) =>
{
    int difference = Buffer[i] - backgroundBuffer[i];
    if (difference >= 0)
          Buffer[i] = (ushort) difference;
    else
         Buffer[i] = 0;
}); 

更新:我已经尝试过了,我发现你的情况差异很小,但是当数组变大时,差异也会变大

【讨论】:

  • @elgonzo Parallel.For 不会为每次迭代创建新任务:Does Parallel.For use one Task per iteration?
  • 这可能会节省一些减法和转换周期: if (Buffer[i]
  • 这不会对性能产生影响,但我可能会使用 Buffer[i] = Math.max(0, difference)。 (也许您可以对其进行基准测试)
【解决方案5】:

这是一个使用Zip()的解决方案:

Buffer = Buffer.Zip<ushort, ushort, ushort>(backgroundBuffer, (x, y) =>
{
    return (ushort)Math.Max(0, x - y);
}).ToArray();

它的性能不如其他答案,但它绝对是最短的解决方案。

【讨论】:

    【解决方案6】:

    怎么样,

    Enumerable.Range(0, Buffer.Length).AsParalell().ForAll(i =>
        {
             unsafe
            {
                var nonNegative = Buffer[i] > backgroundBuffer[i];
                Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) *
                    *((int*)(&nonNegative)));
            }
        });
    

    【讨论】:

    • 这个比 Parallel.ForeachPartitioner 慢 10 倍。令人惊讶的是它远远落后。
    猜你喜欢
    • 2013-02-21
    • 1970-01-01
    • 1970-01-01
    • 2017-01-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-06-22
    • 2020-08-12
    相关资源
    最近更新 更多