【问题标题】:Constructing distance matrix in parallel in C++11 using OpenMP使用 OpenMP 在 C++11 中并行构建距离矩阵
【发布时间】:2021-12-07 10:44:55
【问题描述】:

我想使用 OpenMP 在 C++11 中并行构建一个距离矩阵。我阅读了各种文档、介绍、示例等。但是,我仍然有一些问题。为了便于回答这篇文章,我将我的问题表述为假设编号 1 到 7。这​​样,您可以快速浏览它们并指出哪些是正确的,哪些不是。

让我们从一个计算密集犰狳矩阵的简单串行执行函数开始:

// [[Rcpp::export]]
arma::mat compute_dist_mat(arma::mat &coordinates, unsigned int n_points) {
  arma::mat dist_mat(n_points, n_points, arma::fill::zeros);
  double dist {};
  for(unsigned int i {0}; i < n_points; i++) {
    for(unsigned int j = i + 1; j < n_points; j++) {
      dist = compute_dist(coordinates(i, 1), coordinates(j, 1), coordinates(i, 0), coordinates(j, 0));
      dist_mat.at(i, j) = dist;
      dist_mat.at(j, i) = dist;
    }
  }
  return dist_mat;
}

附带说明:该函数应该通过 Rcpp 接口从 R 调用 - 由 // [[Rcpp::export]] 指示。因此文件的顶部包括

#include <RcppArmadillo.h>
// [[Rcpp::depends(RcppArmadillo)]]
// [[Rcpp::plugins(cpp11)]]
#include <omp.h>
// [[Rcpp::plugins(openmp)]]

using namespace Rcpp;
using namespace arma;

但是,如果没有 R 接口,该函数也应该可以正常工作。

为了使代码并行化,我将循环替换为

unsigned int i {};
unsigned int j {};
# pragma omp parallel for private(dist, i, j) num_threads(n_threads) if(n_threads > 1)
for(i = 0; i < n_points; i++) {
  for(j = i + 1; j < n_points; j++) {
    dist = compute_dist(coordinates(i, 1), coordinates(j, 1), coordinates(i, 0), coordinates(j, 0));
    dist_mat.at(i, j) = dist;
    dist_mat.at(j, i) = dist;
  }
}

并将n_threads 作为参数添加到compute_dist_mat 函数。

  1. 这会将外循环的迭代分布在线程之间,而内循环的迭代由处理外循环的相应线程执行。
  2. 两个循环级别无法组合,因为内循环依赖于外循环。
  3. distij 都将在 # pragma 行上方初始化,然后声明为私有,而不是在循环中初始化它们。
  4. # pragma 行在n_treads = 1 时没有任何作用,导致串行执行。

扩展密集矩阵应用,下面的代码块说明了批量插入的串行稀疏矩阵情况。为了鼓励在这里使用稀疏矩阵,我将低于某个阈值的距离设置为零。

// [[Rcpp::export]]
arma::sp_mat compute_dist_spmat(arma::mat &coordinates, unsigned int n_points, double dist_threshold) {
  std::vector<double> dists;
  std::vector<unsigned int> dist_i;
  std::vector<unsigned int> dist_j;
  double dist {};
  for(unsigned long int i {0}; i < n_points; i++) {
    for(unsigned long int j = i + 1; j < n_points; j++) {
      dist = compute_dist(coordinates(i, 1), coordinates(j, 1), coordinates(i, 0), coordinates(j, 0));
      if(dist >= dist_threshold) {
        dists.push_back(dist);
        dist_i.push_back(i);
        dist_j.push_back(j);
      }
    }
  }
  unsigned int mat_size = dist_i.size();
  arma::umat index_mat(2, mat_size * 2);
  arma::vec dists_vec(mat_size * 2);
  unsigned int j {};
  for(unsigned int i {0}; i < mat_size; i++) {
    j = i * 2;
    index_mat.at(0, j) = dist_i[i];
    index_mat.at(1, j) = dist_j[i];
    index_mat.at(0, j + 1) = dist_j[i];
    index_mat.at(1, j + 1) = dist_i[i];
    dists_vec.at(j) = dists[i];
    dists_vec.at(j + 1) = dists[i];
  }
  arma::sp_mat dist_mat(index_mat, values_vec, n_points, n_points);
  return dist_mat;
}

因为该函数事前不知道有多少距离高于阈值,所以它首先将非零值存储在标准向量中,然后从中构造犰狳对象。

我将函数并行化如下:

