【问题标题】:Parallellizable Algorithms to traverse a 2D matrix being aware of both col/row-wise neighborhood遍历二维矩阵的并行化算法,同时知道 col/row-wise 邻域
【发布时间】:2018-05-01 02:45:42
【问题描述】:

我有一个相当大的 N*N 整数矩阵 Matrix2D(假设内存足够),

1,在每个 row/column 中,如果元素的值不同于它是右/下邻居。

2,我想找到一个可并行化的最优算法,最好是通过 OMP。

所以,最后我会有一些数据结构,例如,

std::vector<std::vector<int>>   RowWiseDiscontinuity(N);// N= #of rows
std::vector<std::vector<int>>   ColWiseDiscontinuity(N);// N= #of cols

inner std::vector&lt;int&gt; 记录行/列索引。

我把我的串行版本放在这里,但发现很难被 OMP 并行化......有人可以提供一些想法如何使用 omp 实现对这个 2D 矩阵的遍历?

代码 sn-p,

std::vector<std::vector<int>>   RowWiseDiscontinuity(N);// N= #of rows
std::vector<std::vector<int>>   ColWiseDiscontinuity(N);// N= #of cols
std::vector<int> TempX1;
std::vector<int> TempX2;
for (int y=0; y<N; ++y)
{
    TempX1.clear();
    for (int x =0; x<N; ++x)
    {
        int value = Matrix2D(x,y);
        TempX1.push_back(value);
    }

    auto iter1 = TempX1.begin();
    auto iter2 = TempX2.begin();

    if (y>0)
    for (int x =0; x<N; ++x)
    {
         if (*iter1 !=*(iter1+1))
         {
             RowWiseDiscontinuity[y].push_back(x); //Critical for OMP
         }
         ++iter1;
         ++iter2;
         if (*iter1 != *iter2)
         {
             ColWiseDiscontinuity[x].push_back(y); //Critical for OMP
         }
     }

     TempX2.swap(TempX1); // proceed to next row, remember previous

}

【问题讨论】:

  • 您是否考虑过将 (x,y) 或 (y,x) 对存储在线程局部向量中,并在内循环完成后将它们添加到 Row/ColWiseDiscontinuity 中?这样,您只需使用少量的额外存储空间,并且可以按顺序插入 Row/ColWiseDiscontinuity,而不会对性能产生太大影响(我假设不连续性只会很少发生)
  • 您是否尝试过使用步幅值将2D vector-matrix 扁平化为1D vector,以标记一行有多少列?
  • 可以删除TempX1TempX2,直接使用Matrix2D的数据。然后 omp 希望可以并行化外循环。但是,如果 Matrix2D 不适合缓存,那么您可能会受到内存带宽的限制,因此您可能不会获得显着的加速。
  • 在担心它的性能或并行化它之前让你的算法工作可能是明智的。在测试*iter1 != *(iter1 + 1) 中,iter1 + 1 可以是结束迭代器,因此测试给出了未定义的行为。在最新的测试中*iter1 != *iter2(直接在递增之后)iter1iter2 都可以是结束迭代器。

标签: c++ algorithm matrix data-structures openmp


【解决方案1】:

在矩阵上执行两次(可以在不同线程上执行),一次用于行不连续,另一次用于列不连续。

行通道如下所示:

for (int y = 0; y < N; ++y)    // Can be parallelized
{
    for (int x = 0; x < N - 1; ++x)
    {
        if(Matrix(x, y) != Matrix(x + 1, y))
            RowWiseDiscontinuity[y].push_back(x);
    }
}

列传类似:

for (int x = 0; x < N; ++x)    // Can be parallelized
{
    for (int y = 0; y < N - 1; ++y)
    {
        if(Matrix(x, y) != Matrix(x, y + 1))
            ColWiseDiscontinuity[x].push_back(y);
    }
}

两种情况下的外循环都可以并行化。 Row / ColWiseDiscontinuity不同 元素在外循环的每次迭代中都会发生变化,从而防止数据竞争。传递本身可以在不同的线程上执行。

附带说明一下,您可以通过以 行优先和列优先顺序存储矩阵并使用每个在适当的时候订购。在行优先顺序中,元素 (x + 1, y) 始终位于 (x, y) 旁边。对于以列为主的元素(x, y + 1) 也是如此。

【讨论】:

  • 我没有得到无法解释的反对票,也没有看到我的解决方案有什么问题。
【解决方案2】:

这是一种算法,它执行查找相邻对角邻居的基本测试,并使用 4x4 单位矩阵记录结果。这不包括对 OMP 或并行计算的任何使用。然而,这是一个简单易用的 MxN 矩阵的通用类模板。而不是将内容存储在向量的向量中;我已经将数据展平为单个一维向量,并且在模板实例化时已经保留了内存量。我正在使用函数模板来比较矩阵中的元素,并返回索引(M,N)(x,y),以及结果是真还是假。我在这里使用一个结构来包含 x-y 索引和布尔结果的关系。检查邻居的启发式方法避免了查看矩阵的最后一列和最后一行,因为不会有任何元素位于右侧或下方:这可以在 main 函数中看到。这可能有助于您尝试将类、结构和函数应用于 OMP 库。

