【Posted】: 2022-01-19 18:25:39
【Question】:
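I need to modify a complex DataFrame schema by adding columns based on a dynamic list of column names.
If a column anywhere in the schema (including inside nested structs and arrays) is in the list, it must be "duplicated" at the same position in the schema, directly after the original: the copy's name gets the suffix "_duplicated" and its type is string.
The solution has to be dynamic, because the schemas vary a lot.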
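One way to frame this requirement is as a recursive walk over the schema's JSON form, i.e. the dict returned by df.schema.jsonValue(), which can be tested without a Spark session. This is a minimal sketch under that assumption: duplicate_fields, _walk and _duplicate_type are hypothetical names, and the rule that array columns are copied as array<string> while every other type becomes string is an assumption read off the expected schema further down. Map types are not handled.

```python
# Hypothetical helpers operating on the JSON form of a Spark schema
# (the dict returned by df.schema.jsonValue()). Primitive types appear as
# plain strings ("string", "long", ...); complex types are dicts with a "type" key.

def duplicate_fields(schema, names, suffix="_duplicated"):
    """Return a new struct dict where every field whose name is in `names`
    is immediately followed by a copy named <name><suffix>."""
    new_fields = []
    for field in schema["fields"]:
        field = dict(field)  # shallow copy so the input dict is not mutated
        field["type"] = _walk(field["type"], names, suffix)
        new_fields.append(field)
        if field["name"] in names:
            new_fields.append({
                "name": field["name"] + suffix,
                "type": _duplicate_type(field["type"]),
                "nullable": field["nullable"],
                "metadata": {},
            })
    return {"type": "struct", "fields": new_fields}


def _walk(dtype, names, suffix):
    """Recurse into structs and arrays, including arrays of structs."""
    if isinstance(dtype, dict):
        if dtype["type"] == "struct":
            return duplicate_fields(dtype, names, suffix)
        if dtype["type"] == "array":
            return {**dtype, "elementType": _walk(dtype["elementType"], names, suffix)}
    return dtype  # primitives (and maps, in this sketch) are returned unchanged


def _duplicate_type(dtype):
    """Assumption: arrays are duplicated as array<string>, everything else as string."""
    if isinstance(dtype, dict) and dtype["type"] == "array":
        return {"type": "array", "elementType": "string",
                "containsNull": dtype["containsNull"]}
    return "string"
```

Assuming a live DataFrame, the result could then be turned back into a Spark type with StructType.fromJson(duplicate_fields(df.schema.jsonValue(), columns_to_duplicate)). Working on plain dicts keeps the recursion easy to unit-test; the trade-off is that any type not handled in _walk passes through silently.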
For example, take the following code:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
data = ['{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"ID": 1, "duplicateme": "please", "metoo": ["hello","world"]}}]}}']
columns_to_duplicate = ['hello', 'duplicateme', 'metoo']
df = spark.read.json(sc.parallelize(data))
df.printSchema()
The schema is:
root
 |-- hello: string (nullable = true)
 |-- thisisastruct: struct (nullable = true)
 |    |-- thisisanarray: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- thisisanotherstruct: struct (nullable = true)
 |    |    |    |    |-- ID: long (nullable = true)
 |    |    |    |    |-- duplicateme: string (nullable = true)
 |    |    |    |    |-- metoo: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
I want the resulting schema to look like this:
root
 |-- hello: string (nullable = true)
 |-- hello_duplicated: string (nullable = true)
 |-- thisisastruct: struct (nullable = true)
 |    |-- thisisanarray: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- thisisanotherstruct: struct (nullable = true)
 |    |    |    |    |-- ID: long (nullable = true)
 |    |    |    |    |-- duplicateme: string (nullable = true)
 |    |    |    |    |-- duplicateme_duplicated: string (nullable = true)
 |    |    |    |    |-- metoo: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |-- metoo_duplicated: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
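To compare a transformed schema against the expected tree above without starting Spark, the JSON form of a schema (as returned by df.schema.jsonValue()) can be rendered in a layout modelled on DataFrame.printSchema(). This is a sketch: tree_string and its helpers are hypothetical names, the exact spacing is an assumption based on how printSchema output usually looks, and map types are not rendered.

```python
# Hypothetical renderer for the JSON form of a Spark schema, producing a
# printSchema-style tree so a transformed schema can be diffed line by line.

def tree_string(schema):
    lines = ["root"]
    _render_fields(schema["fields"], " |", lines)
    return "\n".join(lines)


def _type_name(dtype):
    # Primitive types are plain strings; complex types carry their name in "type"
    return dtype if isinstance(dtype, str) else dtype["type"]


def _flag(value):
    # printSchema prints booleans in lowercase (true/false)
    return str(value).lower()


def _render_type(dtype, prefix, lines):
    """Emit child lines for structs and arrays; primitives have no children."""
    if isinstance(dtype, dict) and dtype["type"] == "struct":
        _render_fields(dtype["fields"], prefix, lines)
    elif isinstance(dtype, dict) and dtype["type"] == "array":
        elem = dtype["elementType"]
        lines.append(f"{prefix}-- element: {_type_name(elem)} "
                     f"(containsNull = {_flag(dtype['containsNull'])})")
        _render_type(elem, prefix + "    |", lines)


def _render_fields(fields, prefix, lines):
    for field in fields:
        lines.append(f"{prefix}-- {field['name']}: {_type_name(field['type'])} "
                     f"(nullable = {_flag(field['nullable'])})")
        _render_type(field["type"], prefix + "    |", lines)
```

For instance, tree_string(df.schema.jsonValue()) should give the same tree as df.printSchema() for schemas built only from structs, arrays and primitive types.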
Edit: this is what I have so far, but arrays don't work:
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

def transform_schema(schema, columns_to_dupe):
    """Return a copy of `schema` in which every field named in `columns_to_dupe`
    is directly followed by a `<name>_duplicated` field."""
    if schema is None:
        return StructType()
    updated = []
    for f in schema.fields:
        if isinstance(f.dataType, StructType):
            # Structs: recurse so nested fields are checked too
            updated.append(StructField(f.name, transform_schema(f.dataType, columns_to_dupe), f.nullable))
        elif isinstance(f.dataType, ArrayType) and isinstance(f.dataType.elementType, StructType):
            # Arrays of structs: always recurse into the element struct,
            # whether or not the array itself is in the list
            new_elem = transform_schema(f.dataType.elementType, columns_to_dupe)
            updated.append(StructField(f.name, ArrayType(new_elem, f.dataType.containsNull), f.nullable))
        else:
            # Primitive types (TimestampType, StringType, ...) and arrays of non-structs are kept as-is
            updated.append(StructField(f.name, f.dataType, f.nullable))
        if f.name in columns_to_dupe:
            # Duplicate: arrays become array<string>, everything else becomes string
            if isinstance(f.dataType, ArrayType):
                dup_type = ArrayType(StringType(), f.dataType.containsNull)
            else:
                dup_type = StringType()
            updated.append(StructField(f.name + '_duplicated', dup_type, f.nullable))
    return StructType(updated)
Thanks in advance.
【Discussion】:
Tags: apache-spark pyspark