【发布时间】:2012-09-26 08:16:09
【问题描述】:
我在another thread 上提出了一个类似的问题,但后来我专注于如何使用 OpenCV。未能达到我最初想要的,我会在这里问我到底想要什么。
我有两个矩阵。矩阵 a 是 2782x128,矩阵 b 是 4000x128,都是 unsigned char 值。这些值存储在单个数组中。对于a中的每个向量,我需要b中具有最近欧几里得距离的向量的索引。
好的,现在我的代码来实现这个:
#include <windows.h>
#include <stdlib.h>
#include <stdio.h>
#include <cstdio>
#include <math.h>
#include <time.h>
#include <sys/timeb.h>
#include <iostream>
#include <fstream>
#include "main.h"
using namespace std;
void main(int argc, char* argv[])
{
int a_size;
unsigned char* a = NULL;
read_matrix(&a, a_size,"matrixa");
int b_size;
unsigned char* b = NULL;
read_matrix(&b, b_size,"matrixb");
LARGE_INTEGER liStart;
LARGE_INTEGER liEnd;
LARGE_INTEGER liPerfFreq;
QueryPerformanceFrequency( &liPerfFreq );
QueryPerformanceCounter( &liStart );
int* indexes = NULL;
min_distance_loop(&indexes, b, b_size, a, a_size);
QueryPerformanceCounter( &liEnd );
cout << "loop time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;
if (a)
delete[]a;
if (b)
delete[]b;
if (indexes)
delete[]indexes;
return;
}
void read_matrix(unsigned char** matrix, int& matrix_size, char* matrixPath)
{
ofstream myfile;
float f;
FILE * pFile;
pFile = fopen (matrixPath,"r");
fscanf (pFile, "%d", &matrix_size);
*matrix = new unsigned char[matrix_size*128];
for (int i=0; i<matrix_size*128; ++i)
{
unsigned int matPtr;
fscanf (pFile, "%u", &matPtr);
matrix[i]=(unsigned char)matPtr;
}
fclose (pFile);
}
void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
const int descrSize = 128;
*indexes = (int*)malloc(a_size*sizeof(int));
int dataIndex=0;
int vocIndex=0;
int min_distance;
int distance;
int multiply;
unsigned char* dataPtr;
unsigned char* vocPtr;
for (int i=0; i<a_size; ++i)
{
min_distance = LONG_MAX;
for (int j=0; j<b_size; ++j)
{
distance=0;
dataPtr = &a[dataIndex];
vocPtr = &b[vocIndex];
for (int k=0; k<descrSize; ++k)
{
multiply = *dataPtr++-*vocPtr++;
distance += multiply*multiply;
// If the distance is greater than the previously calculated, exit
if (distance>min_distance)
break;
}
// if distance smaller
if (distance<min_distance)
{
min_distance = distance;
(*indexes)[i] = j;
}
vocIndex+=descrSize;
}
dataIndex+=descrSize;
vocIndex=0;
}
}
并附上带有样本矩阵的文件。
我是用windows.h来计算耗时的,所以如果你想在windows以外的平台测试代码,只需要修改windows.h头文件,改变计算耗时的方式即可。
我电脑中的这段代码大约是 0.5 秒。问题是我在 Matlab 中有另一个代码可以在 0.05 秒内完成同样的事情。在我的实验中,我每秒收到几个矩阵,比如矩阵 a,所以 0.5 秒太多了。
现在用matlab代码来计算这个:
aa=sum(a.*a,2); bb=sum(b.*b,2); ab=a*b';
d = sqrt(abs(repmat(aa,[1 size(bb,1)]) + repmat(bb',[size(aa,1) 1]) - 2*ab));
[minz index]=min(d,[],2);
好的。 Matlab 代码使用的是 (x-a)^2 = x^2 + a^2 - 2ab。
所以我的下一个尝试是做同样的事情。我删除了自己的代码以进行相同的计算,但大约是 1.2 秒。
然后,我尝试使用不同的外部库。第一次尝试是 Eigen:
const int descrSize = 128;
MatrixXi a(a_size, descrSize);
MatrixXi b(b_size, descrSize);
MatrixXi ab(a_size, b_size);
unsigned char* dataPtr = matrixa;
for (int i=0; i<nframes; ++i)
{
for (int j=0; j<descrSize; ++j)
{
a(i,j)=(int)*dataPtr++;
}
}
unsigned char* vocPtr = matrixb;
for (int i=0; i<vocabulary_size; ++i)
{
for (int j=0; j<descrSize; ++j)
{
b(i,j)=(int)*vocPtr ++;
}
}
ab = a*b.transpose();
a.cwiseProduct(a);
b.cwiseProduct(b);
MatrixXi aa = a.rowwise().sum();
MatrixXi bb = b.rowwise().sum();
MatrixXi d = (aa.replicate(1,vocabulary_size) + bb.transpose().replicate(nframes,1) - 2*ab).cwiseAbs2();
int* index = NULL;
index = (int*)malloc(nframes*sizeof(int));
for (int i=0; i<nframes; ++i)
{
d.row(i).minCoeff(&index[i]);
}
这个 Eigen 代码的成本约为 1.2
还使用了使用 opencv 的类似代码,并且 ab = a*b.transpose(); 的成本为 0.65 秒。
所以,matlab 能够这么快地完成同样的事情,而我在 C++ 中却做不到,这真的很烦人!当然,能够运行我的实验会很棒,但我认为缺乏知识才是真正让我烦恼的地方。我怎样才能达到至少与 Matlab 相同的性能?欢迎任何形式的解决方案。我的意思是,任何外部库(如果可能的话免费),循环展开的东西,模板的东西,SSE 指令(我知道它们存在),缓存的东西。正如我所说,我的主要目的是增加我的知识,以便能够以更快的性能编写这样的想法。
提前致谢
编辑:David Hammen 建议的更多代码。在进行任何计算之前,我将数组转换为 int 。代码如下:
void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
const int descrSize = 128;
int* a_int;
int* b_int;
LARGE_INTEGER liStart;
LARGE_INTEGER liEnd;
LARGE_INTEGER liPerfFreq;
QueryPerformanceFrequency( &liPerfFreq );
QueryPerformanceCounter( &liStart );
a_int = (int*)malloc(a_size*descrSize*sizeof(int));
b_int = (int*)malloc(b_size*descrSize*sizeof(int));
for(int i=0; i<descrSize*a_size; ++i)
a_int[i]=(int)a[i];
for(int i=0; i<descrSize*b_size; ++i)
b_int[i]=(int)b[i];
QueryPerformanceCounter( &liEnd );
cout << "Casting time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;
*indexes = (int*)malloc(a_size*sizeof(int));
int dataIndex=0;
int vocIndex=0;
int min_distance;
int distance;
int multiply;
/*unsigned char* dataPtr;
unsigned char* vocPtr;*/
int* dataPtr;
int* vocPtr;
for (int i=0; i<a_size; ++i)
{
min_distance = LONG_MAX;
for (int j=0; j<b_size; ++j)
{
distance=0;
dataPtr = &a_int[dataIndex];
vocPtr = &b_int[vocIndex];
for (int k=0; k<descrSize; ++k)
{
multiply = *dataPtr++-*vocPtr++;
distance += multiply*multiply;
// If the distance is greater than the previously calculated, exit
if (distance>min_distance)
break;
}
// if distance smaller
if (distance<min_distance)
{
min_distance = distance;
(*indexes)[i] = j;
}
vocIndex+=descrSize;
}
dataIndex+=descrSize;
vocIndex=0;
}
}
整个过程现在是 0.6,开始的铸造循环是 0.001 秒。也许我做错了什么?
EDIT2:关于 Eigen 的任何信息?当我寻找外部库时,他们总是谈论 Eigen 及其速度。我做错了什么?这是一个使用 Eigen 的简单代码,表明它不是那么快。也许我错过了一些配置或一些标志,或者......
MatrixXd A = MatrixXd::Random(1000, 1000);
MatrixXd B = MatrixXd::Random(1000, 500);
MatrixXd X;
这段代码大约是 0.9 秒。
【问题讨论】:
-
你在发布模式下编译了所有测试?
-
你可能会觉得 Matlab 的性能优于你的代码很烦人,但是我这个大量使用 Matlab 的人觉得它非常令人满意。我没有太多具体建议可以给你,但提高这类代码性能的关键通常是在现代 CPU 上优化(或至少非常好)使用内存层次结构。另一个需要考虑的因素是,Matlab 的大部分核心功能现在都是多线程的,可以在多核 CPU 上执行,我不清楚您自己的代码是否是多线程的;这可能会对性能产生一些影响。
-
我不知道如何帮助您更快地编写 C/C++ 代码(您的代码看起来更像 C 而不是 C++。证据:
malloc),但我知道您可以如何制作您的 Matlab 代码更快:消除sqrt。给定两个非负数 a 和 b,sqrt(a)>sqrt(b) ⇔ a>b. -
Denis Ermolin,是的,在调试模式下大约需要 2.5 秒。高性能马克,你是对的,当使用 Matlab 时令人满意,但现在我必须对 matlab 原型代码进行真正的实现。大卫哈曼,我知道。如果您看到 C++ 代码,我避免使用 sqrt。我还尝试通过使用 distance+=abs(multiply) 来避免乘法 * 乘法。结果?更差。大约 0.8 秒。
-
这是一个相关的问题:stackoverflow.com/questions/6058139/…
标签: c++ performance opencv matrix-multiplication eigen