// [[Rcpp::export]]
arma::sp_mat compute_dist_spmat(arma::mat &coordinates, unsigned int n_points, double dist_threshold, unsigned short int n_threads) {
  std::vector<std::vector<double>> dists(n_points);
  std::vector<std::vector<unsigned int>> dist_j(n_points);
  double dist {};
  unsigned int i {};
  unsigned int j {};
  # pragma omp parallel for private(dist, i, j) num_threads(n_threads) if(n_threads > 1)
  for(i = 0; i < n_points; i++) {
    for(j = i + 1; j < n_points; j++) {
      dist = compute_dist(coordinates(i, 1), coordinates(j, 1), coordinates(i, 0), coordinates(j, 0));
      if(dist >= dist_threshold) {
        dists[i].push_back(dist);
        dist_j[i].push_back(j);
      }
    }
  }
  unsigned int vec_intervals[n_points + 1];
  vec_intervals[0] = 0;
  for (i = 0; i < n_points; i++) {
    vec_intervals[i + 1] = vec_intervals[i] + dist_j[i].size();
  }
  unsigned int mat_size {vec_intervals[n_points]};
  arma::umat index_mat(2, mat_size * 2);
  arma::vec dists_vec(mat_size * 2);
  unsigned int vec_begins_i {};
  unsigned int vec_length_i {};
  unsigned int k {};
  # pragma omp parallel for private(i, j, k, vec_begins_i, vec_length_i) num_threads(n_threads) if(n_threads > 1)
  for(i = 0; i < n_points; i++) {
    vec_begins_i = vec_intervals[i];
    vec_length_i = vec_intervals[i + 1] - vec_begins_i;
    for(j = 0, j < vec_length_i, j++) {
      k = (vec_begins_i + j) * 2;
      index_mat.at(0, k) = i;
      index_mat.at(1, k) = dist_j[i][j];
      index_mat.at(0, k + 1) = dist_j[i][j];
      index_mat.at(1, k + 1) = i;
      dists_vec.at(k) = dists[i][j];
      dists_vec.at(k + 1) = dists[i][j];
    }
  }
  arma::sp_mat dist_mat(index_mat, dists_vec, n_points, n_points);
  return dist_mat;
}
  1. 在循环中使用动态向量是线程安全的。
  2. distijkvec_begins_ivec_length_i 都将在 # pragma 行上方初始化,然后声明为私有,而不是在循环中初始化它们。
  3. 无需将任何内容标记为部分。

这七个陈述中是否有任何一个不正确?

【问题讨论】:

  • 3 和 6 是错误的:无论哪种方式都可以,但是在循环/并行区域内声明变量/对象是更好的样式,并且不需要 private 子句。 /跨度>
  • 5 错误:std::vector::push_back 等方法一般不是线程安全的。您只需以一种有效的方式使用它,因为线程之间不会有冲突(据我所知)。
  • 我不认为if(n_threads &gt; 1) 子句应该有任何区别。此外,为了避免负载不平衡,应该使用例如dynamic 当存在依赖于外部(并行)循环的内部循环时进行调度。
  • 我有一种模糊的感觉,我们已经在 CRAN 有并行距离矩阵助手,但我现在可以想到一个具体的名称(而且没有时间查看)。这是一个令人尴尬的并行问题,所以我认为这已经完成了......
  • 感谢 cmets 和建议。 @DirkEddelbuettel 至少,该领域的主要软件包似乎可以连续计算距离矩阵。我问这个问题主要是为了澄清一些关于 OpenMP 的问题。

标签: c++ c++11 openmp rcpp armadillo


【解决方案1】:

以下内容并未直接回答您的问题(只是我从个人 GitHub 存储库中复制的一些开发代码),但它清楚地说明了几点可能对您的应用程序有用:

  • 只要您不在parallel 循环内进行任何动态内存分配,OpenMP 就会自动确定私有成员
  • 对于稀疏矩阵距离计算,重要的是要超越每个非零索引处的简单距离计算,而是考虑预期的稀疏结构并为此进行优化。在下面的示例中,我假设两个矩阵都非常稀疏,并且它们的交集小于它们的并集。因此,我用平方列和(用于计算欧几里得距离)“预先确定”每个距离计算,然后仅调整交叉点的计算。这避免了复杂的迭代器结构并且速度非常快。
  • 使用尽可能少的临时代码对您大有裨益,稀疏矩阵迭代器在这方面的作用与任何人可能编写的任何替代代码一样好。
  • Eigen 提供比 Armadillo 更好的矢量化(我可能会全面补充),这意味着如果最后 20% 的性能提升对您很重要,您需要 Eigen 而不是 Armadillo。

此函数计算Eigen::SparseMatrix&lt;double&gt; 对象中所有唯一列对之间的欧几里得距离:

// sparse column-wise Euclidean distance between all columns
Eigen::MatrixXd distance(Eigen::SparseMatrix<double>& A) {
    Eigen::MatrixXd dists(A.cols(), A.cols());
    Eigen::VectorXd sq_colsums(A.cols());
    for (int col = 0; col < A.cols(); ++col)
        for (Eigen::SparseMatrix<double>::InnerIterator it(A, col); it; ++it)
            sq_colsums(col) += it.value() * it.value();

    #pragma omp parallel for
    for (unsigned int i = 0; i < (A.cols() - 1); ++i) {
        for (unsigned int j = (i + 1); j < A.cols(); ++j) {
            double dist = sq_colsums(i) + sq_colsums(j);
            Eigen::SparseMatrix<double>::InnerIterator it1(A, i), it2(A, j);
            while (it1 && it2) {
                if (it1.row() < it2.row()) ++it1;
                else if (it1.row() > it2.row()) ++it2;
                else {
                    dist -= it1.value() * it1.value();
                    dist -= it2.value() * it2.value();
                    dist += std::pow(it1.value() - it2.value(), 2);
                    ++it1; ++it2;
                }
            }
            dists(i, j) = std::sqrt(dist);
            dists(j, i) = dists(i, j);
        }
    }
    dists.diagonal().array() = 1;
    return dists;
}

正如 Dirk 和其他人所说,有一些软件包(即ParallelDist)似乎可以满足您的所有需求(对于密集矩阵)。查看wordspace 进行快速余弦距离计算。请参阅here 进行一些比较。余弦距离很容易在 R 中有效地计算,而无需使用 Rcpp 使用 crossprod 操作(请参阅qlcMatrix::cosSparse 源代码以获得算法灵感)。

【讨论】:

    猜你喜欢
    • 2012-06-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-05-18
    • 2022-01-15
    • 1970-01-01
    • 2015-06-11
    • 2021-08-04
    相关资源
    最近更新 更多