【问题标题】:Rename nested field in spark dataframe重命名火花数据框中的嵌套字段
【发布时间】:2017-08-17 16:58:58
【问题描述】:

在 Spark 中有一个数据框 df

 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)

如何将字段array_field.a 重命名为array_field.a_renamed

[更新]:

.withColumnRenamed() 不适用于嵌套字段,所以我尝试了这种 hacky 且不安全的方法:

# First alter the schema:
schema = df.schema
schema['array_field'].dataType.elementType['a'].name = 'a_renamed'

ind = schema['array_field'].dataType.elementType.names.index('a')
schema['array_field'].dataType.elementType.names[ind] = 'a_renamed'

# Then set dataframe's schema with altered schema
df._schema = schema

我知道设置私有属性不是一个好习惯,但我不知道为 df 设置架构的其他方法

我认为我在正确的轨道上,但 df.printSchema() 仍然显示 array_field.a 的旧名称,虽然 df.schema == schemaTrue

【问题讨论】:

    标签: python apache-spark dataframe pyspark rename


    【解决方案1】:

    Python

    无法修改单个嵌套字段。你必须重新创建一个完整的结构。在这种特殊情况下,最简单的解决方案是使用cast

    首先是一堆导入:

    from collections import namedtuple
    from pyspark.sql.functions import col
    from pyspark.sql.types import (
        ArrayType, LongType, StringType, StructField, StructType)
    

    和示例数据:

    Record = namedtuple("Record", ["a", "b", "c"])
    
    df = sc.parallelize([([Record("foo", 1, 3)], )]).toDF(["array_field"])
    

    让我们确认架构与您的情况相同:

    df.printSchema()
    
    root
     |-- array_field: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- a: string (nullable = true)
     |    |    |-- b: long (nullable = true)
     |    |    |-- c: long (nullable = true)
    

    您可以将新架构定义为例如字符串:

    str_schema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"
    
    df.select(col("array_field").cast(str_schema)).printSchema()
    
    root
     |-- array_field: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- a_renamed: string (nullable = true)
     |    |    |-- b: long (nullable = true)
     |    |    |-- c: long (nullable = true)
    

    DataType:

    struct_schema = ArrayType(StructType([
        StructField("a_renamed", StringType()),
        StructField("b", LongType()),
        StructField("c", LongType())
    ]))
    
     df.select(col("array_field").cast(struct_schema)).printSchema()
    
    root
     |-- array_field: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- a_renamed: string (nullable = true)
     |    |    |-- b: long (nullable = true)
     |    |    |-- c: long (nullable = true)
    

    斯卡拉

    同样的技术也可以在 Scala 中使用:

    case class Record(a: String, b: Long, c: Long)
    
    val df = Seq(Tuple1(Seq(Record("foo", 1, 3)))).toDF("array_field")
    
    val strSchema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"
    
    df.select($"array_field".cast(strSchema))
    

    import org.apache.spark.sql.types._
    
    val structSchema = ArrayType(StructType(Seq(
        StructField("a_renamed", StringType),
        StructField("b", LongType),
        StructField("c", LongType)
    )))
    
    df.select($"array_field".cast(structSchema))
    

    可能的改进

    如果您使用表达性数据操作或 JSON 处理库,则可能更容易将数据类型转储到 dict 或 JSON 字符串并从那里获取例如 (Python / toolz):

    from toolz.curried import pipe, assoc_in, update_in, map
    from operator import attrgetter
    
    # Update name to "a_updated" if name is "a"
    rename_field = update_in(
        keys=["name"], func=lambda x: "a_updated" if x == "a" else x)
    
    updated_schema = pipe(
       #  Get schema of the field as a dict
       df.schema["array_field"].jsonValue(),
       # Update fields with rename
       update_in(
           keys=["type", "elementType", "fields"],
           func=lambda x: pipe(x, map(rename_field), list)),
       # Load schema from dict
       StructField.fromJson,
       # Get data type
       attrgetter("dataType"))
    
    df.select(col("array_field").cast(updated_schema)).printSchema()
    

    【讨论】:

    • 这里str_schema = "array&lt;struct&lt;a_renamed:string,b:bigint,c:bigint&gt;&gt;",当你知道架构时,这是可行的,但当我不知道架构时,我们如何用特定字符串替换所有名称,例如a_renamed, b_renamed, c_renamed 等等
    • @User12345 Spark 在运行时总是“知道”模式。如果您不想/不能硬编码DataType/DDL 字符串,则必须动态转换现有模式。此类方法的示例在最后一段中显示,但您可以找到其他产生相同结果的方法。
    • 谢谢。我试图让我的 map 函数,例如rdd.map(my_function) 返回一个结构,如{"field1": str(row.column1), "field2":float(row.column2), "field3":list(row.column3)},spark 会将返回类型转换为map 类型并破坏一些值。使用命名元组我能够得到我想要的。谢谢:)
    【解决方案2】:

    您可以递归数据框的架构以创建具有所需更改的新架构。

    PySpark 中的模式是一个 StructType,它包含一个 StructField 列表,每个 StructField 可以包含一些原始类型或另一个 StructType。

    这意味着我们可以根据类型是否为 StructType 来决定是否要递归。

    下面是一个带注释的示例实现,向您展示了如何实现上述想法。

    # Some imports
    from pyspark.sql.types import DataType, StructType, ArrayType
    from copy import copy
    
    # We take a dataframe and return a new one with required changes
    def cleanDataFrame(df: DataFrame) -> DataFrame:
        # Returns a new sanitized field name (this function can be anything really)
        def sanitizeFieldName(s: str) -> str:
            return s.replace("-", "_").replace("&", "_").replace("\"", "_")\
                .replace("[", "_").replace("]", "_").replace(".", "_")
        
        # We call this on all fields to create a copy and to perform any 
        # changes we might want to do to the field.
        def sanitizeField(field: StructField) -> StructField:
            field = copy(field)
            field.name = sanitizeFieldName(field.name)
            # We recursively call cleanSchema on all types
            field.dataType = cleanSchema(field.dataType)
            return field
        
        def cleanSchema(dataType: [DataType]) -> [DataType]:
            dataType = copy(dataType)
            # If the type is a StructType we need to recurse otherwise 
            # we can return since we've reached the leaf node
            if isinstance(dataType, StructType):
                # We call our sanitizer for all top level fields
                dataType.fields = [sanitizeField(f) for f in dataType.fields]
            elif isinstance(dataType, ArrayType):
                dataType.elementType = cleanSchema(dataType.elementType)
            return dataType
    
        # Now since we have the new schema we can create a new DataFrame 
        # by using the old Frame's RDD as data and the new schema as the 
        # schema for the data
        return spark.createDataFrame(df.rdd, cleanSchema(df.schema))
    

    【讨论】:

    • 感谢您的回答!拯救了我的一天。我可以建议避免使用“import *”,而是使用“from pyspark.sql.types import DataType, StructType, ArrayType” - 这可能是版本问题,但“from pyspark.sql import *”不起作用,因为使用的类型包在子包“类型”中
    • 嗨 Ashhar,这是一段很棒的代码,感谢您发布它。我试图在每行可能具有略微不同结构的框架上使用它,因此我认为使用初始 DataFrame 的 RDD 是一个问题。我看到我的结构键递归小写(我的用例),但值设置为空。你能解释一下为什么会这样吗?
    【解决方案3】:

    我发现了一种比@zero323 提供的方法更简单的方法,大致如下 @MaxPY 的:

    Pyspark 2.4:

    # Get the schema from the dataframe df
    schema = df.schema
    
    # Override `fields` with a list of new StructField, equals to the previous but for the names
    schema.fields = (list(map(lambda field: 
                              StructField(field.name + "_renamed", field.dataType), schema.fields)))
    
    # Override also `names` with the same mechanism
    schema.names = list(map(lambda name: name + "_renamed", table_schema.names))
    

    现在df.schema 将打印所有更新的名称。

    【讨论】:

      【解决方案4】:

      另一个更简单的解决方案,如果它对你有用,对我也有用,那就是展平结构,然后重命名:

      使用 Scala:

      val df_flat = df.selectExpr("array_field.*")
      

      现在重命名有效

      val df_renamed = df_flat.withColumnRenamed("a", "a_renamed")
      

      当然,这只适用于你不需要层次结构的情况(尽管我想如果需要它可以再次重新创建)

      【讨论】:

        【解决方案5】:

        使用 Leo Chttps://stackoverflow.com/a/55363153/5475506 中提供的答案,我构建了我认为更人性化的/pythoniac 脚本:

            import pyspark.sql.types as sql_types
        
            path_table = "<PATH_TO_DATA>"
            table_name = "<TABLE_NAME>"
        
            def recur_rename(schema: StructType, old_char, new_char):
                schema_new = []
                for struct_field in schema:
                    if type(struct_field.dataType)==sql_types.StructType:
                        schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), sql_types.StructType(recur_rename(struct_field.dataType, old_char, new_char)), struct_field.nullable, struct_field.metadata))
                    elif type(struct_field.dataType)==sql_types.ArrayType: 
                        if type(struct_field.dataType.elementType)==sql_types.StructType:
                            schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), sql_types.ArrayType(sql_types.StructType(recur_rename(struct_field.dataType.elementType, old_char, new_char)),True), struct_field.nullable, struct_field.metadata)) # Recursive call to loop over all Array elements
                        else:
                            schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), struct_field.dataType.elementType, struct_field.nullable, struct_field.metadata)) # If ArrayType only has one field, it is no sense to use an Array so Array is exploded
                    else:
                        schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), struct_field.dataType, struct_field.nullable, struct_field.metadata))
                return schema_new
        
            def rename_columns(schema: StructType, old_char, new_char):
                return sql_types.StructType(recur_rename(schema, old_char, new_char))
        
            df = spark.read.format("json").load(path_table) # Read data whose schema has to be changed.
            newSchema = rename_columns(df.schema, ":", "_") # Replace special characters in schema (More special characters not allowed in Spark/Hive meastore: ':', ',', ';')
            df2= spark.read.format("json").schema(newSchema).load(path_table) # Read data with new schema.
        

        我认为代码是自我解释的(此外,它有 cmets),但它所做的是递归循环模式中的所有字段,将每个字段中的“old_char”替换为“new_char”。如果字段类型是嵌套的(StructType 或 ArrayType),则进行新的递归调用。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 2023-04-03
          • 2016-12-29
          • 1970-01-01
          • 1970-01-01
          • 2020-07-21
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多