【问题标题】:How do I add a column to a nested struct in a pyspark dataframe?如何将列添加到 pyspark 数据框中的嵌套结构?
【发布时间】:2018-07-24 11:36:12
【问题描述】:

我有一个具有类似架构的数据框

root
 |-- state: struct (nullable = true)
 |    |-- fld: integer (nullable = true)

我想在 state 结构中添加列,即创建一个具有类似架构的数据框

root
 |-- state: struct (nullable = true)
 |    |-- fld: integer (nullable = true)
 |    |-- a: integer (nullable = true)

但是我得到了

root
 |-- state: struct (nullable = true)
 |    |-- fld: integer (nullable = true)
 |-- state.a: integer (nullable = true)

这是尝试

df.withColumn('state.a', val)

【问题讨论】:

  • 您可以使用具有所需架构的 udf 创建一个新列,然后删除旧列。据我所知,您无法更改 struct 列的架构。 see this question

标签: apache-spark pyspark apache-spark-sql


【解决方案1】:

这是一种不使用udf 的方法:

# create example dataframe
import pyspark.sql.functions as f
data = [
    ({'fld': 0},)
]

schema = StructType(
    [
        StructField('state',
            StructType(
                [StructField('fld', IntegerType())]
            )
        )
    ]
)

df = sqlCtx.createDataFrame(data, schema)
df.printSchema()
#root
# |-- state: struct (nullable = true)
# |    |-- fld: integer (nullable = true)

现在使用withColumn() 并使用lit()alias() 添加新字段。

val = 1
df_new = df.withColumn(
    'state', 
    f.struct(*[f.col('state')['fld'].alias('fld'), f.lit(val).alias('a')])
)
df_new.printSchema()
#root
# |-- state: struct (nullable = false)
# |    |-- fld: integer (nullable = true)
# |    |-- a: integer (nullable = false)

如果嵌套结构中有很多字段,则可以使用列表推导式,使用df.schema["state"].dataType.names 来获取字段名称。例如:

val = 1
s_fields = df.schema["state"].dataType.names # ['fld']
df_new = df.withColumn(
    'state', 
    f.struct(*([f.col('state')[c].alias(c) for c in s_fields] + [f.lit(val).alias('a')]))
)
df_new.printSchema()
#root
# |-- state: struct (nullable = false)
# |    |-- fld: integer (nullable = true)
# |    |-- a: integer (nullable = false)

参考文献

  • 我找到了一种从结构中获取字段名称的方法,而无需从 this answer 手动命名。

【讨论】:

  • 我明白了,使用withColumnstruct 替换为新结构,因此复制旧字段。这行得通,谢谢!我想知道是否有一种方法可以将字段添加到结构中,而不必命名所有现有的子字段?
  • @MrCartoonology 我找到了一种更简洁的方法来获取字段名称。查看更新。
【解决方案2】:

虽然这是一个为时已晚的答案,但对于 pyspark 版本 2.x.x 以下是支持的。

假设dfOld 已经包含statefld 所问的问题。

dfOld.withColumn("a","value") dfNew = dfOld.select("level1Field1", "level1Field2", struct(col("state.fld").alias("fld"), col("a")).alias("state"))

参考:https://medium.com/@mrpowers/adding-structtype-columns-to-spark-dataframes-b44125409803

【讨论】:

    【解决方案3】:

    使用如下转换:

    import pyspark.sql.functions as f
    
    df = df.withColumn(
        "state",
        f.struct(
            f.col("state.*"),
            f.lit(123).alias("a")
        )
    )
    

    【讨论】:

      【解决方案4】:

      这是一种方法没有 udf。

      初始化示例数据框:

      nested_df1 = (spark.read.json(sc.parallelize(["""[
              { "state": {"fld": 1} },
              { "state": {"fld": 2}}
          ]"""])))
      
      nested_df1.printSchema()
      
      root
       |-- state: struct (nullable = true)
       |    |-- fld: long (nullable = true)
      

      默认情况下,Spark .read.json 将所有整数导入为 long。 如果state.fld 必须是int,则需要强制转换它。

      from pyspark.sql import functions as F
      
      nested_df1 = (nested_df1
          .select( F.struct(F.col("state.fld").alias("fld").cast('int')).alias("state") ))
      
      nested_df1.printSchema()
      
      root
       |-- state: struct (nullable = false)
       |    |-- col1: integer (nullable = true)
      
      nested_df1.show()
      
      +-----+
      |state|
      +-----+
      |  [1]|
      |  [2]|
      +-----+
      

      终于

      使用.select 使用"parent.child" 表示法从现有结构中获取您想要的嵌套列,创建新列,然后将旧列与新列一起重新包装在struct 中。

      val_a = 3
      
      nested_df2 = (nested_df
          .select( 
              F.struct(
                  F.col("state.fld"), 
                  F.lit(val_a).alias("a")
              ).alias("state")
          )
      )
      
      
      nested_df2.printSchema()
      
      root
       |-- state: struct (nullable = false)
       |    |-- fld: integer (nullable = true)
       |    |-- a: integer (nullable = false)
      
      nested_df2.show()
      
      +------+
      | state|
      +------+
      |[1, 3]|
      |[2, 3]|
      +------+
      

      如果需要,使用"parent.*" 进行展平。

      nested_df2.select("state.*").printSchema()
      
      root
       |-- fld: integer (nullable = true)
       |-- a: integer (nullable = false)
      
      nested_df2.select("state.*").show()
      
      +---+---+
      |fld|  a|
      +---+---+
      |  1|  3|
      |  2|  3|
      +---+---+
      

      【讨论】:

        【解决方案5】:
        from pyspark.sql.functions import *
        from pyspark.sql.types import *
        def add_field_in_dataframe(nfield, df, dt): 
            fields = nfield.split(".")
            print fields
            n = len(fields)
            addField = fields[0]  
            if n == 1:
                return df.withColumn(addField, lit(None).cast(dt))
        
            nestedField = ".".join(fields[:-1])
            sfields = df.select(nestedField).schema[fields[-2]].dataType.names
            print sfields
            ac = col(nestedField)
            if n == 2:
                nc = struct(*( [ac[c].alias(c) for c in sfields] + [lit(None).cast(dt).alias(fields[-1])]))
            else:
                nc = struct(*( [ac[c].alias(c) for c in sfields] + [lit(None).cast(dt).alias(fields[-1])])).alias(fields[-2])
            print nc
            n = n - 1
        
            while n > 1: 
                print "n: ",n
                fields = fields[:-1]
                print "fields: ", fields
                nestedField = ".".join(fields[:-1])
                print "nestedField: ", nestedField
                sfields = df.select(nestedField).schema[fields[-2]].dataType.names
                print fields[-1]
                print "sfields: ", sfields
                sfields = [s for s in sfields if s != fields[-1]]
                print "sfields: ", sfields
                ac = col(".".join(fields[:-1]))
                if n > 2: 
                    print fields[-2]
                    nc = struct(*( [ac[c].alias(c) for c in sfields] + [nc])).alias(fields[-2])
                else:
                    nc = struct(*( [ac[c].alias(c) for c in sfields] + [nc]))
                n = n - 1
            return df.withColumn(addField, nc)
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 2021-08-26
          • 2023-01-26
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多