【问题标题】:Transpose DataFrame single row to column in Spark with scala使用scala将DataFrame单行转换为Spark中的列
【发布时间】:2020-08-11 15:21:31
【问题描述】:

我在这里看到了这个问题: Transpose DataFrame Without Aggregation in Spark with scala 而我想做的恰恰相反。

我有一个单行数据框,其值为字符串、整数、布尔值、数组:

+-----+-------+-----+------+-----+
|col1 | col2  |col3 | col4 |col5 |
+-----+-------+-----+------+-----+
|val1 | val2  |val3 | val4 |val5 |
+-----+-------+-----+------+-----+

我想像这样转置它:

+-----------+-------+
|Columns    | values|
+-----------+-------+
|col1       | val1  |
|col2       | val2  |
|col3       | val3  |
|col4       | val4  |
|col5       | val5  |
+-----------+-------+

我正在使用 Apache Spark 2.4.3 和 Scala 2.11

编辑:值可以是任何类型(int、double、bool、array),而不仅仅是字符串。

【问题讨论】:

    标签: scala dataframe apache-spark pyspark transpose


    【解决方案1】:

    不使用 arrays_zip(在 => Spark 2.4 中可用)的想法不同] 并得到以下...

    它将以更简单的方式适用于 Spark =>2.0(flatmapmapexplode 函数)...

    这里map 函数(用于与列)创建一个新的地图列。输入列必须分组为键值对。

    案例:数据中的字符串数据类型:

    import org.apache.spark.sql.functions._
    
    val df: DataFrame =Seq((("val1"),("val2"),("val3"),("val4"),("val5"))).toDF("col1","col2","col3","col4","col5")
    
    var columnsAndValues = df.columns.flatMap { c => Array(lit(c), col(c)) }
    df.printSchema()
    
    df.withColumn("myMap", map(columnsAndValues:_*)).select(explode($"myMap"))
      .toDF("Columns","Values").show(false)
    

    结果:

    root
     |-- col1: string (nullable = true)
     |-- col2: string (nullable = true)
     |-- col3: string (nullable = true)
     |-- col4: string (nullable = true)
     |-- col5: string (nullable = true)
    
    +-------+------+
    |Columns|Values|
    +-------+------+
    |col1   |val1  |
    |col2   |val2  |
    |col3   |val3  |
    |col4   |val4  |
    |col5   |val5  |
    +-------+------+
    

    案例:数据中数据类型的混合:

    如果您有不同的类型,请将它们转换为字符串...其余步骤不会改变..

    val df1 = df.select(df.columns.map(c => col(c).cast(StringType)): _*)
    

    完整示例:

    import org.apache.spark.sql.functions._
    import spark.implicits._
    import org.apache.spark.sql.Column
    
    val df = Seq(((2), (3), (true), (2.4), ("val"))).toDF("col1", "col2", "col3", "col4", "col5")
    df.printSchema()
    /**
      * convert all columns to  to string type since its needed further
      */
    val df1 = df.select(df.columns.map(c => col(c).cast(StringType)): _*)
    df1.printSchema()
    var ColumnsAndValues: Array[Column] = df.columns.flatMap { c => {
        Array(lit(c), col(c))
      }
    }
    
    df1.withColumn("myMap", map(ColumnsAndValues: _*))
       .select(explode($"myMap"))
       .toDF("Columns", "Values")
       .show(false)
    

    结果:

    root
     |-- col1: integer (nullable = false)
     |-- col2: integer (nullable = false)
     |-- col3: boolean (nullable = false)
     |-- col4: double (nullable = false)
     |-- col5: string (nullable = true)
    
    root
     |-- col1: string (nullable = false)
     |-- col2: string (nullable = false)
     |-- col3: string (nullable = false)
     |-- col4: string (nullable = false)
     |-- col5: string (nullable = true)
    
    +-------+------+
    |Columns|Values|
    +-------+------+
    |col1   |2     |
    |col2   |3     |
    |col3   |true  |
    |col4   |2.4   |
    |col5   |val   |
    +-------+------+
    

    【讨论】:

    • 嗯我忘了指定值的类型。实际上它们是混合的(不仅是字符串,它们可以是 int、float、bool 等),我得到了这个异常:org.apache.spark.sql.AnalysisException: cannot resolve 'map[...]' due to data type mismatch: The given values of function map should all be the same type
    • 你知道如何在 python 中做到这一点,即 pyspark 吗?
    【解决方案2】:

    来自 Spark-2.4 使用 arrays_ziparray(column_values), array(column_names) 然后分解得到结果。

    Example:

    val df=Seq((("val1"),("val2"),("val3"),("val4"),("val5"))).toDF("col1","col2","col3","col4","col5")
    
    val cols=df.columns.map(x => col(s"${x}"))
    
    val str_cols=df.columns.mkString(",")
    
    df.withColumn("new",explode(arrays_zip(array(cols:_*),split(lit(str_cols),",")))).
    select("new.*").
    toDF("values","Columns").
    show()
    //+------+-------+
    //|values|Columns|
    //+------+-------+
    //|  val1|   col1|
    //|  val2|   col2|
    //|  val3|   col3|
    //|  val4|   col4|
    //|  val5|   col5|
    //+------+-------+
    

    UPDATE:

    val df=Seq(((2),(3),(true),(2.4),("val"))).toDF("col1","col2","col3","col4","col5")
    
    df.printSchema
    //root
    // |-- col1: integer (nullable = false)
    // |-- col2: integer (nullable = false)
    // |-- col3: boolean (nullable = false)
    // |-- col4: double (nullable = false)
    // |-- col5: string (nullable = true)
    
    //cast to string
    val cols=df.columns.map(x => col(s"${x}").cast("string").alias(s"${x}"))
    
    val str_cols=df.columns.mkString(",")
    
    df.withColumn("new",explode(arrays_zip(array(cols:_*),split(lit(str_cols),",")))).
    select("new.*").
    toDF("values","Columns").
    show()
    
    //+------+-------+
    //|values|Columns|
    //+------+-------+
    //|     2|   col1|
    //|     3|   col2|
    //|  true|   col3|
    //|   2.4|   col4|
    //|   val|   col5|
    //+------+-------+
    

    【讨论】:

    • 如果 OP 使用较低版本的 spark 会怎样
    • @RamGhadiyaram,谢谢!,那么我们需要在 scala 中使用 udf 引用这篇文章 stackoverflow.com/a/41027619
    • @check my answer without udf
    • @Shu,可以使用SparkSQL函数栈,查看:stackoverflow.com/questions/42465568/…
    • 嗯我忘了指定值的类型。实际上它们是混合的(不仅是字符串,它们可以是 int、float、bool 等),我得到了这个异常:org.apache.spark.sql.AnalysisException: cannot resolve 'map[...]' due to data type mismatch: The given values of function map should all be the same type
    猜你喜欢
    • 2019-02-20
    • 2016-04-13
    • 2017-02-26
    • 1970-01-01
    • 2017-05-20
    • 1970-01-01
    • 2018-03-05
    • 2019-09-26
    • 1970-01-01
    相关资源
    最近更新 更多