【问题标题】:Custom aggregation to a JSON in pyspark自定义聚合到 pyspark 中的 JSON
【发布时间】:2021-06-18 23:41:10
【问题描述】:

这是我当前的表:

User Id Product Amount 1 Amount 2 Amount 3
1 A 100 200 300
1 B 200 300 400
2 A 500 600 700

这是我正在寻找的输出:

User Id Amount 1 Amount 2 Amount 3
1 {"A": 100, "B": 200} {"A": 200, "B": 300} {"A": 300, "B": 400}
2 {"A": 500} {"A": 600} {"A": 700}

我知道我应该使用用户定义的聚合函数,但无法理解如何通过 PySpark 实现它们。

任何帮助将不胜感激。

【问题讨论】:

    标签: python apache-spark pyspark apache-spark-sql


    【解决方案1】:

    您可以在按User Id分组并创建地图列后使用to_json

    from pyspark.sql import functions as F
    
    df1 = df.groupBy("User Id").agg(*[
        F.to_json(
            F.map_from_entries(F.collect_list(F.struct(F.col("Product"), F.col(c))))
        ).alias(c)
        for c in df.columns[2:]
    ])
    
    df1.show()
    #+-------+-----------------+-----------------+-----------------+
    #|User Id|         Amount 1|         Amount 2|         Amount 3|
    #+-------+-----------------+-----------------+-----------------+
    #|      1|{"A":100,"B":200}|{"A":200,"B":300}|{"A":300,"B":400}|
    #|      2|        {"A":500}|        {"A":600}|        {"A":700}|
    #+-------+-----------------+-----------------+-----------------+
    

    【讨论】:

      猜你喜欢
      • 2016-06-29
      • 2017-04-22
      • 1970-01-01
      • 2021-06-06
      • 2016-11-03
      • 2018-10-19
      • 2014-08-08
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多