【Title】: Spark Scala: How to convert DataFrame[Vector] to DataFrame[f1: Double, ..., fn: Double]
【Posted】: 2016-11-01 18:40:02
【Question】:

I just used a StandardScaler to normalize my features for an ML application. After selecting the scaled features, I want to convert them back to a DataFrame of Doubles, though the length of my vectors is arbitrary. I know how to do it for a specific 3 features by using

myDF.map{case Row(v: Vector) => (v(0), v(1), v(2))}.toDF("f1", "f2", "f3")

but not for an arbitrary number of features. Is there an easy way to do this?

Example:

val testDF = sc.parallelize(List(Vectors.dense(5D, 6D, 7D), Vectors.dense(8D, 9D, 10D), Vectors.dense(11D, 12D, 13D))).map(Tuple1(_)).toDF("scaledFeatures")
val myColumnNames = List("f1", "f2", "f3")
// val finalDF = DataFrame[f1: Double, f2: Double, f3: Double] 

Edit

I found out how to unpack into column names when creating the DataFrame, but I'm still having trouble converting a vector into the sequence needed to create the DataFrame:

finalDF = testDF.map{case Row(v: Vector) => v.toArray.toSeq /* <= this errors */}.toDF(List("f1", "f2", "f3"): _*)

【Question comments】:

    Tags: scala apache-spark apache-spark-sql apache-spark-ml


    【Solution 1】:

    Since the answers above require additional libraries or are still not supported, I used a pandas DataFrame to easily extract the vector values and then convert them back to a Spark DataFrame.

    # convert to pandas dataframe
    pandasDf = dataframe.toPandas()
    # add a new column
    pandasDf['newColumnName'] = 0  # fill the new column with 0s
    # now iterate through the rows and update the column
    for index, row in pandasDf.iterrows():
        value = row['vectorCol'][0]  # get the 0th value of the vector
        pandasDf.loc[index, 'newColumnName'] = value  # put the value in the new column
    # convert back to a Spark dataframe (assumes an active SparkSession named `spark`)
    sparkDf = spark.createDataFrame(pandasDf)

    【Comments】:

      【Solution 2】:

      I use Spark 2.3.2 and built an xgboost4j binary-classification model; the results look like this:

      results_train.select("classIndex","probability","prediction").show(3,0)
      +----------+----------------------------------------+----------+
      |classIndex|probability                             |prediction|
      +----------+----------------------------------------+----------+
      |1         |[0.5998525619506836,0.400147408246994]  |0.0       |
      |1         |[0.5487841367721558,0.45121586322784424]|0.0       |
      |0         |[0.5555324554443359,0.44446757435798645]|0.0       |
      +----------+----------------------------------------+----------+
      

      I defined the following udf to extract an element from the probability vector column:

      import org.apache.spark.sql.functions._
      
      def getProb = udf((probV: org.apache.spark.ml.linalg.Vector, clsInx: Int) => probV.apply(clsInx) )
      
      results_train.select("classIndex","probability","prediction").
      withColumn("p_0",getProb($"probability",lit(0))).
      withColumn("p_1",getProb($"probability", lit(1))).show(3,0)
      
      +----------+----------------------------------------+----------+------------------+-------------------+
      |classIndex|probability                             |prediction|p_0               |p_1                |
      +----------+----------------------------------------+----------+------------------+-------------------+
      |1         |[0.5998525619506836,0.400147408246994]  |0.0       |0.5998525619506836|0.400147408246994  |
      |1         |[0.5487841367721558,0.45121586322784424]|0.0       |0.5487841367721558|0.45121586322784424|
      |0         |[0.5555324554443359,0.44446757435798645]|0.0       |0.5555324554443359|0.44446757435798645|
      +----------+----------------------------------------+----------+------------------+-------------------+
      
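      For an arbitrary number of classes, the same udf can be applied once per index; a minimal sketch (not from the original answer, assuming the vector length n is known up front):

      val n = 2 // assumed: number of elements in the probability vector
      val expanded = (0 until n).foldLeft(results_train.select("classIndex", "probability", "prediction")) {
        case (df, i) => df.withColumn(s"p_$i", getProb($"probability", lit(i)))
      }
      expanded.show(3, false)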

      Hope this helps those who need to handle Vector-typed inputs.

      【Comments】:

        【Solution 3】:

        Spark >= 3.0.0

        Since Spark 3.0 you can use vector_to_array:

        import org.apache.spark.ml.functions.vector_to_array

        // `exprs` is the list of column expressions built in the Spark < 3.0 section below
        testDF.select(vector_to_array($"scaledFeatures").alias("_tmp")).select(exprs:_*)
        
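        For completeness, a self-contained sketch of the same route applied to the asker's testDF (the column names f1..f3 are assumed from the question):

        val names = Seq("f1", "f2", "f3") // assumed, from the question's example
        val finalDF = testDF
          .withColumn("_tmp", vector_to_array($"scaledFeatures"))
          .select(names.zipWithIndex.map { case (c, i) => $"_tmp".getItem(i).alias(c) }: _*)
        // finalDF: [f1: double, f2: double, f3: double]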

        Spark < 3.0.0

        One possible approach is something similar to this:

        import org.apache.spark.sql.functions.udf
        
        // In Spark 1.x you will have to replace the ML Vector with the MLlib one:
        // import org.apache.spark.mllib.linalg.Vector
        // In 2.x the below is usually the right choice
        import org.apache.spark.ml.linalg.Vector
        
        // Get size of the vector
        val n = testDF.first.getAs[Vector](0).size
        
        // Simple helper to convert vector to array<double> 
        // asNondeterministic is available in Spark 2.3 or later
        // It can be removed, but at the cost of decreased performance
        val vecToSeq = udf((v: Vector) => v.toArray).asNondeterministic
        
        // Prepare a list of columns to create
        val exprs = (0 until n).map(i => $"_tmp".getItem(i).alias(s"f$i"))
        
        testDF.select(vecToSeq($"scaledFeatures").alias("_tmp")).select(exprs:_*)
        

        If you know the list of columns up front, you can simplify this a little:

        val cols: Seq[String] = ???
        val exprs = cols.zipWithIndex.map{ case (c, i) => $"_tmp".getItem(i).alias(c) }
        
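        Putting the pieces together for the asker's example, a sketch with the names from the question assumed:

        val cols = Seq("f1", "f2", "f3") // assumed, from the question
        val exprs = cols.zipWithIndex.map { case (c, i) => $"_tmp".getItem(i).alias(c) }
        val finalDF = testDF.select(vecToSeq($"scaledFeatures").alias("_tmp")).select(exprs: _*)
        // finalDF: [f1: double, f2: double, f3: double]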

        For the Python equivalent, see How to split Vector into columns - using PySpark.

        【Comments】:

          【Solution 4】:

          Try VectorSlicer:

          import org.apache.spark.ml.feature.{VectorAssembler, VectorSlicer}
          import org.apache.spark.ml.linalg.Vectors
          
          val dataset = spark.createDataFrame(
            Seq((1, 0.2, 0.8), (2, 0.1, 0.9), (3, 0.3, 0.7))
          ).toDF("id", "negative_logit", "positive_logit")
          
          
          val assembler = new VectorAssembler()
            .setInputCols(Array("negative_logit", "positive_logit"))
            .setOutputCol("prediction")
          
          val output = assembler.transform(dataset)
          output.show()
          /*
          +---+--------------+--------------+----------+
          | id|negative_logit|positive_logit|prediction|
          +---+--------------+--------------+----------+
          |  1|           0.2|           0.8| [0.2,0.8]|
          |  2|           0.1|           0.9| [0.1,0.9]|
          |  3|           0.3|           0.7| [0.3,0.7]|
          +---+--------------+--------------+----------+
          */
          
          val slicer = new VectorSlicer()
            .setInputCol("prediction")
            .setIndices(Array(1))
            .setOutputCol("positive_prediction")
          
          val posi_output = slicer.transform(output)
          posi_output.show()
          
          /*
          +---+--------------+--------------+----------+-------------------+
          | id|negative_logit|positive_logit|prediction|positive_prediction|
          +---+--------------+--------------+----------+-------------------+
          |  1|           0.2|           0.8| [0.2,0.8]|              [0.8]|
          |  2|           0.1|           0.9| [0.1,0.9]|              [0.9]|
          |  3|           0.3|           0.7| [0.3,0.7]|              [0.7]|
          +---+--------------+--------------+----------+-------------------+
          */
          
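          One caveat: positive_prediction is still a one-element vector rather than a plain Double, so a final extraction is needed. A minimal sketch (assuming Spark 3.0+ for vector_to_array):

          import org.apache.spark.ml.functions.vector_to_array

          posi_output
            .withColumn("positive_prob", vector_to_array($"positive_prediction").getItem(0))
            .show()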

          【Comments】:

            【Solution 5】:

            An alternative solution that appeared a few days ago: import VectorDisassembler into your project (as long as it hasn't been merged into Spark), then:

            import org.apache.spark.ml.feature.VectorAssembler
            import org.apache.spark.ml.linalg.Vectors
            
            val dataset = spark.createDataFrame(
              Seq((0, 1.2, 1.3), (1, 2.2, 2.3), (2, 3.2, 3.3))
            ).toDF("id", "val1", "val2")
            
            
            val assembler = new VectorAssembler()
              .setInputCols(Array("val1", "val2"))
              .setOutputCol("vectorCol")
            
            val output = assembler.transform(dataset)
            output.show()
            /*
            +---+----+----+---------+
            | id|val1|val2|vectorCol|
            +---+----+----+---------+
            |  0| 1.2| 1.3|[1.2,1.3]|
            |  1| 2.2| 2.3|[2.2,2.3]|
            |  2| 3.2| 3.3|[3.2,3.3]|
            +---+----+----+---------+*/
            
            val disassembler = new org.apache.spark.ml.feature.VectorDisassembler()
              .setInputCol("vectorCol")
            disassembler.transform(output).show()
            /*
            +---+----+----+---------+----+----+
            | id|val1|val2|vectorCol|val1|val2|
            +---+----+----+---------+----+----+
            |  0| 1.2| 1.3|[1.2,1.3]| 1.2| 1.3|
            |  1| 2.2| 2.3|[2.2,2.3]| 2.2| 2.3|
            |  2| 3.2| 3.3|[3.2,3.3]| 3.2| 3.3|
            +---+----+----+---------+----+----+*/
            

            【Comments】:

            • VectorDisassembler was never merged into Spark (SPARK-13610).