【Posted at】:2022-01-19 23:09:28
【Question】:
I am new to Spark and want to pivot a PySpark dataframe on multiple columns. There is one row per distinct (date, rank) combination. These rows should be flattened so that there is one row per unique date.
import pyspark.sql.functions as F
from datetime import datetime
data= [(datetime(2021,8,4,13,0), 1, 22, "a"),(datetime(2021,8,4,13,0), 2, 14, "a"),(datetime(2021,8,4,13,0), 3, 9, "a"),(datetime(2021,8,4,13,0), 4, 7, "a"),
(datetime(2021,8,4,14,0), 1, 16, "b"),(datetime(2021,8,4,14,0), 2, 21, "b"),(datetime(2021,8,4,14,0), 3, 17, "b"),(datetime(2021,8,4,14,0), 4, 18, "b"),
(datetime(2021,8,4,15,0), 1, 19, "a"),(datetime(2021,8,4,15,0), 2, 9, "b"),(datetime(2021,8,4,15,0), 3, 10, "c"),(datetime(2021,8,4,15,0), 4, 13, "d")
]
columns= ["date","rank","feat1","feat2"]
df = spark.createDataFrame(data = data, schema = columns)
df.show(truncate=False)
+-------------------+----+-----+-----+
|date |rank|feat1|feat2|
+-------------------+----+-----+-----+
|2021-08-04 13:00:00|1 |22 |a |
|2021-08-04 13:00:00|2 |14 |a |
|2021-08-04 13:00:00|3 |9 |a |
|2021-08-04 13:00:00|4 |7 |a |
|2021-08-04 14:00:00|1 |16 |b |
|2021-08-04 14:00:00|2 |21 |b |
|2021-08-04 14:00:00|3 |17 |b |
|2021-08-04 14:00:00|4 |18 |b |
|2021-08-04 15:00:00|1 |19 |a |
|2021-08-04 15:00:00|2 |9 |b |
|2021-08-04 15:00:00|3 |10 |c |
|2021-08-04 15:00:00|4 |13 |d |
+-------------------+----+-----+-----+
The real data has 30+ feature columns, with ranks from 1 to 100 for each date. Desired output:
+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
| date|rank1_feat1|rank2_feat1|rank3_feat1|rank4_feat1|rank1_feat2|rank2_feat2|rank3_feat2|rank4_feat2|
+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|2021-08-04 15:00:00| 19| 9| 10| 13| a| b| c| d|
|2021-08-04 13:00:00| 22| 14| 9| 7| a| a| a| a|
|2021-08-04 14:00:00| 16| 21| 17| 18| b| b| b| b|
+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
I have a solution that seems to work on my toy example, but its memory usage is so extreme that I cannot even process 1/500 of the data without hitting out-of-memory errors.
dfspine = df.select("date").distinct()
for col in df.columns:
    if col not in ["date", "rank"]:
        piv = df.groupby("date").pivot("rank").agg(F.first(col))
        mapping = dict([(pivcol, "rank%s_%s" % (pivcol, col)) for pivcol in piv.columns if pivcol not in ["date"]])
        piv = piv.select([F.col(c).alias(mapping.get(c, c)) for c in piv.columns])
        dfspine = dfspine.join(piv, how="left", on="date")
【Discussion】:
- So you want to create 3000 columns, right? Why not just use a struct column instead?
Tags: python pyspark group-by pivot