您可以使用explode 选项在pyspark 中实现此目的
首先导入必要的库和函数
from pyspark.sql import SQLContext, Row
假设你的数据框是df。
如果你这样做df.show()
你应该得到如下结果
+---+----------+----------+----------+
| id| field1| field2| field3|
+---+----------+----------+----------+
| 0| 11_field1| 22_field2| 33_field3|
| 1|111_field1|222_field2|333_field3|
+---+----------+----------+----------+
然后将要分解的所有列映射为 2 列。在这里,您希望除 id 之外的所有列都爆炸。所以,请执行以下操作
cols= df.columns[1:]
然后将data frame 转换为rdd,如下所示
rdd = data.rdd.map(lambda x: Row(id=x[0], val=dict(zip(cols, x[1:]))))
要检查 rdd 的映射方式,请执行以下操作
rdd.take()
你会得到如下结果
[Row(id=0, val={'field2': u'22_field2', 'field3': u'33_field3', 'field1': u'11_field1'}), Row(id=1, val={'field2': u'222_field2', 'field3': u'333_field3', 'field1': u'111_field1'})]
然后将rdd 转换回data frame 说df2
df2 = sqlContext.createDataFrame(rdd)
然后执行df2.show()。你应该得到如下结果
+---+--------------------+
| id| val|
+---+--------------------+
| 0|Map(field3 -> 33_...|
| 1|Map(field3 -> 333...|
+---+--------------------+
然后将数据框df2注册为临时表
df2.registerTempTable('mytempTable')
然后在数据框上运行如下查询:
df3 = sqlContext.sql( """select id,explode(val) AS (fieldname,fieldvalue) from mytempTable""")
然后df3.show(),你应该得到如下结果
+---+---------+----------+
| id|fieldname|fieldvalue|
+---+---------+----------+
| 0| field3| 33_field3|
| 0| field2| 22_field2|
| 0| field1| 11_field1|
| 1| field3|333_field3|
| 1| field2|222_field2|
| 1| field1|111_field1|
+---+---------+----------+