【问题标题】:Fastest way to calculate minimum euclidean distance between two matrices containing high dimensional vectors计算包含高维向量的两个矩阵之间的最小欧几里得距离的最快方法
【发布时间】:2012-09-26 08:16:09
【问题描述】:

我在another thread 上提出了一个类似的问题,但后来我专注于如何使用 OpenCV。未能达到我最初想要的,我会在这里问我到底想要什么。

我有两个矩阵。矩阵 a 是 2782x128,矩阵 b 是 4000x128,都是 unsigned char 值。这些值存储在单个数组中。对于a中的每个向量,我需要b中具有最近欧几里得距离的向量的索引。

好的,现在我的代码来实现这个:

#include <windows.h>
#include <stdlib.h>
#include <stdio.h>
#include <cstdio>
#include <math.h>
#include <time.h>
#include <sys/timeb.h>
#include <iostream>
#include <fstream>
#include "main.h"

using namespace std;

void main(int argc, char* argv[])
{
    int a_size;
    unsigned char* a = NULL;
    read_matrix(&a, a_size,"matrixa");
    int b_size;
    unsigned char* b = NULL;
    read_matrix(&b, b_size,"matrixb");

    LARGE_INTEGER liStart;
    LARGE_INTEGER liEnd;
    LARGE_INTEGER liPerfFreq;
    QueryPerformanceFrequency( &liPerfFreq );
    QueryPerformanceCounter( &liStart );

    int* indexes = NULL;
    min_distance_loop(&indexes, b, b_size, a, a_size);

    QueryPerformanceCounter( &liEnd );

    cout << "loop time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;

    if (a)
    delete[]a;
if (b)
    delete[]b;
if (indexes)
    delete[]indexes;
    return;
}

void read_matrix(unsigned char** matrix, int& matrix_size, char* matrixPath)
{
    ofstream myfile;
    float f;
    FILE * pFile;
    pFile = fopen (matrixPath,"r");
    fscanf (pFile, "%d", &matrix_size);
    *matrix = new unsigned char[matrix_size*128];

    for (int i=0; i<matrix_size*128; ++i)
    {
        unsigned int matPtr;
        fscanf (pFile, "%u", &matPtr);
        matrix[i]=(unsigned char)matPtr;
    }
    fclose (pFile);
}

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
    const int descrSize = 128;

    *indexes = (int*)malloc(a_size*sizeof(int));
    int dataIndex=0;
    int vocIndex=0;
    int min_distance;
    int distance;
    int multiply;

    unsigned char* dataPtr;
    unsigned char* vocPtr;
    for (int i=0; i<a_size; ++i)
    {
        min_distance = LONG_MAX;
        for (int j=0; j<b_size; ++j)
        {
            distance=0;
            dataPtr = &a[dataIndex];
            vocPtr = &b[vocIndex];

            for (int k=0; k<descrSize; ++k)
            {
                multiply = *dataPtr++-*vocPtr++;
                distance += multiply*multiply;
                // If the distance is greater than the previously calculated, exit
                if (distance>min_distance)
                    break;
            }

            // if distance smaller
            if (distance<min_distance)
            {
                min_distance = distance;
                (*indexes)[i] = j;
            }
            vocIndex+=descrSize;
        }
        dataIndex+=descrSize;
        vocIndex=0;
    }
}

并附上带有样本矩阵的文件。

matrixa matrixb

我是用windows.h来计算耗时的,所以如果你想在windows以外的平台测试代码,只需要修改windows.h头文件,改变计算耗时的方式即可。

我电脑中的这段代码大约是 0.5 秒。问题是我在 Matlab 中有另一个代码可以在 0.05 秒内完成同样的事情。在我的实验中,我每秒收到几个矩阵,比如矩阵 a,所以 0.5 秒太多了。

现在用matlab代码来计算这个:

aa=sum(a.*a,2); bb=sum(b.*b,2); ab=a*b'; 
d = sqrt(abs(repmat(aa,[1 size(bb,1)]) + repmat(bb',[size(aa,1) 1]) - 2*ab));
[minz index]=min(d,[],2);

好的。 Matlab 代码使用的是 (x-a)^2 = x^2 + a^2 - 2ab。

所以我的下一个尝试是做同样的事情。我删除了自己的代码以进行相同的计算,但大约是 1.2 秒。

然后,我尝试使用不同的外部库。第一次尝试是 Eigen:

const int descrSize = 128;
MatrixXi a(a_size, descrSize);
MatrixXi b(b_size, descrSize);
MatrixXi ab(a_size, b_size);

unsigned char* dataPtr = matrixa;
for (int i=0; i<nframes; ++i)
{
    for (int j=0; j<descrSize; ++j)
    {
        a(i,j)=(int)*dataPtr++;
    }
}
unsigned char* vocPtr = matrixb;
for (int i=0; i<vocabulary_size; ++i)
{
    for (int j=0; j<descrSize; ++j)
    {
        b(i,j)=(int)*vocPtr ++;
    }
}
ab = a*b.transpose();
a.cwiseProduct(a);
b.cwiseProduct(b);
MatrixXi aa = a.rowwise().sum();
MatrixXi bb = b.rowwise().sum();
MatrixXi d = (aa.replicate(1,vocabulary_size) + bb.transpose().replicate(nframes,1) - 2*ab).cwiseAbs2();

int* index = NULL;
index = (int*)malloc(nframes*sizeof(int));
for (int i=0; i<nframes; ++i)
{
    d.row(i).minCoeff(&index[i]);
}

这个 Eigen 代码的成本约为 1.2

还使用了使用 opencv 的类似代码,并且 ab = a*b.transpose(); 的成本为 0.65 秒。

所以,matlab 能够这么快地完成同样的事情,而我在 C++ 中却做不到,这真的很烦人!当然,能够运行我的实验会很棒,但我认为缺乏知识才是真正让我烦恼的地方。我怎样才能达到至少与 Matlab 相同的性能?欢迎任何形式的解决方案。我的意思是,任何外部库(如果可能的话免费),循环展开的东西,模板的东西,SSE 指令(我知道它们存在),缓存的东西。正如我所说,我的主要目的是增加我的知识,以便能够以更快的性能编写这样的想法。

提前致谢

编辑:David Hammen 建议的更多代码。在进行任何计算之前,我将数组转换为 int 。代码如下:

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
    const int descrSize = 128;

    int* a_int;
    int* b_int;

    LARGE_INTEGER liStart;
    LARGE_INTEGER liEnd;
    LARGE_INTEGER liPerfFreq;
    QueryPerformanceFrequency( &liPerfFreq );
    QueryPerformanceCounter( &liStart );

    a_int = (int*)malloc(a_size*descrSize*sizeof(int));
    b_int = (int*)malloc(b_size*descrSize*sizeof(int));

    for(int i=0; i<descrSize*a_size; ++i)
        a_int[i]=(int)a[i];
    for(int i=0; i<descrSize*b_size; ++i)
        b_int[i]=(int)b[i];

    QueryPerformanceCounter( &liEnd );

    cout << "Casting time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;

    *indexes = (int*)malloc(a_size*sizeof(int));
    int dataIndex=0;
    int vocIndex=0;
    int min_distance;
    int distance;
    int multiply;

    /*unsigned char* dataPtr;
    unsigned char* vocPtr;*/
    int* dataPtr;
    int* vocPtr;
    for (int i=0; i<a_size; ++i)
    {
        min_distance = LONG_MAX;
        for (int j=0; j<b_size; ++j)
        {
            distance=0;
            dataPtr = &a_int[dataIndex];
            vocPtr = &b_int[vocIndex];

            for (int k=0; k<descrSize; ++k)
            {
                multiply = *dataPtr++-*vocPtr++;
                distance += multiply*multiply;
                // If the distance is greater than the previously calculated, exit
                if (distance>min_distance)
                    break;
            }

            // if distance smaller
            if (distance<min_distance)
            {
                min_distance = distance;
                (*indexes)[i] = j;
            }
            vocIndex+=descrSize;
        }
        dataIndex+=descrSize;
        vocIndex=0;
    }
}

整个过程现在是 0.6,开始的铸造循环是 0.001 秒。也许我做错了什么?

EDIT2:关于 Eigen 的任何信息?当我寻找外部库时,他们总是谈论 Eigen 及其速度。我做错了什么?这是一个使用 Eigen 的简单代码,表明它不是那么快。也许我错过了一些配置或一些标志,或者......

MatrixXd A = MatrixXd::Random(1000, 1000);
MatrixXd B = MatrixXd::Random(1000, 500);
MatrixXd X;

这段代码大约是 0.9 秒。

【问题讨论】:

  • 你在发布模式下编译了所有测试?
  • 你可能会觉得 Matlab 的性能优于你的代码很烦人,但是我这个大量使用 Matlab 的人觉得它非常令人满意。我没有太多具体建议可以给你,但提高这类代码性能的关键通常是在现代 CPU 上优化(或至少非常好)使用内存层次结构。另一个需要考虑的因素是,Matlab 的大部分核心功能现在都是多线程的,可以在多核 CPU 上执行,我不清楚您自己的代码是否是多线程的;这可能会对性能产生一些影响。
  • 我不知道如何帮助您更快地编写 C/C++ 代码(您的代码看起来更像 C 而不是 C++。证据:malloc),但我知道您可以如何制作您的 Matlab 代码更快:消除 sqrt。给定两个非负数 a 和 b,sqrt(a)>sqrt(b) ⇔ a>b.
  • Denis Ermolin,是的,在调试模式下大约需要 2.5 秒。高性能马克,你是对的,当使用 Matlab 时令人满意,但现在我必须对 matlab 原型代码进行真正的实现。大卫哈曼,我知道。如果您看到 C++ 代码,我避免使用 sqrt。我还尝试通过使用 distance+=abs(multiply) 来避免乘法 * 乘法。结果?更差。大约 0.8 秒。
  • 这是一个相关的问题:stackoverflow.com/questions/6058139/…

标签: c++ performance opencv matrix-multiplication eigen


【解决方案1】:

正如您所观察到的,您的代码由代表大约 2.8e9 算术运算的矩阵乘积支配。 Yopu 说 Matlab(或者更确切地说是高度优化的 MKL)在大约 0.05 秒内计算它。这代表 57 GFLOPS 的速率,表明它不仅使用矢量化,还使用多线程。使用 Eigen,您可以通过启用 OpenMP 进行编译来启用多线程(-fopenmp 使用 gcc)。在我 5 年前的计算机(2.66Ghz Core2)上,使用浮点数和 4 个线程,您的产品大约需要 0.053 秒,而没有 OpenMP 则需要 0.16 秒,所以您的编译标志肯定有问题。总而言之,要充分利用 Eigen:

  • 在 64 位模式下编译
  • 使用浮点数(双精度由于矢量化而慢了一倍)
  • 启用 OpenMP
  • 如果你的 CPU 有超线程,那么要么禁用它,要么将OMP_NUM_THREADS 环境变量定义为物理内核数(这很重要,否则性能会很差!)
  • 如果您有其他任务正在运行,最好将OMP_NUM_THREADS 减少到nb_cores-1
  • 尽可能使用最新的编译器,GCC、clang 和 ICC 最好,MSVC 通常较慢。

【讨论】:

    【解决方案2】:

    在你的 C++ 代码中肯定会伤害到你的一件事是它有一大堆 char 到 int 的转换。装船时,我的意思是最多 2*2782*4000*128 char 到 int 的转换。那些charint 的转换非常缓慢,非常缓慢。

    您可以通过分配一对 int 数组(一个 2782*128 和另一个 4000*128)将其减少到 (2782+4000)*128 这样的转换,以包含您的 @ 转换为整数的内容987654324@ 和 char* b 数组。使用这些 int* 数组而不是您的 char* 数组。

    另一个问题可能是您使用intlong。我不在 Windows 上工作,所以这可能不适用。在我工作的机器上,int 是 32 位,long 现在是 64 位。 32 位绰绰有余,因为 255*255*128 23。

    这显然不是问题。

    令人惊讶的是,有问题的代码并未计算 Matlab 代码创建的那个巨大的 2728 x 4000 数组。更引人注目的是,Matlab 最有可能使用双精度而不是整数来实现这一点——而且它仍然在 C/C++ 代码中脱颖而出。

    一个大问题是缓存。那个 4000*128 数组对于 1 级缓存来说太大了,你正在迭代那个大数组 2782 次。您的代码在内存上等待太多了。要解决这个问题,请使用较小的 b 数组块,以便您的代码尽可能长时间地使用 1 级缓存。

    另一个问题是优化if (distance&gt;min_distance) break;。我怀疑这实际上是一种优化。在最里面的循环中进行if 测试通常是个坏主意。尽可能快地冲破那个内积。除了浪费计算之外,摆脱这个测试并没有什么坏处。有时,如果这样做可以删除最内层循环中的分支,则最好进行明显不需要的计算。这是其中一种情况。 您也许可以通过取消此测试来解决您的问题。尝试这样做。

    回到缓存问题,您需要摆脱这个分支,以便可以将ab 矩阵上的操作拆分成更小的块,一次不超过 256 行的块。这就是两个现代 Intel 芯片的 L1 缓存之一中有多少行 128 个无符号字符。由于 250 除以 4000,因此请考虑将 b 矩阵逻辑拆分为 16 个块。您可能希望形成 2872 x 4000 的大内积数组,但要分小块进行。您可以将 if (distance&gt;min_distance) break; 添加回去,但在块级别而不是逐字节级别添加。

    您应该能够击败 Matlab,因为它几乎可以肯定地使用双精度数,但您可以使用无符号字符和整数。

    【讨论】:

    • 谢谢大卫,但铸造时间大约是 0.001 秒。我也尝试将a和b数组放入int*数组中,性能更差。
    • 无论如何你都在施法,而且施法很慢。这行,multiply = *dataPtr++-*vocPtr++; 涉及从 unsigned char 到 int 的两次强制转换。 *dataPtr*vocPtr 的结果被转换为整数,然后相减。如果您提前进行演员表并获得更差的性能,您就做错了。
    • 好的,我添加了新的答案,之前将其转换为数组。编辑:我无法添加新答案,我正在编辑原始帖子。我认为它太大了......
    • 是的,我也使用了 int。新代码使用 int。性能是一样的。我的直觉说这是缓存的问题,我做了太多的内存访问。我应该避免一些内存访问,但我做不到。
    • charint 的转换并不是“非常慢”。这可能是一条额外的指令,但缓存中更好的数据密度通过需要更少的内存访问来补偿转换。
    【解决方案3】:

    矩阵乘法通常对两个矩阵之一使用最差的缓存访问模式,解决方案是转置其中一个矩阵并使用专门的乘法算法来处理以这种方式存储的数据。

    您的矩阵已转置存储。通过将其转置为正常顺序,然后使用正常矩阵相乘,您绝对会扼杀性能。

    编写您自己的矩阵乘法循环,将索引的顺序反转到第二个矩阵(这具有转置它的效果,实际上没有移动任何东西并破坏缓存行为)。并将编译器用于启用自动矢量化的任何选项传递给您。

    【讨论】:

    • 我想我不明白你的意思。在我自己实现的最小距离(第一个代码)中,我没有转置任何东西。在本征码的情况下,我需要转置它。
    • @min.yong.yoon:嗯,你看过编译器为那个乘法循环生成的代码吗?是否使用 SSE2 指令?
    • 我尝试在我的 Visul Studio 10 项目设置中激活 SSE2 指令,但耗时相同,循环生成的代码也相同
    • @min.yong.yoon:我认为微软的编译器在自动矢量化方面已经落后了。它支持 SSE 指令,但您必须通过内在函数自己使用它们。最新版本,即捆绑在 Visual Studio 2012 中的版本,终于改变了这一点。您可以使用 G++ 或 Intel C++ 对代码进行基准测试,因为它们都能够自动矢量化。
    • 更新。我自己的循环使用 SSE/SSE2 并没有得到更好的速度,但 Eigen 方式更快。 SSE2 使用 int 矩阵,0.74 秒,但如果我使用浮点矩阵,0.49 秒。这是一个改进,但它类似于我自己的循环。对此还有什么想法吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-06-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-06-14
    • 2014-08-15
    相关资源
    最近更新 更多