Title: Pyspark - Dynamically adding fields to schema
Posted: 2022-01-19 18:25:39
Question:

I need to modify a complex DataFrame schema by adding columns based on a dynamic list of column names.

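If a column in the schema is in the list, that column needs to be "duplicated" at the same position in the schema, with the suffix "_duplicated" added to its name and with a string type.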

The solution needs to be dynamic, because the schema varies a lot.

For example, take the following code:

data = [('{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"ID": 1, "duplicateme": "please", "metoo": ["hello","world"]}}]}}') ]

columns_to_duplicate = ['hello','duplicateme', 'metoo']

df = spark.read.json(sc.parallelize(data))

df.printSchema()

The schema looks like this:

root
 |-- hello: string (nullable = true)
 |-- thisisastruct: struct (nullable = true)
 |    |-- thisisanarray: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- thisisanotherstruct: struct (nullable = true)
 |    |    |    |    |-- ID: long (nullable = true)
 |    |    |    |    |-- duplicateme: string (nullable = true)
 |    |    |    |    |-- metoo: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)

I would like the resulting schema to look like this:

root
 |-- hello: string (nullable = true)
 |-- hello_duplicated: string (nullable = true)
 |-- thisisastruct: struct (nullable = true)
 |    |-- thisisanarray: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- thisisanotherstruct: struct (nullable = true)
 |    |    |    |    |-- ID: long (nullable = true)
 |    |    |    |    |-- duplicateme: string (nullable = true)
 |    |    |    |    |-- duplicateme_duplicated: string (nullable = true)
 |    |    |    |    |-- metoo: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |-- metoo_duplicated: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
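Reading the expected output, the type of each copy depends on the original: primitives get `string`, while an array of primitives (`metoo`) gets `array<string>`. A minimal sketch of that rule, written against the dict form of a Spark type (what `StructType.jsonValue()` produces, where primitives are plain strings such as `"long"`). `duplicated_type` is a hypothetical helper, and what a listed struct should become is left open because the example does not show it.

```python
from typing import Optional, Union

# A Spark type in StructType.jsonValue() form: a plain string such as "long",
# or a dict such as {"type": "array", "elementType": ..., "containsNull": ...}.
JsonType = Union[str, dict]


def duplicated_type(dtype: JsonType) -> Optional[JsonType]:
    """Return the type a "_duplicated" copy should get, or None if not covered."""
    if not isinstance(dtype, dict):
        # Any primitive (long, double, timestamp, ...) gets a string copy
        return "string"
    if dtype["type"] == "array" and not isinstance(dtype["elementType"], dict):
        # Array of primitives gets array<string>, as metoo_duplicated does
        return {"type": "array", "elementType": "string", "containsNull": True}
    # Structs, arrays of structs and maps: not specified by the example
    return None
```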

Edit: this is what I have so far, but arrays don't work:

from pyspark.sql.types import StructType, StructField, ArrayType, StringType

def transform_schema(schema, columns_to_dupe):
    if schema is None:
        return StructType()
    
    updated = []

    for f in schema.fields:
        if isinstance(f.dataType, ArrayType):
            updated.append(StructField(f.name, f.dataType, f.nullable))
            if f.name in columns_to_dupe:
                # if ArrayType unpack the array type
                if not isinstance(f.dataType.elementType, StructType) and not isinstance(f.dataType.elementType, ArrayType):
                    updated.append(StructField(f.name+'_duplicated', ArrayType(StringType()), f.nullable))
                elif isinstance(f.dataType.elementType, StructType):
                    updated.append(StructField(f.name, ArrayType(transform_schema(f.dataType.elementType, columns_to_dupe))))
        elif isinstance(f.dataType, StructType):
            # if StructType do recursion
            updated.append(StructField(f.name, transform_schema(f.dataType,columns_to_dupe)))
            if f.name in columns_to_dupe:
                updated.append(StructField(f.name, transform_schema(f.dataType,f.dataType.fieldNames())))
        else:
            # otherwise handle all other cases, e.g. TimestampType, StringType, etc.
            updated.append(StructField(f.name, f.dataType, f.nullable))
            if f.name in columns_to_dupe:
                updated.append(StructField(f.name+'_duplicated', StringType(), f.nullable))
                
    return StructType(updated)
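Two things in the attempt above seem to explain the array failure: array elements are only transformed when the array itself is in the list (so `duplicateme` inside `thisisanarray` is never reached), and the transformed array or struct is appended as a second field with the same `f.name` instead of replacing the first one. A minimal sketch of the same recursion with those two points changed, written in plain Python on the dict form returned by `df.schema.jsonValue()` so it runs without a Spark session. `transform_type` and `_dup_type` are hypothetical names; listed structs and arrays of structs get no copy, and maps are left unchanged, which are assumptions.

```python
from typing import Optional, Union

JsonType = Union[str, dict]  # Spark type in StructType.jsonValue() form


def _dup_type(dtype: JsonType) -> Optional[JsonType]:
    # Primitive -> "string"; array of primitives -> array<string>; else no copy
    if not isinstance(dtype, dict):
        return "string"
    if dtype["type"] == "array" and not isinstance(dtype["elementType"], dict):
        return {"type": "array", "elementType": "string", "containsNull": True}
    return None


def transform_type(dtype: JsonType, cols: set) -> JsonType:
    """Return a new type with listed fields duplicated at every nesting level."""
    if not isinstance(dtype, dict):
        return dtype
    if dtype["type"] == "struct":
        fields = []
        for f in dtype["fields"]:
            # Keep the original field (with its own type transformed) first...
            fields.append({**f, "type": transform_type(f["type"], cols)})
            dup = _dup_type(f["type"])
            # ...then its copy, so the copy sits right after the original
            if f["name"] in cols and dup is not None:
                fields.append({"metadata": {}, "name": f["name"] + "_duplicated",
                               "nullable": f["nullable"], "type": dup})
        return {**dtype, "fields": fields}
    if dtype["type"] == "array":
        # Descend into the element type whether or not the array is listed
        return {**dtype, "elementType": transform_type(dtype["elementType"], cols)}
    return dtype  # maps are left unchanged in this sketch
```

With PySpark, the result can be turned back into a schema with `StructType.fromJson(transform_type(df.schema.jsonValue(), set(columns_to_duplicate)))`. Building new dicts instead of mutating the input keeps `df.schema` untouched.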

Thanks in advance.

Tags: apache-spark pyspark


    Answer 1:

    In the end I was able to do it using the JSON schema:

    import json
    from pyspark.sql.types import StructType

    schema_dict = json.loads(df.schema.json())

    def walk_schema(schema, columns_to_duplicate):
        # Rebuild the field list so each "_duplicated" field lands right after its
        # original (appending to schema['fields'] while looping put copies at the end)
        new_fields = []
        for f in schema['fields']:
            new_fields.append(f)
            f_type = f['type']
            if not isinstance(f_type, dict):
                # Primitive type such as "string" or "long": add a string copy
                if f['name'] in columns_to_duplicate:
                    new_fields.append({"metadata": {}, "name": f['name'] + '_duplicated',
                                       "nullable": f['nullable'], "type": "string"})
            elif f_type['type'] == 'struct':
                # Struct: if the struct itself is listed, duplicate all of its children
                if f['name'] in columns_to_duplicate:
                    walk_schema(f_type, [n['name'] for n in f_type['fields']])
                else:
                    walk_schema(f_type, columns_to_duplicate)
            elif f_type['type'] == 'array':
                element = f_type['elementType']
                if not isinstance(element, dict):
                    # Array of primitives: add an array<string> copy
                    if f['name'] in columns_to_duplicate:
                        new_fields.append({"metadata": {}, "name": f['name'] + '_duplicated',
                                           "nullable": f['nullable'],
                                           "type": {"containsNull": True, "elementType": "string", "type": "array"}})
                elif element['type'] == 'struct':
                    # Array of structs: walk the element struct
                    if f['name'] in columns_to_duplicate:
                        walk_schema(element, [n['name'] for n in element['fields']])
                    else:
                        walk_schema(element, columns_to_duplicate)
        schema['fields'] = new_fields

    walk_schema(schema_dict, columns_to_duplicate)

    new_schema = StructType.fromJson(schema_dict)
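To compare `schema_dict` against the desired output without starting Spark, a small renderer can print the dict form as a tree. This is a minimal sketch: `tree_string` is a hypothetical helper that only approximates the real `printSchema()` layout and skips map types. With Spark available, `spark.createDataFrame([], new_schema).printSchema()` prints the real tree.

```python
def tree_string(schema: dict) -> str:
    """Render a StructType.jsonValue()-style dict roughly like printSchema()."""
    lines = ["root"]

    def name_of(dtype):
        # Primitive types are plain strings; complex ones carry a "type" key
        return dtype if not isinstance(dtype, dict) else dtype["type"]

    def walk(dtype, depth):
        prefix = " |" + "    |" * depth + "-- "
        if not isinstance(dtype, dict):
            return
        if dtype["type"] == "struct":
            for f in dtype["fields"]:
                lines.append(f"{prefix}{f['name']}: {name_of(f['type'])} "
                             f"(nullable = {str(f['nullable']).lower()})")
                walk(f["type"], depth + 1)
        elif dtype["type"] == "array":
            elem = dtype["elementType"]
            lines.append(f"{prefix}element: {name_of(elem)} "
                         f"(containsNull = {str(dtype['containsNull']).lower()})")
            walk(elem, depth + 1)
        # map types are not rendered in this sketch

    walk(schema, 0)
    return "\n".join(lines)
```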
    
