【问题标题】:Convert matlab to c++, bsxfun将matlab转换为c++、bsxfun
【发布时间】:2014-01-17 14:00:53
【问题描述】:

我正在尝试将我的 MATLAB 代码转换为 C++,发现以下情况存在问题:

MATLAB

A = rand(1000,40000);
b = rand(1000,1);
tic;
ans = bsxfun(@ne,b,A);
toc

c++

std::vector<std::vector<int> > A;
std::vector<int> b;
std::vector<int> ans(10000);

// initial A and b
const clock_t begin_time = clock();
for(int i = 0; i < 40000; ++i){
    for(int j = 0; j < 1000; ++j){
        if(A[i][j] != b[j])
            ans[i]++;
    }
}
double run_time = static_cast<double>((clock() - begin_time)) / CLOCKS_PER_SEC;

我发现 C++ 的情况比 MATLAB 慢三倍。我想问一下是否有人知道如何更改 C++ 代码,以便我可以拥有与bsxfun 相似或相同的性能?

我在网上搜索后,发现了两种可能的方法:

  1. 包括来自 Armadillo 的库
  2. 包括来自 Octave 的库

但关键是我不知道怎么做,我的意思是我不知道实现的细节。

总结:

  1. 我想问一下是否有人知道如何更改 C++ 代码,以便我可以拥有与 bsxfun 相似或相同的性能?
  2. 谁能提供一些提示或步骤或示例,以便我可以学习如何包含 Armadillo 或 Octave 来完成此任务。

编辑:

感谢@Peter,我使用选项-O3 进行编译,然后问题“解决”了,我的意思是速度与MATLAB 相同。

【问题讨论】:

  • 你的时间是否包括Ab的初始化?
  • if 替换为ans[i] += A[j][i] != b[j]; 有帮助吗? (没有分支预测失败)
  • 你试过并行化吗?例如,根据您的系统,您可以使用 pthread。然后创建 4 个线程,每个线程计算问题的一部分。这将使您的计算速度提高近 4 倍(也取决于您的硬件)。
  • 这可能是一个内存限制问题,因此多线程可能不会获得太多收益 - 但正如 @Peter 的回答所暗示的,提高缓存效率可能是前进的方向。
  • 我不建议将 std::vector<:vector>> 用于数值任务。从许多角度来看,它的效率很低,尤其是速度。相反,您可以使用专用的线性代数库获得更好的性能,例如Armadillo。例如,参见 Mat 类,它以列优先顺序连续存储元素,如 Fortran。此外,很多犰狳函数是similar 到 Matlab 函数。

标签: c++ performance matlab bsxfun


【解决方案1】:

1- 您以错误的顺序运行循环。在 C 和 C++ 中,二维数组以行为主存储,这意味着 A[j][i]A[j][i+1] 在内存中彼此相邻。 (可以这样想:A[j] 是第一个下标操作,返回对另一个向量的引用,然后您再次使用 [i] 下标)。

将数据保存在缓存中以进行尽可能多的操作是现代处理器性能的关键之一,这意味着您希望尽可能访问相邻元素。所以切换循环的顺序:

