【问题标题】:Spark: Replicate each row but with change in one column valueSpark:复制每一行但改变一列值
【发布时间】:2020-12-07 02:11:33
【问题描述】:

如何在spark中进行如下操作,

Initially:
+-----------+-----+------+
|date       |col1 | col2 |
+-----------+-----+------+
|2020-08-16 | 2   | abc  |
|2020-08-17 | 3   | def  |
|2020-08-18 | 4   | ghi  |
|2020-08-19 | 5   | jkl  |
|2020-08-20 | 6   | mno  |
+-----------+-----+------+

Final result:
+-----------+-----+------+
|date       |col1 | col2 |
+-----------+-----+------+
|2020-08-16 | 2   | abc  |
|2020-08-15 | 2   | abc  |
|2020-08-17 | 3   | def  |
|2020-08-16 | 3   | def  |
|2020-08-18 | 4   | ghi  |
|2020-08-17 | 4   | ghi  |
|2020-08-19 | 5   | jkl  |
|2020-08-18 | 5   | jkl  |
|2020-08-20 | 6   | mno  |
|2020-08-19 | 6   | mno  |
+-----------+-----+------+

因此,本质上需要复制每一行并更改其中一个列值,即对于每一行,将日期列复制为当前值的负 1 天。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql rdd


    【解决方案1】:

    尝试使用 date_add 函数,然后创建包含日期列和 date-1 列的数组,最后分解列。

    Example:

    df.show()
    
    /*
    +----------+----+----+
    |      date|col1|col2|
    +----------+----+----+
    |2020-08-16|   2| abc|
    |2020-08-17|   3| def|
    +----------+----+----+
    */
    
    import org.apache.spark.sql.functions._
    
    df.withColumn("new_date",array(col("date"),date_add(col("date"),-1))).
    drop("date").
    selectExpr("explode(new_date) as date","*").
    drop("new_date").
    show(10,false)
    
    /*
    +----------+----+----+
    |date      |col1|col2|
    +----------+----+----+
    |2020-08-16|2   |abc |
    |2020-08-15|2   |abc |
    |2020-08-17|3   |def |
    |2020-08-16|3   |def |
    +----------+----+----+
    */
    

    【讨论】:

      【解决方案2】:

      我在想union 对于这个解决方案来说会很优雅,例如

      // Union the two dataframes together, take 1 day away from the date
      df.union(df.select(date_add($"date", -1), $"col1", $"col2"))
      

      我在其中创建测试数据的完整示例脚本:

      import org.apache.spark.sql.functions._
      
      val dfOriginal = Seq(("2020-08-16", 2, "abc"), ("2020-08-17", 3, "def"), ("2020-08-18", 4, "ghi"), ("2020-08-19", 5, "jkl"), ("2020-08-20", 6, "mno"))
        .toDF("date", "col1", "col2")
      
      val df = dfOriginal
        .select (to_date($"date", "yyyy-MM-dd").as("date"), $"col1", $"col2")
      
      // Union the two dataframes together, take 1 day away from the date
      df.union(df.select(date_add($"date", -1), $"col1", $"col2"))
        .orderBy("date", "col1", "col2")
        .show
      

      我的结果:

      【讨论】:

        【解决方案3】:

        这可能有点晚了,但是在 python 上回答这个问题,这样其他人可能会觉得它很有用。

        from pyspark.sql.functions import *
        

        初始 DF 如下所示:

        +-----------+-----+------+
        |date       |col1 | col2 |
        +-----------+-----+------+
        |2020-08-16 | 2   | abc  |
        |2020-08-17 | 3   | def  |
        |2020-08-18 | 4   | ghi  |
        |2020-08-19 | 5   | jkl  |
        |2020-08-20 | 6   | mno  |
        +-----------+-----+------+
        
        df.withColumn("dates_array",array(col("date"),date_add(col("date"),-1))))
        .drop("date")
        .withColumn("date",explode("dates_array"))
        .drop("dates_array")
        .show()
        

        然后你会得到你想要的:

        +-----------+-----+------+
        |date       |col1 | col2 |
        +-----------+-----+------+
        |2020-08-16 | 2   | abc  |
        |2020-08-15 | 2   | abc  |
        |2020-08-17 | 3   | def  |
        |2020-08-16 | 3   | def  |
        |2020-08-18 | 4   | ghi  |
        |2020-08-17 | 4   | ghi  |
        |2020-08-19 | 5   | jkl  |
        |2020-08-18 | 5   | jkl  |
        |2020-08-20 | 6   | mno  |
        |2020-08-19 | 6   | mno  |
        +-----------+-----+------+
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2021-11-09
          • 1970-01-01
          • 2022-01-16
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多