Here's a way to do it without a UDF.
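The snippets below assume a running SparkSession named spark and its SparkContext sc. If you want to run them standalone, here's a minimal setup sketch (the master URL and app name are just placeholders):
from pyspark.sql import SparkSession

# Minimal local session; provides the `spark` and `sc` names used below.
spark = SparkSession.builder.master("local[*]").appName("nested-structs").getOrCreate()
sc = spark.sparkContext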
Initialize an example dataframe:
nested_df1 = spark.read.json(sc.parallelize(["""[
    { "state": {"fld": 1} },
    { "state": {"fld": 2} }
]"""]))
nested_df1.printSchema()
root
 |-- state: struct (nullable = true)
 |    |-- fld: long (nullable = true)
By default, Spark's .read.json reads every integer value as long. If state.fld needs to be an int, you have to cast it. Note that the cast has to come before the alias; casting an already-aliased column drops the name and the field would come out as col1:
from pyspark.sql import functions as F

nested_df1 = nested_df1.select(
    F.struct(F.col("state.fld").cast("int").alias("fld")).alias("state")
)
nested_df1.printSchema()
root
 |-- state: struct (nullable = false)
 |    |-- fld: integer (nullable = true)
nested_df1.show()
+-----+
|state|
+-----+
|  [1]|
|  [2]|
+-----+
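As an aside, if you know the shape of the data up front you can avoid the cast entirely by passing an explicit schema to the reader; a sketch, with the field names mirroring the sample JSON:
from pyspark.sql.types import StructType, StructField, IntegerType

# Explicit schema: `fld` is read as integer directly, so no cast is needed.
schema = StructType([
    StructField("state", StructType([StructField("fld", IntegerType())]))
])
nested_df1 = spark.read.schema(schema).json(sc.parallelize(["""[
    { "state": {"fld": 1} },
    { "state": {"fld": 2} }
]"""]))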
Finally, use .select with "parent.child" notation to pull the nested columns you want out of the existing struct, create the new column, and then re-wrap the old columns together with the new one in a struct:
val_a = 3

nested_df2 = nested_df1.select(
    F.struct(
        F.col("state.fld"),
        F.lit(val_a).alias("a")
    ).alias("state")
)
nested_df2.printSchema()
root
 |-- state: struct (nullable = false)
 |    |-- fld: integer (nullable = true)
 |    |-- a: integer (nullable = false)
nested_df2.show()
+------+
| state|
+------+
|[1, 3]|
|[2, 3]|
+------+
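Side note: on Spark 3.1+ you can get the same result without rebuilding the struct by using Column.withField; a sketch (nested_df2_alt is just an illustrative name):
# Spark 3.1+: add `a` to the existing struct in place.
nested_df2_alt = nested_df1.withColumn(
    "state", F.col("state").withField("a", F.lit(val_a))
)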
If needed, flatten with "parent.*":
nested_df2.select("state.*").printSchema()
root
 |-- fld: integer (nullable = true)
 |-- a: integer (nullable = false)
nested_df2.select("state.*").show()
+---+---+
|fld|  a|
+---+---+
|  1|  3|
|  2|  3|
+---+---+
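One caveat with "parent.*": if you flatten several structs that share field names, the resulting columns collide. A sketch that aliases each field explicitly instead (state_fld and state_a are just illustrative names):
# Alias each flattened field so the top-level column names stay unambiguous.
flat_df = nested_df2.select(
    F.col("state.fld").alias("state_fld"),
    F.col("state.a").alias("state_a")
)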