【问题标题】:Add an empty column to Spark DataFrame向 Spark DataFrame 添加一个空列
【发布时间】:2016-01-07 10:15:06
【问题描述】:

正如网络上的manyother locations 中所述,向现有 DataFrame 添加新列并不简单。不幸的是,拥有此功能很重要(即使它在分布式环境中效率低下),尤其是在尝试使用 unionAll 连接两个 DataFrames 时。

null 列添加到DataFrame 以促进unionAll 的最优雅的解决方法是什么?

我的版本是这样的:

from pyspark.sql.types import StringType
from pyspark.sql.functions import UserDefinedFunction
to_none = UserDefinedFunction(lambda x: None, StringType())
new_df = old_df.withColumn('new_column', to_none(df_old['any_col_from_old']))

【问题讨论】:

    标签: python apache-spark dataframe pyspark apache-spark-sql


    【解决方案1】:

    这里只需要一个字面量和演员表:

    from pyspark.sql.functions import lit
    
    new_df = old_df.withColumn('new_column', lit(None).cast(StringType()))
    

    一个完整的例子:

    df = sc.parallelize([row(1, "2"), row(2, "3")]).toDF()
    df.printSchema()
    
    ## root
    ##  |-- foo: long (nullable = true)
    ##  |-- bar: string (nullable = true)
    
    new_df = df.withColumn('new_column', lit(None).cast(StringType()))
    new_df.printSchema()
    
    ## root
    ##  |-- foo: long (nullable = true)
    ##  |-- bar: string (nullable = true)
    ##  |-- new_column: string (nullable = true)
    
    new_df.show()
    
    ## +---+---+----------+
    ## |foo|bar|new_column|
    ## +---+---+----------+
    ## |  1|  2|      null|
    ## |  2|  3|      null|
    ## +---+---+----------+
    

    可在此处找到等效的 Scala:Create new Dataframe with empty/null field values

    【讨论】:

    • 如果该列首先不存在,如何有条件地执行此操作?我正在尝试使用 UDF 并将 DF 传递给它,然后进行new_column not in df.columns 检查,但无法使其工作。
    • 我也看过它,但我仍然无法有条件地将其合并到 withColumn('blah', where(has_column(df['blah']) == False).... 类型的构造中。必须缺少一些语法结构。如果不存在,我想添加一个带有 Null 的列。这个答案是前者,另一个检查后者。
    • @Gopala df if has_column(df) else df.withColumn(....) - 与 Spark 无关。
    • 该死....我真的很困惑 python 语法何时有效,何时无效。例如,withColumn 中不能有条件代码,而必须使用 UDF。谢谢!
    【解决方案2】:

    我会将 lit(None) 转换为 NullType 而不是 StringType。因此,如果我们必须过滤掉该列上的非空行......它可以很容易地完成,如下所示

    df = sc.parallelize([Row(1, "2"), Row(2, "3")]).toDF()
    
    new_df = df.withColumn('new_column', lit(None).cast(NullType()))
    
    new_df.printSchema() 
    
    df_null = new_df.filter(col("new_column").isNull()).show()
    df_non_null = new_df.filter(col("new_column").isNotNull()).show()
    

    如果要转换为 StringType,请注意不要使用 lit("None")(with quotes),因为它会在 col("new_column") 上搜索过滤条件为 .isNull() 的记录时失败。

    【讨论】:

    • 错误:Parquet data source does not support null data type.;StringType() 工作。
    【解决方案3】:

    没有import StringType的选项

    df = df.withColumn('foo', F.lit(None).cast('string'))
    

    完整示例:

    from pyspark.sql import SparkSession, functions as F
    
    spark = SparkSession.builder.getOrCreate()
    
    df = spark.range(1, 3).toDF('c')
    df = df.withColumn('foo', F.lit(None).cast('string'))
    
    df.printSchema()
    #     root
    #      |-- c: long (nullable = false)
    #      |-- foo: string (nullable = true)
    
    df.show()
    #     +---+----+
    #     |  c| foo|
    #     +---+----+
    #     |  1|null|
    #     |  2|null|
    #     +---+----+
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-04-18
      • 2015-09-04
      • 2018-08-20
      • 2016-08-27
      • 2020-05-07
      • 2016-02-14
      相关资源
      最近更新 更多