【Question Title】: How do I create a pivot this way in PySpark?
【Posted】: 2023-02-03 22:56:03
【Question】:

I have a PySpark dataframe df:

+-----+------------+---------------+-----------+--------------+-------------+----------------+------------+---------------+
|STORE|COL_APPLE_BB|COL_APPLE_NONBB|COL_PEAR_BB|COL_PEAR_NONBB|COL_ORANGE_BB|COL_ORANGE_NONBB|COL_GRAPE_BB|COL_GRAPE_NONBB|
+-----+------------+---------------+-----------+--------------+-------------+----------------+------------+---------------+
|    1|          28|             24|         24|            32|           26|              54|          60|             36|
|    2|          19|             12|         24|            13|           10|              24|          29|             10|
+-----+------------+---------------+-----------+--------------+-------------+----------------+------------+---------------+

I have another PySpark dataframe df2:

+-----+---+------+-----+
|STORE|PDT| FRUIT| TYPE|
+-----+---+------+-----+
|    1|  1| APPLE|   BB|
|    1|  2|ORANGE|NONBB|
|    1|  3|  PEAR|   BB|
|    1|  4| GRAPE|   BB|
|    1|  5| APPLE|   BB|
|    1|  6|ORANGE|   BB|
|    2|  1|  PEAR|NONBB|
|    2|  2|ORANGE|NONBB|
|    2|  3| APPLE|NONBB|
+-----+---+------+-----+

Expected result: df2 with an added column COL_VALUE holding the value for each store, fruit, and type:

+-----+---+------+-----+---------+
|STORE|PDT| FRUIT| TYPE|COL_VALUE|
+-----+---+------+-----+---------+
|    1|  1| APPLE|   BB|       28|
|    1|  2|ORANGE|NONBB|       54|
|    1|  3|  PEAR|   BB|       24|
|    1|  4| GRAPE|   BB|       60|
|    1|  5| APPLE|   BB|       28|
|    1|  6|ORANGE|   BB|       26|
|    2|  1|  PEAR|NONBB|       13|
|    2|  2|ORANGE|NONBB|       24|
|    2|  3| APPLE|NONBB|       12|
+-----+---+------+-----+---------+

【Comments】:

    Tags: pyspark


    【Solution 1】:
    
    from pyspark.sql.functions import col, concat_ws, expr

    df = spark.createDataFrame(
        [
            (1, 28, 24, 24, 32, 26, 54, 60, 36),
            (2, 19, 12, 24, 13, 10, 24, 29, 10)
        ],
        ["STORE", "COL_APPLE_BB", "COL_APPLE_NONBB", "COL_PEAR_BB", "COL_PEAR_NONBB",
         "COL_ORANGE_BB", "COL_ORANGE_NONBB", "COL_GRAPE_BB", "COL_GRAPE_NONBB"]
    )
    
    
    df2 = spark.createDataFrame(
        [
            (1, 1,  "APPLE",    "BB"),
            (1, 2,  "ORANGE",   "NONBB"),
            (1, 3,  "PEAR", "BB"),
            (1, 4, "GRAPE", "BB"),
            (1, 5,  "APPLE",    "BB"),
            (1, 6,  "ORANGE",   "BB"),
            (2, 1,  "PEAR", "NONBB"),
            (2, 2, "ORANGE",    "NONBB"),
            (2, 3,  "APPLE",    "NONBB")
        ],
        ["STORE", "PDT", "FRUIT", "TYPE"]
    )
    
    # stack(8, label1, col1, ...) unpivots the eight value columns into
    # (Appended, COL_VALUE) rows. The SQL expression must be a single
    # string, so a triple-quoted literal is used here.
    unPivot_df = df.select("STORE", expr("""stack(8,
        'APPLE_BB', COL_APPLE_BB,
        'APPLE_NONBB', COL_APPLE_NONBB,
        'PEAR_BB', COL_PEAR_BB,
        'PEAR_NONBB', COL_PEAR_NONBB,
        'ORANGE_BB', COL_ORANGE_BB,
        'ORANGE_NONBB', COL_ORANGE_NONBB,
        'GRAPE_BB', COL_GRAPE_BB,
        'GRAPE_NONBB', COL_GRAPE_NONBB) as (Appended, COL_VALUE)"""))

    df2 = df2.withColumn("Appended", concat_ws('_', col("FRUIT"), col("TYPE")))
    df2 = df2.join(unPivot_df, ["STORE", "Appended"], "left")
    df2.show()
    
    +-----+------------+---+------+-----+---------+
    |STORE|    Appended|PDT| FRUIT| TYPE|COL_VALUE|
    +-----+------------+---+------+-----+---------+
    |    1|ORANGE_NONBB|  2|ORANGE|NONBB|       54|
    |    1|     PEAR_BB|  3|  PEAR|   BB|       24|
    |    1|    GRAPE_BB|  4| GRAPE|   BB|       60|
    |    1|    APPLE_BB|  1| APPLE|   BB|       28|
    |    2|ORANGE_NONBB|  2|ORANGE|NONBB|       24|
    |    2| APPLE_NONBB|  3| APPLE|NONBB|       12|
    |    1|   ORANGE_BB|  6|ORANGE|   BB|       26|
    |    1|    APPLE_BB|  5| APPLE|   BB|       28|
    |    2|  PEAR_NONBB|  1|  PEAR|NONBB|       13|
    +-----+------------+---+------+-----+---------+
    
    

    【Comments】:

      【Solution 2】:

      Besides melt, you can also use stack, which works in earlier Spark versions:

      from pyspark.sql import functions as F

      df = spark.createDataFrame(
          [
              (1, 28, 24),
              (2, 19, 12),
          ],
          ["STORE", "COL_APPLE_BB", "COL_APPLE_NONBB"]
      )
      
      df2 = spark.createDataFrame(
          [
              (1, 1, "APPLE", "BB"),
              (1, 2, "ORANGE", "NONBB"),
              (1, 2, "APPLE", "NONBB"),
              (2, 3, "APPLE", "NONBB")
          ],
          ["STORE", "PDT", "FRUIT", "TYPE"]
      )
      

      Create a column matching the "COL_FRUIT_TYPE" names in df:

      df3 = df2.withColumn("fruit_type", F.concat(F.lit("COL_"), F.col("FRUIT"), F.lit("_"), F.col("TYPE")))
      df3.show(10, False)
      

      This gives:

      +-----+---+------+-----+----------------+
      |STORE|PDT|FRUIT |TYPE |fruit_type      |
      +-----+---+------+-----+----------------+
      |1    |1  |APPLE |BB   |COL_APPLE_BB    |
      |1    |2  |ORANGE|NONBB|COL_ORANGE_NONBB|
      |1    |2  |APPLE |NONBB|COL_APPLE_NONBB |
      +-----+---+------+-----+----------------+
      

      Then "unpivot" the first df:

      from pyspark.sql.functions import expr
      
      unpivotExpr = "stack(2, 'COL_APPLE_BB', COL_APPLE_BB, 'COL_APPLE_NONBB', COL_APPLE_NONBB) as (fruit_type, COL_VALUE)"
      unPivotDF = df.select("STORE", expr(unpivotExpr)) \
                    .where("STORE is not null")
      
      unPivotDF.show(truncate=False)
      

      This gives:

      +-----+---------------+---------+
      |STORE|fruit_type     |COL_VALUE|
      +-----+---------------+---------+
      |1    |COL_APPLE_BB   |28       |
      |1    |COL_APPLE_NONBB|24       |
      |2    |COL_APPLE_BB   |19       |
      |2    |COL_APPLE_NONBB|12       |
      +-----+---------------+---------+
      

      And join the two:

      df3.join(unPivotDF, ["fruit_type", "STORE"], "left") \
         .select("STORE", "PDT", "FRUIT", "TYPE", "COL_VALUE").show(40, False)
      

      Result:

      +-----+---+------+-----+---------+
      |STORE|PDT|FRUIT |TYPE |COL_VALUE|
      +-----+---+------+-----+---------+
      |1    |2  |ORANGE|NONBB|null     |
      |1    |2  |APPLE |NONBB|24       |
      |1    |1  |APPLE |BB   |28       |
      |2    |3  |APPLE |NONBB|12       |
      +-----+---+------+-----+---------+
      

      The downside is that you need to enumerate the column names in stack; if I come up with a way to do this automatically, I'll update the answer.
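      The column names do not actually have to be typed out by hand. Here is a minimal sketch of building the stack() expression string from the column list (pure Python; the only assumption is the question's naming scheme where STORE is the single id column and everything else is a value column):

```python
# Build a stack() expression from a column list instead of enumerating
# each 'label', column pair by hand. Every non-STORE column becomes one
# (label, value) pair in the generated expression.
columns = ["STORE", "COL_APPLE_BB", "COL_APPLE_NONBB",
           "COL_PEAR_BB", "COL_PEAR_NONBB"]

value_cols = [c for c in columns if c != "STORE"]
pairs = ", ".join(f"'{c}', {c}" for c in value_cols)
unpivot_expr = f"stack({len(value_cols)}, {pairs}) as (fruit_type, COL_VALUE)"
print(unpivot_expr)
```

      The resulting string can then be passed straight to `df.select("STORE", expr(unpivot_expr))` exactly as above.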

      【Comments】:

        【Solution 3】:

        If you have Spark 3.2 or later, you can use something like:

        data = data.melt(
            id_vars=['STORE'],
            value_vars=data.columns[1:], 
            var_name="variable", 
            value_name="value"
        )
        

        to get the "long" form of the dataset, and then use regexp_extract twice to pull the needed information out of the variable column.
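        A possible pattern for those two regexp_extract calls, checked here with plain-Python `re` (the pattern itself is an assumption based on the question's column names; in Spark it would be `F.regexp_extract("variable", pattern, 1)` for FRUIT and group 2 for TYPE):

```python
import re

# Pattern splitting a melted column name like "COL_ORANGE_NONBB"
# into its FRUIT part (group 1) and TYPE part (group 2).
pattern = r"COL_([A-Z]+)_([A-Z]+)"

m = re.fullmatch(pattern, "COL_ORANGE_NONBB")
fruit, fruit_type = m.group(1), m.group(2)
print(fruit, fruit_type)  # ORANGE NONBB
```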

        For earlier versions of Spark, use something like:

        def process_row(row):
            # Relies on STORE being the first column, so `store` is bound
            # before any of the fruit columns are processed.
            output = []
            for index, key in enumerate(row.asDict()):
                if key == "STORE":
                    store = row[key]
                else:
                    _, fruit, fruit_type = key.split("_")
                    output.append((store, index, fruit, fruit_type, row[key]))
            return output


        data = data.rdd.flatMap(process_row).toDF(
            schema=["STORE", "PDT", "FRUIT", "TYPE", "COLUMN_VALUE"]
        )
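
        The splitting logic can be exercised without a Spark session by feeding the same keys as a plain dict (`Row.asDict()` returns essentially this; `split_columns` below is a hypothetical stand-in for `process_row`):

```python
def split_columns(row_dict):
    # Same logic as process_row: record STORE, then split every other
    # key on "_" into (prefix, fruit, type) and emit one tuple per column.
    output = []
    store = row_dict["STORE"]
    for index, key in enumerate(row_dict):
        if key != "STORE":
            _, fruit, fruit_type = key.split("_")
            output.append((store, index, fruit, fruit_type, row_dict[key]))
    return output

rows = split_columns({"STORE": 1, "COL_APPLE_BB": 28, "COL_APPLE_NONBB": 24})
print(rows)  # [(1, 1, 'APPLE', 'BB', 28), (1, 2, 'APPLE', 'NONBB', 24)]
```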
        

        【Comments】:
