Frank 说得对,您的直接问题可能是您使用的是非原子操作:
average[k] += vector[k];
您可以使用以下方法解决此问题:
#pragma omp atomic
average[k] += vector[k];
但一个更大的概念问题是这可能不会加快您的代码速度。您正在执行的操作非常简单,并且您的内存(至少是行)是连续的。
确实,我为您的代码制作了一个最小工作示例(您应该为您的问题这样做):
#include <vector>
#include <cstdlib>
#include <chrono>
#include <iostream>
using namespace std;
vector<float> average(const vector<vector<unsigned char>>& original){
vector<float> average(original[0].size(), 0.0);
#pragma omp parallel for
for (int i=0; i<original.size(); i++) {
const vector<unsigned char>& vector = original[i];
for (int k = 0; k < vector.size(); ++k) {
#pragma omp atomic
average[k] += vector[k];
}
}
for (float& val : average) {
val /= original.size();
}
return average;
}
int main(){
vector<vector<unsigned char>> mat(1000);
for(int y=0;y<mat.size();y++)
for(int x=0;x<mat.size();x++)
mat.at(y).emplace_back(rand()%255);
std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now();
double dont_optimize = 0;
for(int i=0;i<100;i++){
auto ret = average(mat);
dont_optimize += ret[0];
}
std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();
std::cout<<"Time = "<<(std::chrono::duration_cast<std::chrono::microseconds>(end - begin).count()/100)<<std::endl;
return 0;
}
使用 g++ -O3 temp.cpp -fopenmp 编译它会启用 OpenMP。我的四核机器上的运行时间始终约为 10,247 微秒。当我禁用 OpenMP 时,运行时间约为 2,561 微秒。
启动和管理线程团队的成本很高。
但是有一个真正的方法可以加速你的代码:改善你的内存布局。
使用std::vector< std::vector<T> > 设计意味着每个vector<T> 可以位于内存中的任何位置。相反,我们希望我们所有的记忆都很好且连续。我们可以通过使用平面数组索引来实现这一点,如下所示:
(请注意,使用了以下代码的早期版本,例如,mat.at(y*width+x)。与使用 mat[y*width+x] 相比,检查此范围意味着会导致速度显着下降,就像现在的代码一样。时间已适当更新.)
#include <vector>
#include <cstdlib>
#include <chrono>
#include <iostream>
using namespace std;
class Matrix {
public:
vector<unsigned char> mat;
int width;
int height;
Matrix(int width0, int height0){
width = width0;
height = height0;
for(int i=0;i<width*height;i++)
mat.emplace_back(rand()%255);
}
unsigned char& operator()(int x, int y){
return mat[y*width+x];
}
unsigned char operator()(int x, int y) const {
return mat[y*width+x];
}
unsigned char& operator()(int i){
return mat[i];
}
unsigned char operator()(int i) const {
return mat[i];
}
};
vector<float> average(const Matrix& original){
vector<float> average(original.width, 0.0);
#pragma omp parallel for
for(int y=0;y<original.height;y++)
for(int x=0;x<original.width;x++)
#pragma omp atomic
average[x] += original(x,y);
for (float& val : average)
val /= original.height;
return average;
}
int main(){
Matrix mat(1000,1000);
std::cerr<<mat.width<<" "<<mat.height<<std::endl;
std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now();
double dont_optimize = 0;
for(int i=0;i<100;i++){
auto ret = average(mat);
dont_optimize += ret[0];
}
std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();
std::cout<<"Time = "<<(std::chrono::duration_cast<std::chrono::microseconds>(end - begin).count()/100)<<std::endl;
return 0;
}
请注意,我还使用float 而不是double:您可以通过这种方式将两倍的数字塞入相同数量的空间,这有利于缓存。
这使得没有 OpenMP 的运行时间为 292 微秒,而使用 OpenMP 的运行时间为 9426 微秒。
总之,使用 OpenMP/并行性会减慢您的代码速度,因为并行完成的工作比设置并行性花费的时间更少,但使用更好的内存布局可以提高大约 90% 的速度。此外,使用我创建的方便的 Matrix 类可以提高代码的可读性和可维护性。
编辑:
在 10,000x10,000 而不是 1,000x1,000 的矩阵上运行此程序会得到类似的结果。对于向量的向量:不使用 OpenMP 为 7,449 微秒,使用 OpenMP 为 156,316 微秒。对于平面数组索引:不使用 OpenMP 为 32,668 微秒,使用 OpenMP 为 145,470 微秒。
性能可能与我机器上可用的硬件指令集有关(特别是,如果我的机器缺少原子指令,那么 OpenMP 将不得不使用互斥锁等来模拟它们)。实际上,在使用-march=native 编译的平面阵列示例中,OpenMP 的性能得到了改进,尽管仍然不是很好:没有 OpenMP 时为 33,079 微秒,使用 OpenMP 时为 127,841 微秒。稍后我会尝试更强大的机器。
编辑
虽然上述测试是在 Intel(R) Core(TM) i5 CPU M 480 @ 2.67GHz 上执行的,但我已经在坏蛋 Intel(R) Xeon(R) CPU 上编译了这段代码(使用 -O3 -march=native) E5-2680 v3 @ 2.50GHz。结果相似:
- 1000x1000,向量的向量,没有 OpenMP:145μs
- 1000x1000,向量的向量,使用 OpenMP:2,941μs
- 10000x10000,向量的向量,没有 OpenMP:20,254μs
- 10000x10000,向量的向量,使用 OpenMP:85,703μs
- 1000x1000,平面阵列,无 OpenMP:139μs
- 1000x1000,平面阵列,使用 OpenMP:3,171μs
- 10000x10000,平面阵列,无 OpenMP:18,712μs
- 10000x10000,平面阵列,使用 OpenMP:89,097μs
这证实了我们之前的结果:使用 OpenMP 执行此任务往往会减慢速度,即使您的硬件非常出色。事实上,这两个处理器之间的大部分加速可能是由于 Xeon 的大型 L3 缓存大小:30,720K 它比 i5 上的 3,720K 缓存大 10 倍。
编辑
从他们的回答中结合Zulan's reduction strategy 可以让我们有效地利用并行性:
vector<float> average(const Matrix& original){
vector<float> average(original.width, 0.0);
auto average_data = average.data();
#pragma omp parallel for reduction(+ : average_data[ : original.width])
for(int y=0;y<original.height;y++){
for(int x=0;x<original.width;x++)
average_data[x] += original(x,y);
}
for (float& val : average)
val /= original.height;
return average;
}
对于 24 个线程,这在 10,000x10,000 阵列上提供了 2629 微秒:比串行版本提高了 7.1 倍。在您的原始代码上使用 Zulan 的策略(没有平面数组索引)可以得到 3529 微秒,因此通过使用更好的布局,我们仍然可以获得 25% 的加速。