【发布时间】:2015-07-09 13:06:29
【问题描述】:
我已经使用 RcppParallel 编写了分组求和的并行实现。
// [[Rcpp::depends(RcppParallel)]]
#include <Rcpp.h>
#include <RcppParallel.h>
using namespace Rcpp;
using namespace RcppParallel;
struct SumsG: public Worker
{
const RVector<double> v;
const RVector<int> gi;
RVector<double> sg;
SumsG(const NumericVector v, const IntegerVector gi, NumericVector sg): v(v), gi(gi), sg(sg) {}
SumsG(const SumsG& p, Split): v(p.v), gi(p.gi), sg(p.sg) {}
void operator()(std::size_t begin, std::size_t end) {
for (std::size_t i = begin; i < end; i++) {
sg[gi[i]] += v[i];
}
}
void join(const SumsG& p) {
for(std::size_t i = 0; i < sg.length(); i++) {
sg[i] += p.sg[i];
}
}
};
// [[Rcpp::export]]
List sumsingroups(NumericVector v, IntegerVector gi, int ni) {
NumericVector sg(ni);
SumsG p(v, gi, sg);
parallelReduce(0, v.length(), p);
return List::create(_["sg"] = p.sg);
}
它使用Rcpp::sourceCpp 编译。现在,当我从 R sumsingroups(1:10, rep(0:1, each = 5), 2) 多次调用它时,我得到了正确答案(15 40),然后得到了不同的答案(通常是正确答案的乘法)。跑步
res <- sumsingroups(1:10, rep(0:1, each = 5), 2)
for(i in 1:1000) {
tmp <- sumsingroups(1:10, rep(0:1, each = 5), 2)
if(res[[1]][1] != tmp[[1]][1]) break
Sys.sleep(0.1)
}
在随机迭代返回时中断
$sg
[1] 60 160
或
$sg
[1] 30 80
我是 Rcpp 和 RcppParallel 的新手,不知道什么会导致这种行为。
更新。没有帮助的事情:
- 已将
for (std::size_t i = 0; i < sg.length(); i++) sg[i] = 0;添加到 两个构造函数。 - 更改了名称,使它们在
Worker定义并在函数实现中。
【问题讨论】:
-
我怀疑你的线程在循环的情况下是不同步的。必须有一个命令来确保每个线程在收集结果之前已经完成,但是我从来没有使用过RcppParallel,所以我不知道这是怎么做到的。
-
Rcpp 有一个
parallelFor()函数来处理这种情况,它负责内部线程锁定。 -
我不明白在这种情况下如何使用
parallelFor()。 -
我的意思是在你的 Rcpp 代码中。你有两个 for 循环。用
parallelFor()函数替换它们可能会更好,至少我是这样解释documentation 的。