【问题标题】:Matrix Transpose on RowMatrix in SparkSpark中RowMatrix上的矩阵转置
【发布时间】:2015-05-31 10:43:44
【问题描述】:

假设我有一个 RowMatrix。

  1. 如何转置它。 API 文档似乎没有转置方法。
  2. Matrix 具有 transpose() 方法。但它不是分布式的。如果我有一个比内存更大的矩阵,我该如何转置它?
  3. 我已将 RowMatrix 转换为 DenseMatrix,如下所示

    DenseMatrix Mat = new DenseMatrix(m,n,MatArr);
    

    这需要将 RowMatrix 转换为 JavaRDD 并将 JavaRDD 转换为数组。

还有其他方便的转换方式吗?

提前致谢

【问题讨论】:

    标签: apache-spark


    【解决方案1】:

    如果有人感兴趣,我已经实现了@javadba 提出的分布式版本。

      def transposeRowMatrix(m: RowMatrix): RowMatrix = {
        val transposedRowsRDD = m.rows.zipWithIndex.map{case (row, rowIndex) => rowToTransposedTriplet(row, rowIndex)}
          .flatMap(x => x) // now we have triplets (newRowIndex, (newColIndex, value))
          .groupByKey
          .sortByKey().map(_._2) // sort rows and remove row indexes
          .map(buildRow) // restore order of elements in each row and remove column indexes
        new RowMatrix(transposedRowsRDD)
      }
    
    
      def rowToTransposedTriplet(row: Vector, rowIndex: Long): Array[(Long, (Long, Double))] = {
        val indexedRow = row.toArray.zipWithIndex
        indexedRow.map{case (value, colIndex) => (colIndex.toLong, (rowIndex, value))}
      }
    
      def buildRow(rowWithIndexes: Iterable[(Long, Double)]): Vector = {
        val resArr = new Array[Double](rowWithIndexes.size)
        rowWithIndexes.foreach{case (index, value) =>
            resArr(index.toInt) = value
        }
        Vectors.dense(resArr)
      } 
    

    【讨论】:

    • 你能为此分享 PySpark 版本吗?
    • 对不起,不 :( 我很久以前写的,有一段时间没接触 Spark。适应它应该不难。
    【解决方案2】:

    您可以使用可以从 IndexedRowMatrix 创建的 BlockMatrix:

    BlockMatrix matA = (new IndexedRowMatrix(...).toBlockMatrix().cache();
    matA.validate();
    
    BlockMatrix matB = matA.transpose();
    

    然后,可以很容易地放回IndexedRowMatrix。这在spark documentation 中有描述。

    【讨论】:

      【解决方案3】:

      你是对的:没有

       RowMatrix.transpose()
      

      方法。您将需要手动执行此操作。

      这是非分布式/本地矩阵版本:

      def transpose(m: Array[Array[Double]]): Array[Array[Double]] = {
          (for {
            c <- m(0).indices
          } yield m.map(_(c)) ).toArray
      }
      

      分发版将遵循以下几行:

          origMatRdd.rows.zipWithIndex.map{ case (rvect, i) =>
              rvect.zipWithIndex.map{ case (ax, j) => ((j,(i,ax))
          }.groupByKey
          .sortBy{ case (i, ax) => i }
          .foldByKey(new DenseVector(origMatRdd.numRows())) { case (dv, (ix,ax))  =>
                    dv(ix) = ax
           }
      

      警告:我没有测试过上面的内容:它有错误。但基本方法是有效的 - 并且类似于我过去为 spark 的小型 LinAlg 库所做的工作。

      【讨论】:

      • 这意味着分布式矩阵转置是一个悬而未决的问题?
      • AFAIK 是的,就是这样。例如。 4 月份向 Spark 用户列表发送的关于大型矩阵转置的电子邮件没有收到任何回复。
      • 进一步检查:有开放的 JIRA 用于此和相关的 RowSimilarity 和 BlockMatrix 操作。
      【解决方案4】:

      对于非常大且稀疏的矩阵,(例如从文本特征提取中获得的矩阵),最好和最简单的方法是:

      def transposeRowMatrix(m: RowMatrix): RowMatrix = {
        val indexedRM = new IndexedRowMatrix(m.rows.zipWithIndex.map({
          case (row, idx) => new IndexedRow(idx, row)}))
        val transposed = indexedRM.toCoordinateMatrix().transpose.toIndexedRowMatrix()
        new RowMatrix(transposed.rows
          .map(idxRow => (idxRow.index, idxRow.vector))
          .sortByKey().map(_._2))      
      }
      

      对于不那么稀疏的矩阵,您可以使用 BlockMatrix 作为上面 aletapool 回答中提到的桥梁。

      但是 aletapool 的回答错过了一个非常重要的点:当你从 RowMaxtrix -> IndexedRowMatrix -> BlockMatrix -> transpose -> BlockMatrix -> IndexedRowMatrix -> RowMatrix 开始,在最后一步(IndexedRowMatrix -> RowMatrix),你必须做一个排序。因为默认情况下,从 IndexedRowMatrix 转换为 RowMatrix,索引会被简单地丢弃,顺序会被打乱。

      val data = Array(
        MllibVectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
        MllibVectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
        MllibVectors.dense(4.0, 0.0, 0.0, 6.0, 7.0),
        MllibVectors.sparse(5, Seq((2, 2.0), (3, 7.0))))
      
      val dataRDD = sc.parallelize(data, 4)
      
      val testMat: RowMatrix = new RowMatrix(dataRDD)
      testMat.rows.collect().map(_.toDense).foreach(println)
      
      [0.0,1.0,0.0,7.0,0.0]
      [2.0,0.0,3.0,4.0,5.0]
      [4.0,0.0,0.0,6.0,7.0]
      [0.0,0.0,2.0,7.0,0.0]
      
      transposeRowMatrix(testMat).
        rows.collect().map(_.toDense).foreach(println)
      
      [0.0,2.0,4.0,0.0]
      [1.0,0.0,0.0,0.0]
      [0.0,3.0,0.0,2.0]
      [7.0,4.0,6.0,7.0]
      [0.0,5.0,7.0,0.0]
      

      【讨论】:

        【解决方案5】:

        在Java中获取RowMatrix的转置:

        public static RowMatrix transposeRM(JavaSparkContext jsc, RowMatrix mat){
        List<Vector> newList=new ArrayList<Vector>();
        List<Vector> vs = mat.rows().toJavaRDD().collect();
        double [][] tmp=new double[(int)mat.numCols()][(int)mat.numRows()] ;
        
        for(int i=0; i < vs.size(); i++){
            double[] rr=vs.get(i).toArray();
            for(int j=0; j < mat.numCols(); j++){
                tmp[j][i]=rr[j];
            }
        }
        
        for(int i=0; i < mat.numCols();i++)
            newList.add(Vectors.dense(tmp[i]));
        
        JavaRDD<Vector> rows2 = jsc.parallelize(newList);
        RowMatrix newmat = new RowMatrix(rows2.rdd());
        return (newmat);
        }
        

        【讨论】:

          【解决方案6】:

          这是先前解决方案的变体,但适用于稀疏行矩阵并在需要时也保持转置稀疏:

            def transpose(X: RowMatrix): RowMatrix = {
              val m = X.numRows ().toInt
              val n = X.numCols ().toInt
          
              val transposed = X.rows.zipWithIndex.flatMap {
                case (sp: SparseVector, i: Long) => sp.indices.zip (sp.values).map {case (j, value) => (i, j, value)}
                case (dp: DenseVector, i: Long) => Range (0, n).toArray.zip (dp.values).map {case (j, value) => (i, j, value)}
              }.sortBy (t => t._1).groupBy (t => t._2).map {case (i, g) =>
                val (indices, values) = g.map {case (i, j, value) => (i.toInt, value)}.unzip
                if (indices.size == m) {
                  (i, Vectors.dense (values.toArray) )
                } else {
                  (i, Vectors.sparse (m, indices.toArray, values.toArray))
                }
              }.sortBy(t => t._1).map (t => t._2)
          
              new RowMatrix (transposed)
            }
          

          希望对您有所帮助!

          【讨论】:

            猜你喜欢
            • 2019-07-05
            • 1970-01-01
            • 2012-01-29
            • 1970-01-01
            • 2012-05-08
            • 2010-11-13
            • 2011-06-23
            • 1970-01-01
            相关资源
            最近更新 更多