【问题标题】:Spark sql how to explode without losing null valuesSpark sql如何在不丢失空值的情况下爆炸
【发布时间】:2017-02-05 22:27:55
【问题描述】:

我有一个要展平的数据框。作为该过程的一部分,我想将其分解,因此如果我有一列数组,则数组的每个值都将用于创建单独的行。例如,

id | name | likes
_______________________________
1  | Luke | [baseball, soccer]

应该变成

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer

这是我的代码

private DataFrame explodeDataFrame(DataFrame df) {
    DataFrame resultDf = df;
    for (StructField field : df.schema().fields()) {
        if (field.dataType() instanceof ArrayType) {
            resultDf = resultDf.withColumn(field.name(), org.apache.spark.sql.functions.explode(resultDf.col(field.name())));
            resultDf.show();
        }
    }
    return resultDf;
}

问题是在我的数据中,一些数组列有空值。在这种情况下,将删除整行。所以这个数据框:

id | name | likes
_______________________________
1  | Luke | [baseball, soccer]
2  | Lucy | null

变成

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer

而不是

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer
2  | Lucy | null

如何分解我的数组,以免丢失空行?

我正在使用 Spark 1.5.2 和 Java 8

【问题讨论】:

    标签: java apache-spark null apache-spark-sql


    【解决方案1】:

    Spark 2.2+

    你可以使用explode_outer函数:

    import org.apache.spark.sql.functions.explode_outer
    
    df.withColumn("likes", explode_outer($"likes")).show
    
    // +---+----+--------+
    // | id|name|   likes|
    // +---+----+--------+
    // |  1|Luke|baseball|
    // |  1|Luke|  soccer|
    // |  2|Lucy|    null|
    // +---+----+--------+
    

    火花

    在 Scala 中,但 Java 等效项应该几乎相同(要导入单个函数,请使用 import static)。

    import org.apache.spark.sql.functions.{array, col, explode, lit, when}
    
    val df = Seq(
      (1, "Luke", Some(Array("baseball", "soccer"))),
      (2, "Lucy", None)
    ).toDF("id", "name", "likes")
    
    df.withColumn("likes", explode(
      when(col("likes").isNotNull, col("likes"))
        // If null explode an array<string> with a single null
        .otherwise(array(lit(null).cast("string")))))
    

    这里的想法基本上是将NULL 替换为所需类型的array(NULL)。对于复杂类型(a.k.a structs),您必须提供完整的架构:

    val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y")
    
    val st =  StructType(Seq(
      StructField("_1", IntegerType, false), StructField("_2", StringType, true)
    ))
    
    dfStruct.withColumn("y", explode(
      when(col("y").isNotNull, col("y"))
        .otherwise(array(lit(null).cast(st)))))
    

    dfStruct.withColumn("y", explode(
      when(col("y").isNotNull, col("y"))
        .otherwise(array(lit(null).cast("struct<_1:int,_2:string>")))))
    

    注意

    如果已创建数组 Column 并将 containsNull 设置为 false 您应该首先更改它(使用 Spark 2.1 测试):

    df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true)))
    

    【讨论】:

    • 看起来不错,谢谢!我有一个后续问题:如果我的列类型是 StructType 怎么办?我尝试使用 cast(new StructType()),但我得到了data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type; 我试图使我的方法尽可能通用,因此它适合所有列类型。
    • 另外,为了获取列类型,我使用了 DataFrame.dtypes()。有没有更好的方法来获取列类型?
    • a) 您必须提供包含所有字段的完整架构。 b) dtypesschema
    • coalesce 而不是 case-when 更简洁,应该像魅力一样工作
    • @hamed 纠正小的语法差异几乎是相同的(比如isNotNull()而不是isNotNull)。
    【解决方案2】:

    你可以使用explode_outer()函数。

    【讨论】:

      【解决方案3】:

      根据公认的答案,当数组元素是复杂类型时,可能很难手动定义它(例如,使用大型结构)。

      为了自动完成,我编写了以下辅助方法:

        def explodeOuter(df: Dataset[Row], columnsToExplode: List[String]) = {
            val arrayFields = df.schema.fields
                .map(field => field.name -> field.dataType)
                .collect { case (name: String, type: ArrayType) => (name, type.asInstanceOf[ArrayType])}
                .toMap
      
            columnsToExplode.foldLeft(df) { (dataFrame, arrayCol) =>
            dataFrame.withColumn(arrayCol, explode(when(size(col(arrayCol)) =!= 0, col(arrayCol))
              .otherwise(array(lit(null).cast(arrayFields(arrayCol).elementType)))))    
       }
      

      编辑:似乎 spark 2.2 及更新版本已内置此功能。

      【讨论】:

      • 这个def explodeOuter 在代码中如何使用?
      • 您必须传递要爆炸的数据框和列。 explodeOuter(df, List("array_column"))
      【解决方案4】:

      处理空地图类型列:对于 Spark

       List((1, Array(2, 3, 4), Map(1 -> "a")),
      (2, Array(5, 6, 7), Map(2 -> "b")),
      (3, Array[Int](), Map[Int, String]())).toDF("col1", "col2", "col3").show()
      
      
       df.select('col1, explode(when(size(map_keys('col3)) === 0, map(lit("null"), lit("null"))).
      otherwise('col3))).show()
      

      【讨论】:

        【解决方案5】:
        from pyspark.sql.functions import *
        
        def flatten_df(nested_df):
            flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
            nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
            flat_df = nested_df.select(flat_cols +
                                       [col(nc + '.' + c).alias(nc + '_' + c)
                                        for nc in nested_cols
                                        for c in nested_df.select(nc + '.*').columns])
            print("flatten_df_count :", flat_df.count())
            return flat_df
        
        def explode_df(nested_df):
            flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct' and c[1][:5] != 'array']
            array_cols = [c[0] for c in nested_df.dtypes if c[1][:5] == 'array']
            for array_col in array_cols:
                schema = new_df.select(array_col).dtypes[0][1]
                nested_df = nested_df.withColumn(array_col, when(col(array_col).isNotNull(), col(array_col)).otherwise(array(lit(None)).cast(schema))) 
            nested_df = nested_df.withColumn("tmp", arrays_zip(*array_cols)).withColumn("tmp", explode("tmp")).select([col("tmp."+c).alias(c) for c in array_cols] + flat_cols)
            print("explode_dfs_count :", nested_df.count())
            return nested_df
        
        
        new_df = flatten_df(myDf)
        while True:
            array_cols = [c[0] for c in new_df.dtypes if c[1][:5] == 'array']
            if len(array_cols):
                new_df = flatten_df(explode_df(new_df))
            else:
                break
            
        new_df.printSchema()
        
        

        使用arrays_zipexplode 更快地完成并解决null 问题。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2021-02-16
          • 2021-07-21
          • 1970-01-01
          • 2022-12-29
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多