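【Question title】: Fill in missing row values with the previous and next non-missing values
【Posted】: 2020-08-20 13:08:02
【Question description】:
I know you can forward-fill or backward-fill missing values by combining the last function with a window function, so that each gap takes the nearest non-missing value.
But my data looks like this: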
Area,Date,Population
A, 1/1/2000, 10000
A, 2/1/2000,
A, 3/1/2000,
A, 4/1/2000, 10030
A, 5/1/2000,
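In this example, I'd like to fill in 10030 for May's population, which is easy. But for February and March, the value I want to fill in is the mean of 10000 and 10030, not 10000 or 10030.
Do you know how to achieve this?
Thanks,
【Comments】:
Tags: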
pyspark
apache-spark-sql
pyspark-dataframes
【Solution 1】:
Get the next and previous non-null values and compute their mean as follows:
df2.show(false)
df2.printSchema()
/**
* +----+--------+----------+
* |Area|Date |Population|
* +----+--------+----------+
* |A |1/1/2000|10000 |
* |A |2/1/2000|null |
* |A |3/1/2000|null |
* |A |4/1/2000|10030 |
* |A |5/1/2000|null |
* +----+--------+----------+
*
* root
* |-- Area: string (nullable = true)
* |-- Date: string (nullable = true)
* |-- Population: integer (nullable = true)
*/
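import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, first, last}
// Assumes spark.implicits._ is in scope for the $"..." column syntax.
// Date is a string here, so orderBy("Date") sorts lexicographically; that happens to be correct for this sample.
val w1 = Window.partitionBy("Area").orderBy("Date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
val w2 = Window.partitionBy("Area").orderBy("Date").rowsBetween(Window.currentRow, Window.unboundedFollowing)
df2.withColumn("previous", last("Population", ignoreNulls = true).over(w1)) // nearest non-null at or before this row
  .withColumn("next", first("Population", ignoreNulls = true).over(w2)) // nearest non-null at or after this row
  .withColumn("new_Population", (coalesce($"previous", $"next") + coalesce($"next", $"previous")) / 2) // at the edges, the single available neighbour is used twice
  .drop("next", "previous")
  .show(false)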
/**
* +----+--------+----------+--------------+
* |Area|Date |Population|new_Population|
* +----+--------+----------+--------------+
* |A |1/1/2000|10000 |10000.0 |
* |A |2/1/2000|null |10015.0 |
* |A |3/1/2000|null |10015.0 |
* |A |4/1/2000|10030 |10030.0 |
* |A |5/1/2000|null |10030.0 |
* +----+--------+----------+--------------+
*/
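The rule in the answer above can also be checked without Spark. The following is only a plain-Python sketch of the same logic on a single column, assuming the rows are already in date order; the function name fill_with_neighbor_mean is made up for this illustration and is not part of any library.

```python
from typing import List, Optional


def fill_with_neighbor_mean(values: List[Optional[float]]) -> List[Optional[float]]:
    """Average the nearest non-missing value on each side of every row."""
    n = len(values)

    # Forward pass: last non-missing value at or before each row ("previous").
    previous: List[Optional[float]] = [None] * n
    seen: Optional[float] = None
    for i in range(n):
        if values[i] is not None:
            seen = values[i]
        previous[i] = seen

    # Backward pass: first non-missing value at or after each row ("next").
    following: List[Optional[float]] = [None] * n
    seen = None
    for i in range(n - 1, -1, -1):
        if values[i] is not None:
            seen = values[i]
        following[i] = seen

    filled: List[Optional[float]] = []
    for prev, nxt in zip(previous, following):
        left = prev if prev is not None else nxt   # coalesce(previous, next)
        right = nxt if nxt is not None else prev   # coalesce(next, previous)
        # Both sides are None only when the whole column is missing.
        filled.append(None if left is None else (left + right) / 2)
    return filled


population = [10000, None, None, 10030, None]
filled = fill_with_neighbor_mean(population)
print(filled)  # [10000.0, 10015.0, 10015.0, 10030.0, 10030.0]
```

Rows that already have a value keep it, because their previous and next values are both the value itself.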
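【Solution 2】:
Here is my attempt.
w1 and w2 build running counts of non-null values, which split the rows into groups; w3 and w4 then pick up the previous and next non-null value within each group. After that, a condition decides how Population is filled.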
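import pyspark.sql.functions as f
from pyspark.sql import Window

# Running counts of non-null Population values: from the first row (w1) and up to the last row (w2).
# Date is a string here, so the ordering is lexicographic; that happens to be correct for this sample.
w1 = Window.partitionBy('Area').orderBy('Date').rowsBetween(Window.unboundedPreceding, Window.currentRow)
w2 = Window.partitionBy('Area').orderBy('Date').rowsBetween(Window.currentRow, Window.unboundedFollowing)
# Inside each group defined by those counts, the first row in Date order (w3) or reverse Date order (w4) holds the non-null value.
w3 = Window.partitionBy('Area', 'partition1').orderBy('Date')
w4 = Window.partitionBy('Area', 'partition2').orderBy(f.desc('Date'))

# check: 1 if Population is present, else 0
# partition1 / partition2: group ids shared with the previous / next non-null row
# first / last: previous / next non-null Population
# fill: mean of both neighbours when both exist, otherwise whichever one exists
df.withColumn('check', f.col('Population').isNotNull().cast('int')) \
    .withColumn('partition1', f.sum('check').over(w1)) \
    .withColumn('partition2', f.sum('check').over(w2)) \
    .withColumn('first', f.first('Population').over(w3)) \
    .withColumn('last', f.first('Population').over(w4)) \
    .withColumn('fill', f.when(f.col('first').isNotNull() & f.col('last').isNotNull(), (f.col('first') + f.col('last')) / 2).otherwise(f.coalesce('first', 'last'))) \
    .withColumn('Population', f.coalesce('Population', 'fill')) \
    .orderBy('Date') \
    .select(*df.columns).show(10, False)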
+----+--------+----------+
|Area|Date |Population|
+----+--------+----------+
|A |1/1/2000|10000.0 |
|A |2/1/2000|10015.0 |
|A |3/1/2000|10015.0 |
|A |4/1/2000|10030.0 |
|A |5/1/2000|10030.0 |
+----+--------+----------+
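To see why the running counts work as group ids, it can help to compute them by hand for the sample column. This is a plain-Python sketch rather than Spark code, assuming the five rows are already in date order; the list names mirror the column names in the answer.

```python
from itertools import accumulate

population = [10000, None, None, 10030, None]

# check: 1 where Population is present, 0 where it is missing.
check = [int(v is not None) for v in population]

# partition1: non-missing count from the first row through the current row (window w1).
partition1 = list(accumulate(check))

# partition2: non-missing count from the current row through the last row (window w2).
partition2 = list(accumulate(reversed(check)))[::-1]

print(check)       # [1, 0, 0, 1, 0]
print(partition1)  # [1, 1, 1, 2, 2]
print(partition2)  # [2, 1, 1, 1, 0]
```

Rows with the same partition1 value share the same previous non-null row (January for rows 1 to 3, April for rows 4 and 5), and rows with the same partition2 value share the same next non-null row (April for rows 2 to 4). May has partition2 equal to 0, meaning no later value exists, so only the previous value is available for it.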