【问题标题】:Update Spark Dataframe's window function row_number column for Delta Data更新 Delta 数据的 Spark Dataframe 窗口函数 row_number 列
【发布时间】:2018-08-31 17:48:27
【问题描述】:

我需要为增量数据更新数据帧的行号列。我已经实现了基本负载的行号如下:

输入数据:

val base = List(List("001", "a", "abc"), List("001", "a", "123"),List("003", "c", "456") ,List("002", "b", "dfr"), List("003", "c", "ytr"))
  .map(row => (row(0), row(1), row(2)))

val DS1 = base.toDF("KEY1", "KEY2" ,"VAL")

DS1.show()

+----+----+---+
|KEY1|KEY2|VAL|
+----+----+---+
| 001|   a|abc|
| 001|   a|123|
| 003|   c|456|
| 002|   b|dfr|
| 003|   c|ytr|
+----+----+---+

现在我使用如下窗口函数添加了行号:

val baseDF =  DS1.select(col("KEY1"), col("KEY2"), col("VAL") ,row_number().over(Window.partitionBy(col("KEY1"), col("KEY2")).orderBy(col("KEY1"), col("KEY2").asc)).alias("Row_Num"))
baseDF.show()

+----+----+---+-------+
|KEY1|KEY2|VAL|Row_Num|
+----+----+---+-------+
|001 |a   |abc|1      |
|001 |a   |123|2      |
|002 |b   |dfr|1      |
|003 |c   |456|1      |
|003 |c   |ytr|2      |
+----+----+---+-------+

现在增量负载如下:

val delta = List(List("001", "a", "y45") ,List("002", "b", "444"))
  .map(row => (row(0), row(1), row(2)))

val DS2 = delta.toDF("KEY1", "KEY2" ,"VAL")
DS2.show()

+----+----+---+
|KEY1|KEY2|VAL|
+----+----+---+
| 001|   a|y45|
| 002|   b|444|
+----+----+---+

所以预期的更新结果应该是:

baseDF.show()

|KEY1|KEY2|VAL|Row_Num|
+----+----+---+-------+
|001 |a   |abc|1      |
|001 |a   |123|2      |
| 001|   a|y45|3      | -----> Delta record
|002 |b   |dfr|1      |
| 002|   b|444|2      | -----> Delta record 
|003 |c   |456|1      |
|003 |c   |ytr|2      |
+----+----+---+-------+

对使用数据框/数据集实施此解决方案有何建议? 可以用spark rdd的zipWithIndex实现上述方案吗?

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    使用更新的行号添加增量的一种方法是:1) 在DS2 中添加具有大数字的列Row_Num,2) 将baseDF 与它合并,3) 计算新的行号,如下图:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window
    
    val combinedDF = baseDF.union(
      DS2.withColumn("Row_Num", lit(Long.MaxValue))
    )
    
    val resultDF = combinedDF.select(
      col("KEY1"), col("KEY2"), col("VAL"), row_number().over(
        Window.partitionBy(col("KEY1"), col("KEY2")).orderBy(col("Row_Num"))
      ).alias("New_Row_Num")
    )
    
    resultDF.show
    +----+----+---+-----------+
    |KEY1|KEY2|VAL|New_Row_Num|
    +----+----+---+-----------+
    | 003|   c|456|          1|
    | 003|   c|ytr|          2|
    | 002|   b|dfr|          1|
    | 002|   b|444|          2|
    | 001|   a|abc|          1|
    | 001|   a|123|          2|
    | 001|   a|y45|          3|
    +----+----+---+-----------+
    

    【讨论】:

    • val resultDF = combineDF.select( col("KEY1"), col("KEY2"), col("VAL"), row_number().over( Window.partitionBy(col("KEY1 "), col("KEY2")).orderBy(col("Row_Num")) ).alias("New_Row_Num") ) 。如何将 DESC 应用于窗口分区??
    • @Jack,如果我理解你的问题,你可以简单地指定orderBy(col("Row_Num").desc)
    猜你喜欢
    • 2021-12-16
    • 1970-01-01
    • 1970-01-01
    • 2016-04-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-10-24
    相关资源
    最近更新 更多