for(int j = 0; j < 1000; ++j){
    for(int i = 0; i < 40000; ++i){

2- 编译器选项非常重要。确保您在“发布”模式下构建,或启用优化。

3- 在 C++ 中将 2D 数组存储为 1D 数组是很常见的,通过乘法对行/列进行索引。也就是说,A 将是一个大小为 1000*40000 的向量,而A[j][i] 将改为A[j*row_length + i]。这样做的好处是更多的连续内存(参见第 1 点)、更少的动态内存分配和更好的缓存利用率。

【讨论】:

  • #3 是错误的,因为您谈论的数组已经 连续存储。问题中的向量向量完全是另一回事,您的建议是有效的。
【解决方案2】:

正如我在 cmets 中提到的,您的 MATLAB 代码缺少对 sum 函数的调用(否则这两个代码正在计算不同的东西!)。所以应该是:

MATLAB

A = rand(1000,40000);
B = rand(1000,1);
tic
count = sum(bsxfun(@ne, A, B));
toc

在我的机器上我得到:

Elapsed time is 0.036931 seconds.

记住上面的语句是vectorized(想想SIMD并行化)。如果大小足够大,MATLAB 也可能会自动运行此 multithreaded


这里是 C++ 代码的my version。我正在使用简单的类来创建向量/矩阵接口。请注意,底层数据基本上存储为一维数组,column-major order 类似于 MATLAB。

C++

#include <iostream>
#include <cstdlib>        // rand
#include <ctime>          // time
#include <sys/time.h>     // gettimeofday

class Timer
{
private:
    timeval t1, t2;
public:
    Timer() {}
    ~Timer() {}
    void start() { gettimeofday(&t1, NULL); }
    void stop() { gettimeofday(&t2, NULL); }
    double elapsedTime() { return (t2.tv_sec - t1.tv_sec)*1000.0 + (t2.tv_usec - t1.tv_usec)/1000; }
};

template<typename T>
class Vector
{
private:
    T *data;
    const size_t num;
public:
    Vector(const size_t num) : num(num) { data = new T[num]; }
    ~Vector() { delete[] data; }
    inline T& operator() (const size_t i) { return data[i]; }
    inline const T& operator() (const size_t i) const { return data[i]; }
    size_t size() const { return num; }
};

template<typename T>
class Matrix
{
private:
    T *data;
    const size_t nrows, ncols;
public:
    Matrix(const size_t nr, const size_t nc) : nrows(nr), ncols(nc) { data = new T[nrows * ncols]; }
    ~Matrix() { delete[] data; }
    inline T& operator() (const size_t r, const size_t c) { return data[c*nrows + r]; }
    inline const T& operator() (const size_t r, const size_t c) const { return data[c*nrows + r]; }
    size_t size1() const { return nrows; }
    size_t size2() const { return ncols; }
};

inline double rand_double(double min=0.0, double max=1.0)
{
    return (max - min) * (static_cast<double>(rand()) / RAND_MAX) + min;
}

int main() {
    // seed random number generator
    srand( static_cast<unsigned int>(time(NULL)) );

    // intialize data
    const int m = 1000, n = 40000;
    Matrix<double> A(m,n);
    Vector<double> B(m);
    for(size_t i=0; i<A.size1(); i++) {
        B(i) = rand_double();
        for(size_t j=0; j<A.size2(); j++) {
            A(i,j) = rand_double();
        }
    }

    // measure timing
    Timer timer;
    timer.start();

    // in MATLAB: count = sum(bsxfun(@ne, A, B))
    Vector<double> count(n);
    #pragma omp parallel for
    for(int j=0; j<n; ++j) {
        count(j) = 0.0;
        for(int i=0; i<m; i++) {
            count(j) += (A(i,j) != B(i));
        }
    }

    timer.stop();

    // elapsed time in milliseconds
    std::cout << "Elapsed time is " << timer.elapsedTime() << " milliseconds." << std::endl;

    return 0;
}

结果:

$ g++ -Wall -O3 test.cpp -o test
$ ./test
Elapsed time is 63 milliseconds.

如果我在启用 OpenMP 支持的情况下编译并运行它,我会得到:

$ g++ -Wall -O3 -fopenmp test.cpp -o test_omp
$ ./test_omp
Elapsed time is 16 milliseconds.

仅通过在代码中添加一行(pargma omp 宏)就可以进行不错的改进(几乎快 4 倍)。

最后一个超过了我在 MATLAB (R2013b) 中获得的 37 毫秒。代码是使用 GCC 4.8.1 编译的(MinGW-w64 在 Windows 8、Core i7 笔记本电脑上运行)。


如果您真的想突破 C++ 代码的限制,除了使用 OpenMP 实现的多线程之外,您还必须添加矢量化(SSE/AVX 内在函数)。

您可能还想考虑使用GPGPU programming(CUDA、OpenCL)。在 MATLAB 中这很容易做到:

AA = gpuArray(A);
BB = gpuArray(B);
CC = sum(bsxfun(@ne, AA, BB));
C = gather(CC);

gpuArray(.) 会将矩阵传输到 GPU,之后对其进行的所有操作都将在 GPU 设备而不是 CPU 上执行。 gather(.) 会将数组传输回 MATLAB 工作区。然而,这里的问题主要是内存限制,因此不太可能看到任何改进(由于数据传输的开销,可能会更慢)。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-09-20
    • 2014-06-26
    • 2011-12-05
    • 1970-01-01
    • 1970-01-01
    • 2014-11-08
    • 2013-05-13
    • 2016-11-07
    相关资源
    最近更新 更多