【发布时间】:2020-12-04 04:02:41
【问题描述】:
我发现了各种类似的问题,但没有一个能回答我的具体问题。
我需要使用基于日期列的最新行值填充 pyspark 数据框中缺失的日期行。
我目前的解决方案是计算到今天为止的缺失日期列表,加入原始 df 并用最新的有效值一一填充所有列:
# get the maximum date from the df
max_date = df.select(F.max('date')).first()['max(date)')
today = datetime.date.today()
delta = today - max_date
dates_list = [(today - datetime.timedelta(days=x),) for x in range(delta.days)]
# if there are missing rows
if dates_list:
# create df with one column 'date'
dates_df = spark.createDataFrame(dates_list, schema=date_schema)
# join with original df
df = df.join(F.broadcast(dates_df), ['date'], 'outer')
w = Window.orderBy('date').rangeBetween(Window.unboundedPreceding, 0)
# fill all columns with latest non null col value
for c in df.columns:
if c != 'date':
df = df.withColumn(c, F.last(c, ignorenulls=True).over(w))
此代码的问题在于原始 df 包含大量列,并且 spark 为每个列计算一个窗口以获取最后一个非空值,这种方法效率非常低,导致逻辑计划庞大。
我想以一种简单的方式实现它,以简单地获取具有最大日期的行内容(因为它不包含空值)并使用计算日期列表更改日期直到今天。
对如何实施这种方法有什么建议吗?
示例输入:
date | col_one | col_two | col_three | .. | col_n
-------------------------------------------------------
2020-08-15 | 0.1 | 6.5 | 9.8 | .. | 0.7
2020-08-14 | 0.2 | 5.5 | 1.8 | .. | 3.7
2020-08-13 | 0.4 | 7.5 | 1.3 | .. | 0.5
2020-08-12 | 3.1 | 8.5 | 9.8 | .. | 1.7
2020-08-11 | 0.15 | 6.9 | 9.7 | .. | 0.2
示例输出:
date | col_one | col_two | col_three | .. | col_n
-------------------------------------------------------
2020-08-18 | 0.1 | 6.5 | 9.8 | .. | 0.7
2020-08-17 | 0.1 | 6.5 | 9.8 | .. | 0.7
2020-08-16 | 0.1 | 6.5 | 9.8 | .. | 0.7
2020-08-15 | 0.1 | 6.5 | 9.8 | .. | 0.7
2020-08-14 | 0.2 | 5.5 | 1.8 | .. | 3.7
2020-08-13 | 0.4 | 7.5 | 1.3 | .. | 0.5
2020-08-12 | 3.1 | 8.5 | 9.8 | .. | 1.7
2020-08-11 | 0.15 | 6.9 | 9.7 | .. | 0.2
【问题讨论】:
-
能否请您发布一些示例 i/p 和 o/p?
-
原始 df 只是一个带有日期列的数据框,就像 100 个浮点列一样,示例输出只包含截至今天的所有日期以及最新的行值。
-
请在您的问题中添加示例输入和输出,这很有帮助,您的评论也没有帮助。
-
我举了两个例子,希望现在更清楚
标签: python dataframe apache-spark pyspark