I've managed to do what I wanted. The idea is to create a schema for the nested column (a struct), like so:
from pyspark.sql.functions import lit, udf
from pyspark.sql.types import StringType, StructField, StructType
schema = StructType([
    StructField('level2a',
                StructType([
                    StructField('fielda', StringType(), nullable=False),
                    StructField('fieldb', StringType(), nullable=False),
                    StructField('fieldc', StringType(), nullable=False),
                    StructField('fieldd', StringType(), nullable=False),
                    StructField('fielde', StringType(), nullable=False),
                    StructField('fieldf', StringType(), nullable=False)
                ])),
    StructField('level2b',
                StructType([
                    StructField('fielda', StringType(), nullable=False),
                    StructField('fieldb', StringType(), nullable=False),
                    StructField('fieldc', StringType(), nullable=False)
                ]))
])
This schema can then be used with a udf (passing it as the udf's return type) to get the desired result:
def make_meta(fielda, fieldb, fieldc, fieldd, fielde, fieldf,
              fieldalvl2, fieldblvl2, fieldclvl2):
    # Each inner list maps onto one of the nested structs in the schema.
    return [
        [fielda, fieldb, fieldc, fieldd, fielde, fieldf],
        [fieldalvl2, fieldblvl2, fieldclvl2]
    ]
# No lambda wrapper is needed (the original one also swapped fielde/fieldf
# in both its parameter list and the call, which was confusing although
# ultimately harmless); the function can be wrapped directly:
test_udf = udf(make_meta, schema)
df = spark.range(0, 5)
df.withColumn("test", test_udf(lit("a"), lit("b"), lit("c"), lit("d"), lit("e"), lit("f"), lit("a"), lit("b"), lit("c"))).printSchema()
This prints the following:
root
|-- id: long (nullable = false)
|-- test: struct (nullable = true)
| |-- level2a: struct (nullable = true)
| | |-- fielda: string (nullable = false)
| | |-- fieldb: string (nullable = false)
| | |-- fieldc: string (nullable = false)
| | |-- fieldd: string (nullable = false)
| | |-- fielde: string (nullable = false)
| | |-- fieldf: string (nullable = false)
| |-- level2b: struct (nullable = true)
| | |-- fielda: string (nullable = false)
| | |-- fieldb: string (nullable = false)
| | |-- fieldc: string (nullable = false)
In Scala you can return an instance of a case class from a udf; this is the equivalent of what I was trying to do in Python (i.e. return an object with named fields).
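For a closer analogue of a Scala case class, the udf can return nested namedtuples instead of plain lists, since PySpark accepts tuples as struct values when the return type is a StructType. A minimal sketch (the `Level2a`/`Level2b`/`Meta` names and `make_meta_tuple` are my own illustration, not part of any API):

```python
from collections import namedtuple

# Hypothetical types mirroring the two nested structs in the schema above.
Level2a = namedtuple("Level2a", ["fielda", "fieldb", "fieldc",
                                 "fieldd", "fielde", "fieldf"])
Level2b = namedtuple("Level2b", ["fielda", "fieldb", "fieldc"])
Meta = namedtuple("Meta", ["level2a", "level2b"])

def make_meta_tuple(fielda, fieldb, fieldc, fieldd, fielde, fieldf,
                    fieldalvl2, fieldblvl2, fieldclvl2):
    # The nested namedtuples line up field-for-field with the StructType,
    # so this can be wrapped exactly like make_meta above:
    #     test_udf = udf(make_meta_tuple, schema)
    return Meta(
        level2a=Level2a(fielda, fieldb, fieldc, fieldd, fielde, fieldf),
        level2b=Level2b(fieldalvl2, fieldblvl2, fieldclvl2),
    )
```

On the Python side you also get attribute access (`m.level2a.fielda`) instead of positional indexing, which reads much closer to the case-class version.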