【问题标题】:Custom aggregation to a JSON in pyspark自定义聚合到 pyspark 中的 JSON
【发布时间】:2021-06-18 23:41:10
【问题描述】:
这是我当前的表:
| User Id |
Product |
Amount 1 |
Amount 2 |
Amount 3 |
| 1 |
A |
100 |
200 |
300 |
| 1 |
B |
200 |
300 |
400 |
| 2 |
A |
500 |
600 |
700 |
这是我正在寻找的输出:
| User Id |
Amount 1 |
Amount 2 |
Amount 3 |
| 1 |
{"A": 100, "B": 200} |
{"A": 200, "B": 300} |
{"A": 300, "B": 400} |
| 2 |
{"A": 500} |
{"A": 600} |
{"A": 700} |
我知道我应该使用用户定义的聚合函数,但无法理解如何通过 PySpark 实现它们。
任何帮助将不胜感激。
【问题讨论】:
标签:
python
apache-spark
pyspark
apache-spark-sql
【解决方案1】:
您可以在按User Id分组并创建地图列后使用to_json:
from pyspark.sql import functions as F
df1 = df.groupBy("User Id").agg(*[
F.to_json(
F.map_from_entries(F.collect_list(F.struct(F.col("Product"), F.col(c))))
).alias(c)
for c in df.columns[2:]
])
df1.show()
#+-------+-----------------+-----------------+-----------------+
#|User Id| Amount 1| Amount 2| Amount 3|
#+-------+-----------------+-----------------+-----------------+
#| 1|{"A":100,"B":200}|{"A":200,"B":300}|{"A":300,"B":400}|
#| 2| {"A":500}| {"A":600}| {"A":700}|
#+-------+-----------------+-----------------+-----------------+