from pyspark.sql.functions import *
# Wide fact table: one row per store, one count column per FRUIT_TYPE combo.
store_columns = [
    "STORE",
    "COL_APPLE_BB", "COL_APPLE_NONBB",
    "COL_PEAR_BB", "COL_PEAR_NONBB",
    "COL_ORANGE_BB", "COL_ORANGE_NONBB",
    "COL_GRAPE_BB", "COL_GRAPE_NONBB",
]
store_rows = [
    (1, 28, 24, 24, 32, 26, 54, 60, 36),
    (2, 19, 12, 24, 13, 10, 24, 29, 10),
]
df = spark.createDataFrame(store_rows, store_columns)
# Long-format transaction table: one row per (STORE, PDT) with its fruit/type.
txn_rows = [
    (1, 1, "APPLE", "BB"),
    (1, 2, "ORANGE", "NONBB"),
    (1, 3, "PEAR", "BB"),
    (1, 4, "GRAPE", "BB"),
    (1, 5, "APPLE", "BB"),
    (1, 6, "ORANGE", "BB"),
    (2, 1, "PEAR", "NONBB"),
    (2, 2, "ORANGE", "NONBB"),
    (2, 3, "APPLE", "NONBB"),
]
df2 = spark.createDataFrame(txn_rows, ["STORE", "PDT", "FRUIT", "TYPE"])
# Unpivot the wide per-store table to long format via Spark SQL `stack`:
# 8 (label, value) pairs become 8 rows of (Appended, COL_VALUE) per store.
# NOTE: the original split one single-quoted string across physical lines,
# which is a SyntaxError in Python — a triple-quoted string keeps the SQL
# readable while remaining one valid literal.
unPivot_df = df.select(
    "STORE",
    expr("""stack(8,
        'APPLE_BB', COL_APPLE_BB,
        'APPLE_NONBB', COL_APPLE_NONBB,
        'PEAR_BB', COL_PEAR_BB,
        'PEAR_NONBB', COL_PEAR_NONBB,
        'ORANGE_BB', COL_ORANGE_BB,
        'ORANGE_NONBB', COL_ORANGE_NONBB,
        'GRAPE_BB', COL_GRAPE_BB,
        'GRAPE_NONBB', COL_GRAPE_NONBB) as (Appended,COL_VALUE)"""),
)
# Derive the FRUIT_TYPE join key (e.g. "APPLE_BB"), then attach the unpivoted
# counts; a left join keeps every df2 row even when no count column matched.
df2 = (
    df2.withColumn("Appended", concat_ws("_", col("FRUIT"), col("TYPE")))
       .join(unPivot_df, ["STORE", "Appended"], "left")
)
df2.show()
# Expected output of df2.show():
# +-----+------------+---+------+-----+---------+
# |STORE|    Appended|PDT| FRUIT| TYPE|COL_VALUE|
# +-----+------------+---+------+-----+---------+
# |    1|ORANGE_NONBB|  2|ORANGE|NONBB|       54|
# |    1|     PEAR_BB|  3|  PEAR|   BB|       24|
# |    1|    GRAPE_BB|  4| GRAPE|   BB|       60|
# |    1|    APPLE_BB|  1| APPLE|   BB|       28|
# |    2|ORANGE_NONBB|  2|ORANGE|NONBB|       24|
# |    2| APPLE_NONBB|  3| APPLE|NONBB|       12|
# |    1|   ORANGE_BB|  6|ORANGE|   BB|       26|
# |    1|    APPLE_BB|  5| APPLE|   BB|       28|
# |    2|  PEAR_NONBB|  1|  PEAR|NONBB|       13|
# +-----+------------+---+------+-----+---------+