【问题标题】:transform distinct row values to different columns with corresponding rows using Pyspark使用 Pyspark 将不同的行值转换为具有相应行的不同列
【发布时间】:2020-11-16 09:03:46
【问题描述】:

我是 Pyspark 的新手,正在尝试转换数据

给定数据框

Col1
A=id1a A=id2a B=id1b C=id1c B=id2b
D=id1d A=id3a B=id3b C=id2c
A=id4a C=id3c

必填:

A         B        C
id1a     id1b     id1c
id2a     id2b     id2c
id3a     id3b     id3b
id4a     null     null

我尝试过 pivot,但它给出了第一个值。

【问题讨论】:

  • 你能给出一个可重现的数据样本吗?

标签: pyspark pivot rdd transpose flatmap


【解决方案1】:

可能有更好的方法,但是一种方法是将列拆分为空格以创建条目数组,然后使用高阶函数(spark 2.4+)在 '=' 上拆分拆分数组中的每个条目.然后分解并创建2列,一列带有id,一列带有值。然后我们可以为每个分区和groupby分配一个行号然后pivot:

import pyspark.sql.functions as F
df1 = (df.withColumn("Col1",F.split(F.col("Col1"),"\s+")).withColumn("Col1",
             F.explode(F.expr("transform(Col1,x->split(x,'='))")))
        .select(F.col("Col1")[0].alias("cols"),F.col("Col1")[1].alias("vals")))

from pyspark.sql import Window
w = Window.partitionBy("cols").orderBy("cols")
final = (df1.withColumn("Rnum",F.row_number().over(w)).groupBy("Rnum")
         .pivot("cols").agg(F.first("vals")).orderBy("Rnum"))

final.show()

+----+----+----+----+----+
|Rnum|   A|   B|   C|   D|
+----+----+----+----+----+
|   1|id1a|id1b|id1c|id1d|
|   2|id2a|id2b|id2c|null|
|   3|id3a|id3b|id3c|null|
|   4|id4a|null|null|null|
+----+----+----+----+----+

这是df1 转换后的样子:

df1.show()
+----+----+
|cols|vals|
+----+----+
|   A|id1a|
|   A|id2a|
|   B|id1b|
|   C|id1c|
|   B|id2b|
|   D|id1d|
|   A|id3a|
|   B|id3b|
|   C|id2c|
|   A|id4a|
|   C|id3c|
+----+----+

【讨论】:

    【解决方案2】:

    可能我不知道全貌,但数据格式似乎很奇怪。如果在数据源上什么都做不了,那么将需要一些收集、透视和连接。试试这个。

    import pyspark.sql.functions as F
    test = sqlContext.createDataFrame([('A=id1a A=id2a B=id1b C=id1c B=id2b',1),('D=id1d A=id3a B=id3b C=id2c',2),('A=id4a C=id3c',3)],schema=['col1','id'])
    tst_spl = test.withColumn("item",(F.split('col1'," ")))
    tst_xpl = tst_spl.select(F.explode("item"))
    tst_map = tst_xpl.withColumn("key",F.split('col','=')[0]).withColumn("value",F.split('col','=')[1]).drop('col')
    #%%
    tst_pivot = tst_map.groupby(F.lit(1)).pivot('key').agg(F.collect_list(('value'))).drop('1')
    #%%
    tst_arr = [tst_pivot.select(F.posexplode(coln)).withColumnRenamed('col',coln) for coln in tst_pivot.columns]  
    
    tst_fin = reduce(lambda df1,df2:df1.join(df2,on='pos',how='full'),tst_arr).orderBy('pos')
    
    tst_fin.show()
    +---+----+----+----+----+
    |pos|   A|   B|   C|   D|
    +---+----+----+----+----+
    |  0|id3a|id3b|id1c|id1d|
    |  1|id4a|id1b|id2c|null|
    |  2|id1a|id2b|id3c|null|
    |  3|id2a|null|null|null|
    +---+----+----+----+----
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-12-08
      • 2019-11-18
      • 2021-08-05
      • 1970-01-01
      • 2011-09-18
      • 1970-01-01
      相关资源
      最近更新 更多