【问题标题】:Is it possible to ignore null values when using lead window function in Spark在 Spark 中使用前导窗口函数时是否可以忽略空值
【发布时间】:2021-02-20 10:09:57
【问题描述】:

我的数据框是这样的

id  value  date    
1   100    2017 
1   null   2016 
1   20     2015 
1   100    2014

我想获取最近的先前值,但忽略 null

id  value  date   recent value
1   100    2017    20
1   null   2016    20
1   20     2015   100
1   100    2014   null

在使用前导窗口函数时,有什么方法可以忽略空值。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    在Spark中使用前导窗口函数时是否可以忽略空值

    不是。

    我想获取最新的值,但忽略 null

    只需将last(或first)与ignoreNulls一起使用:

    def last(columnName: String, ignoreNulls: Boolean): Column

    聚合函数:返回组中列的最后一个值。

    该函数默认返回它看到的最后一个值。当 ignoreNulls 设置为 true 时,它​​将返回它看到的最后一个非空值。如果所有值都为 null,则返回 null。

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions._
    
    val df = Seq(
      (1, Some(100), 2017), (1, None, 2016), (1, Some(20), 2015), 
      (1, Some(100), 2014)
    ).toDF("id", "value", "date")
    
    df.withColumn(
      "last_value",
       last("value", true).over(Window.partitionBy("id").orderBy("date"))
    ).show
    
    +---+-----+----+----------+                                                     
    | id|value|date|last_value|
    +---+-----+----+----------+
    |  1|  100|2014|       100|
    |  1|   20|2015|        20|
    |  1| null|2016|        20|
    |  1|  100|2017|       100|
    +---+-----+----+----------+
    

    【讨论】:

    • 我喜欢获取最新的先前值。我也更新了帖子。对不起,我之前不清楚
    • 如果“最近的”不包括现在怎么办
    • 回答我自己的问题:在这种情况下,您需要使用(在 SQL 中)over(partition by id order by date rows between unbounded preceding and 1 preceding)。不知道如何将其翻译成 Scala
    • 再次跟进:看这个问题stackoverflow.com/questions/36019847/…,它展示了如何使用rangeBetween
    【解决方案2】:

    您可以分两步完成:

    1. 创建具有非空值的表
    2. 加入原始表
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions._
    
    val df = Seq(
      (1, Some(100), 2017),
      (1, None, 2016),
      (1, Some(20), 2015),
      (1, Some(100), 2014)
    ).toDF("id", "value", "date")
    
    // Step 1
    val filledDf = df
      .where($"value".isNotNull)
      .withColumnRenamed("value", "recent_value")
    
    // Step 2
    val window: WindowSpec = Window.partitionBy("l.id", "l.date").orderBy($"r.date".desc)
    
    val finalDf = df.as("l")
      .join(filledDf.as("r"), $"l.id" === $"r.id" && $"l.date" > $"r.date", "left")
      .withColumn("rn", row_number().over(window))
      .where($"rn" === 1)
      .select("l.id", "l.date", "value", "recent_value")
    
    finalDf.orderBy($"date".desc).show
    
    +---+----+-----+------------+
    | id|date|value|recent_value|
    +---+----+-----+------------+
    |  1|2017|  100|          20|
    |  1|2016| null|          20|
    |  1|2015|   20|         100|
    |  1|2014|  100|        null|
    +---+----+-----+------------+
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-07-05
      • 1970-01-01
      • 1970-01-01
      • 2016-07-10
      • 2015-01-17
      • 1970-01-01
      相关资源
      最近更新 更多