【发布时间】:2018-01-11 11:08:08
【问题描述】:
我需要使用窗口函数 Lag 和 Lead 对数据帧执行以下操作。
对于每个 Key,我需要在最终输出中执行下面的插入和更新
插入条件:
1. 默认情况下,LAYER_NO=0,需要写入输出。
2.如果COL1,COL2,COL3的值有任何变化,相对于其宝贵的记录,那么该记录需要写入输出。
示例:key_1 与 layer_no=2,COL3 中值从 400 变为 600
更新条件:
1. 如果 COL1,COL2,COL3 的值相对于其之前的记录没有变化,但“DEPART 列”有变化,则需要在输出中更新该值。
示例:key_1 with layer_no=3,COL1,COL2,COL3 没有变化,但是 DEPART 列中的值变化为“xyz”,因此需要在输出中更新。
2.连LAYER_NO也要依次更新,插入layer_no=0的记录后
val inputDF = values.toDF("KEY","LAYER_NO","COl1","COl2","COl3","DEPART")
inputDF.show()
+-----+--------+----+----+----+------+
| KEY|LAYER_NO|COL1|COL2|COL3|DEPART|
+-----+--------+----+----+----+------+
|key_1| 0| 200| 300| 400| abc|->default write
|key_1| 1| 200| 300| 400| abc|
|key_1| 2| 200| 300| 600| uil|--->change in col3,so write
|key_1| 2| 200| 300| 600| uil|
|key_1| 3| 200| 300| 600| xyz|--->change in col4,so update
|key_2| 0| 500| 700| 900| prq|->default write
|key_2| 1| 888| 555| 900| tep|--->change in col1 & col 2,so write
|key_3| 0| 111| 222| 333| lgh|->default write
|key_3| 1| 084| 222| 333| lgh|--->change in col1,so write
|key_3| 2| 084| 222| 333| rrr|--->change in col4,so update
+-----+--------+----+----+----+------+
预期输出:
outputDF.show()
+-----+--------+----+----+----+------+
| KEY|LAYER_NO|COl1|COl2|COl3|DEPART|
+-----+--------+----+----+----+------+
|key_1| 0| 200| 300| 400| abc|
|key_1| 1| 200| 300| 600| xyz|
|key_2| 0| 500| 700| 900| prq|
|key_2| 1| 888| 555| 900| tep|
|key_3| 0| 111| 222| 333| lgh|
|key_3| 1| 084| 222| 333| rrr|
+-----+--------+----+----+----+------+
【问题讨论】:
-
为什么
|key_1| 2| 200| 300| 600| uil没有出现在输出中? -
|key_1| 1| 200| 300| 600|用户|写入输出,但在下一条记录中,DEPART 列中的值发生了变化,因此这会将“uil”更新为“xyz”。所以最终记录的将是|key_1| 1| 200| 300| 600| xyz|
-
那么为什么 layer_no 是 1 ?不应该是3吗?
-
对不起,它错过了信息....现在更新了问题..在插入 layer_no=0 的记录后,即使 LAYER_NO 也应该按顺序更新
标签: scala apache-spark pyspark apache-spark-sql spark-dataframe