【发布时间】:2022-01-23 14:24:32
【问题描述】:
让我用一个例子来解释我的问题: 我有一个数据框:
pd_1 = pd.DataFrame({'day':[1,2,3,2,1,3],
'code': [10, 10, 20,20,30,30],
'A': [44, 55, 66,77,88,99],
'B':['a',None,'c',None,'d', None],
'C':[None,None,'12',None,None, None]
})
df_1 = sc.createDataFrame(pd_1)
df_1.show()
输出:
+---+----+---+----+----+
|day|code| A| B| C|
+---+----+---+----+----+
| 1| 10| 44| a|null|
| 2| 10| 55|null|null|
| 3| 20| 66| c| 12|
| 2| 20| 77|null|null|
| 1| 30| 88| d|null|
| 3| 30| 99|null|null|
+---+----+---+----+----+
我想要实现的是一个新的数据框,每一行对应一个code,并且对于每一列,我想要拥有最新的非空值(最高的day)。
在 pandas 中,我可以简单地做
pd_2 = pd_1.sort_values('day', ascending=True).groupby('code').last()
pd_2.reset_index()
得到
code day A B C
0 10 2 55 a None
1 20 3 66 c 12
2 30 3 99 d None
我的问题是,如何在 pyspark(最好是版本
到目前为止我尝试过的是:
from pyspark.sql import Window
import pyspark.sql.functions as F
w = Window.partitionBy('code').orderBy(F.desc('day')).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
## Update: after applying @Steven's idea to remove for loop:
df_1 = df_1 .select([F.collect_list(x).over(w).getItem(0).alias(x) for x in df_.columns])
##for x in df_1.columns:
## df_1 = df_1.withColumn(x, F.collect_list(x).over(w).getItem(0))
df_1 = df_1.distinct()
df_1.show()
输出
+---+----+---+---+----+
|day|code| A| B| C|
+---+----+---+---+----+
| 2| 10| 55| a|null|
| 3| 30| 99| d|null|
| 3| 20| 66| c| 12|
+---+----+---+---+----+
我不太满意,尤其是因为for loop。
【问题讨论】:
-
@Steven 谢谢。但我认为它会有同样的循环遍历所有列的问题:|
-
如果您的问题只是 for 循环,请将其更改为 select 中的列表理解
-
改进代码是个好主意:)
-
问题是最后一行将包含来自不同行的元素(使用相同的代码),而不仅仅是选择一行。我认为它不能使用单个
row_number函数来完成。但如果我错过了什么,请告诉我或写一个答案:)
标签: dataframe apache-spark pyspark apache-spark-sql