【发布时间】:2020-12-07 12:25:56
【问题描述】:
Spark 3.0 版。
我有两个数据框。
我使用 pandas 日期范围创建了一个包含日期列的数据框。
我有一个包含公司名称、日期和值的第二个 spark 数据框。
我想将 DF2 合并到 DF1 按公司分组,这样我就可以填补缺失的日期,也可以填补上一行的缺失值。
我该怎么做?我想过left join,但似乎效果不佳。
【问题讨论】:
-
没有图片!!!将您的数据包含为字符串。
Spark 3.0 版。
我有两个数据框。
我使用 pandas 日期范围创建了一个包含日期列的数据框。
我有一个包含公司名称、日期和值的第二个 spark 数据框。
我想将 DF2 合并到 DF1 按公司分组,这样我就可以填补缺失的日期,也可以填补上一行的缺失值。
我该怎么做?我想过left join,但似乎效果不佳。
【问题讨论】:
试试这个。有点复杂。
import pyspark.sql.functions as f
from pyspark.sql import Window
df1 = spark.read.option("header","true").option("inferSchema","true").csv("test1.csv") \
.withColumn('Date', f.to_date('Date', 'dd/MM/yyyy'))
df2 = spark.read.option("header","true").option("inferSchema","true").csv("test2.csv") \
.withColumn('Date', f.to_date('Date', 'dd/MM/yyyy'))
w1 = Window.orderBy('Company', 'Date')
w2 = Window.orderBy('Company', 'Date').rowsBetween(Window.unboundedPreceding, Window.currentRow)
w3 = Window.partitionBy('partition').orderBy('Company', 'Date')
df1.crossJoin(df2.select('Company').distinct()) \
.join(df2, ['Company', 'Date'], 'left') \
.withColumn('range', (f.col('Value').isNull() | f.lead(f.col('Value'), 1, 0).over(w1).isNull()) != f.col('Value').isNull()) \
.withColumn('partition', f.sum(f.col('range').cast('int')).over(w2)) \
.withColumn('fill', f.first('Value').over(w3)) \
.orderBy('Company', 'Date') \
.selectExpr('Company', 'Date', 'coalesce(Value, fill) as Value') \
.show(20, False)
+-------+----------+-----+
|Company|Date |Value|
+-------+----------+-----+
|A |2000-01-01|13 |
|A |2000-01-02|14 |
|A |2000-01-03|15 |
|A |2000-01-04|19 |
|A |2000-01-05|19 |
|A |2000-01-06|19 |
|A |2000-01-07|19 |
|A |2000-01-08|19 |
|A |2000-01-09|19 |
|B |2000-01-01|19 |
|B |2000-01-02|19 |
|B |2000-01-03|20 |
|B |2000-01-04|25 |
|B |2000-01-05|23 |
|B |2000-01-06|24 |
|B |2000-01-07|24 |
|B |2000-01-08|24 |
|B |2000-01-09|24 |
+-------+----------+-----+
您可以通过多次添加.show 来查看每一行发生的情况,这可能会有所帮助。
【讨论】: