【发布时间】:2021-04-12 23:19:59
【问题描述】:
我编写了一个算法,旨在模拟实验产生的数据,然后对该数据执行“巧合搜索”(稍后会详细介绍......)。有问题的数据是vector<vector<double> >,其中的元素是从高斯分布(或多或少的随机数)中挑选出来的。每个“列”代表一个“数据流”,每一行代表一个瞬间。必须保留“数组”中每个元素的“位置”。
算法:
该算法旨在执行以下任务:
同时遍历所有n列(数据流),并计算至少c唯一列具有绝对值大于某个阈值的元素的次数,使得元素位于指定时间间隔(即一定数量的行)。
当这种情况发生时,我们将一个计数器加一,然后在时间上(按行)向前跳转某个指定的数量。我们重新开始,直到我们遍历了整个“数组”。最后,我们返回计数器的值(“符合次数”)。
我的解决方案:
我先给出代码,然后一步一步解释它的操作(并希望澄清一些细节):
size_t numOfCoincidences(vector<vector<double>> array, double value_threshold, size_t num_columns){
set<size_t> cache;
size_t coincidence_counter = 0, time_counter = 0;
auto exceeds_threshold = [&](double element){ return fabs(element) >= value_threshold; };
for(auto row_itr = begin(array); row_itr != end(row_itr); ++row_itr){
auto &row = *row_itr;
auto coln_itr = std::find_if(execution::par_unseq, begin(row), end(row), exceeds_threshold);
while(coln_itr != row.end()){
cache.insert(distance(begin(row), coln_itr));
coln_itr = std::find_if(next(coln_itr), end(row), exceeds_threshold);
}
if(size(cache) >= num_columns){
++coincidence_counter;
cache.clear();
if(distance(row_ctr, end(waveform)) > (4004000 - time_counter)){
advance(row_ctr, ((4004000 - time_counter)));
} else {
return coincidence_counter;
}
}
if(time_counter == time_threshold){
row_itr -= (time_counter + 1);
cache.clear();
}
++time_counter;
}
if(cache.size() == 0) time_counter = 0;
return(coincidence_counter);
}
它是如何工作的......
我逐行遍历数据 (vector<vector<double> > array):
for(auto row_itr = begin(array); row_itr != end(row_itr); ++row_itr)
对于每一行,我使用std::find_if 来获取每个超过阈值(value_threshold)的元素:
auto coln_itr = std::find_if(execution::par_unseq, begin(row), end(row), exceeds_threshold);
while(coln_itr != row.end()){
cache.insert(distance(begin(row), coln_itr));
coln_itr = std::find_if(next(coln_itr), end(row), exceeds_threshold);
}
我要的是柱状索引,所以我使用std::distance 来获取它并将其存储在std::set、cache 中。我在这里选择std::set 是因为我有兴趣计算在某个时间(即行)间隔内值超过value_threshold 的unique 列的数量。通过使用std::set,我可以转储每个此类值的列索引,并且“自动删除”重复项。然后,稍后,我可以简单地检查cache 的大小,如果它大于或等于指定的数字(num_columns),我发现了一个“巧合”。
获得超过value_threshold 的每个值的列索引后,我检查cache 的大小以查看是否找到了足够的唯一列。如果有,我将一个添加到coincidence_counter,我清除cache,然后在“时间”(即行)中向前跳一些指定的数量(这里是4004000 - time_counter)。请注意,我减去了time_counter,它从超过value_threshold 的第一个找到的值中跟踪“时间”(行数)。我想从那个起点及时向前跳跃。
if(size(cache) >= num_columns){
++coincidence_counter;
cache.clear();
if(distance(row_ctr, end(waveform)) > (4004000 - time_counter)){
advance(row_ctr, ((4004000 - time_counter)));
} else {
return coincidence_counter;
}
}
最后,我检查了time_counter。请记住,num_columns 唯一列必须在某个时间(即行)阈值之内。我从第一个超过value_threshold 的值开始计算时间。如果我已经超过了时间阈值,我想做的是空cache(),并使用第二个找到的超过阈值的值(如果有的话)作为new第一个重新开始-found 值,并希望以此为起点找到巧合。
我没有跟踪每个找到的值的时间(即行索引),而是从第一个找到的值(即time_counter + 1)之后的一个重新开始。
if(time_counter == time_threshold){
row_itr -= (time_counter + 1);
cache.clear();
}
我还在每个循环中为time_counter 添加一个,并将其设置为等于0 如果cache 的大小为0(我想从第一个找到的值开始计算时间(即行)超过value_threshold)。
尝试的优化:
我不确定这些是否有帮助、伤害或其他方面,但这是我尝试过的(收效甚微)
我已将所有int 和unsigned int 替换为size_t。我知道这个可能会稍微快一点,而且这些值无论如何都不应该小于0。
我还使用了execution::par_unseq 和std::find_if。我不确定这有多大帮助。 “数组”通常具有大约16-20 列,但异常 大量行(大约50000000 或更多)。由于std::find_if 正在“扫描”单个行,而这些行只有几十个元素,所以最多并行化可能没有多大帮助。
目标:
不幸的是,该算法需要很长时间才能运行。我的首要任务是速度。如果可能的话,我想把执行时间减半。
需要记住的一些事项:
“数组”通常按~20 列乘以~50000000 行的顺序排列(有时更长)。它的0's 很少,不能重新排列(“行”的顺序和每行中的元素很重要)。它占用(不出所料)一 吨 的内存,因此我的机器资源非常有限。
我也在cling 中以C++ 的解释运行它。在我的工作中,我从来没有使用编译过的C++。我试过编译,但没有太大帮助。我也尝试过使用编译器优化标志。
可以做些什么来缩短执行时间(以牺牲几乎其他任何东西为代价?)
如果我可以提供任何其他信息来帮助回答问题,请告诉我。
【问题讨论】:
-
将您的文件放入我的 IDE 并尝试编译。有很多语法错误。许多我可以解决。但是:什么是“time_threshold”和“waveform”。它们在何处以及如何以何种目的进行定义。如果你能给我这个额外的信息。我将使用分析器检查速度丢失的位置。
-
您可以通过使用 -O3 编译来获得所需的执行速度。
标签: c++ algorithm performance optimization micro-optimization