template<unsigned Col, unsigned Row>
class Matrix2D {
public:
    const unsigned col_size = Col;
    const unsigned row_size = Row;
    const unsigned stride_ = col_size;
    const unsigned matrix_size = col_size * row_size;

private:
    std::vector<int> data_;

public:
    Matrix2D() {
        data_.resize( matrix_size );
    }

    void addElement( unsigned x, unsigned y, int val ) {
        data_[(x * col_size + y)] = val;
    }

    /*int getElement( unsigned x, unsigned y ) {
        int value = data_[(x * col_size + y)];
        return value;
    }*/

    int getElement( unsigned idx ) {
        return data_[idx];
    }
};

struct Neighbor {
    unsigned indexCol;
    unsigned indexRow;
    bool     notSame;
};


template<unsigned Col, unsigned Row>
void compareMatrixDiagonals( Matrix2D<Col, Row>& mat, Neighbor& n, unsigned colIdx, unsigned rowIdx );

int main() {

    Matrix2D<4, 4> mat4x4;
    mat4x4.addElement( 0, 0, 1 );
    mat4x4.addElement( 0, 1, 0 );
    mat4x4.addElement( 0, 2, 0 );
    mat4x4.addElement( 0, 3, 0 );

    mat4x4.addElement( 1, 0, 0 );
    mat4x4.addElement( 1, 1, 1 );
    mat4x4.addElement( 1, 2, 0 );
    mat4x4.addElement( 1, 3, 0 );

    mat4x4.addElement( 2, 0, 0 );
    mat4x4.addElement( 2, 1, 0 );
    mat4x4.addElement( 2, 2, 1 );
    mat4x4.addElement( 2, 3, 0 );

    mat4x4.addElement( 3, 0, 0 );
    mat4x4.addElement( 3, 1, 0 );
    mat4x4.addElement( 3, 2, 0 );
    mat4x4.addElement( 3, 3, 1 );

    unsigned idx = 0;
    for ( unsigned i = 0; i < mat4x4.matrix_size; i++ ) {
        std::cout << mat4x4.getElement( i ) << " ";
        idx++;

        if ( idx == 4 ) {
            std::cout << "\n";
            idx = 0;
        }
    }
    std::cout << "\n";    

    unsigned colIdx = 0;
    unsigned rowIdx = 0;
    std::vector<Neighbor> neighbors;
    Neighbor n;

    // If we are in the last col or row we can ignore
    // (0,3),(1,3),(2,3),(3,3),(3,0),(3,1),(3,2), {*(3,3)* already excluded}
    // This is with a 4x4 matrix: we can substitute and use LastCol - LastRow 
    // for any size MxN Matrix.
    const unsigned LastCol = mat4x4.col_size - 1;
    const unsigned LastRow = mat4x4.row_size - 1;

    for ( unsigned i = 0; i < LastCol; i++ ) {
        for ( unsigned j = 0; j < LastRow; j++ ) {
            compareMatrixDiagonals( mat4x4, n, i, j );
            neighbors.push_back( n );
        }
    }

    for ( unsigned i = 0; i < neighbors.size(); i++ ) {
        std::cout << "(" << neighbors[i].indexCol
            << "," << neighbors[i].indexRow
            << ") " << neighbors[i].notSame
            << "\n";
    }

    std::cout << "\nPress any key & enter to quit." << std::endl;
    char c;
    std::cin >> c;

    return 0;
}

template<unsigned Col, unsigned Row>
void compareMatrixDiagonals( Matrix2D<Col, Row>& mat, Neighbor& N, unsigned colIdx, unsigned rowIdx ) {
    unsigned firstIdx = (colIdx * mat.col_size + rowIdx);
    unsigned nextIdx  = ((colIdx + 1) * mat.col_size +  (rowIdx + 1));
    if ( mat.getElement( firstIdx ) != mat.getElement( nextIdx ) ) {
        N.indexCol = colIdx;
        N.indexRow = rowIdx;
        N.notSame  = true;          
    } else {
        N.indexCol = colIdx;
        N.indexRow = rowIdx;
        N.notSame  = false;     
    }
}

【讨论】:

    【解决方案3】:

    我会创建另一个数组来保存最近的邻居列和行。显然,这必须作为第一次通过。我建议创建一个包含所需索引的二维数组对(pair)。我会做一个成对的向量,而不是两个向量。对是可并行化的并且易于排序。

    vector<vector<pair<int, int>>> elements(N);
    

    【讨论】:

    • 我不完全确定这是否是你想要的,所以请在投反对票之前纠正我,谢谢。
    猜你喜欢
    • 2012-03-15
    • 1970-01-01
    • 1970-01-01
    • 2017-12-29
    • 1970-01-01
    • 1970-01-01
    • 2011-02-21
    • 2021-12-28
    • 2012-12-30
    相关资源
    最近更新 更多