【问题标题】:How do I add a new column to a Spark DataFrame (using PySpark)?如何向 Spark DataFrame 添加新列(使用 PySpark)?
【发布时间】:2016-02-14 08:53:12
【问题描述】:

我有一个 Spark DataFrame(使用 PySpark 1.5.1)并想添加一个新列。

我尝试了以下方法但没有成功:

type(randomed_hours) # => list

# Create in Python and transform to RDD

new_col = pd.DataFrame(randomed_hours, columns=['new_col'])

spark_new_col = sqlContext.createDataFrame(new_col)

my_df_spark.withColumn("hours", spark_new_col["new_col"])

使用这个也有错误:

my_df_spark.withColumn("hours",  sc.parallelize(randomed_hours))

那么如何使用 PySpark 向现有 DataFrame 添加新列(基于 Python 向量)?

【问题讨论】:

    标签: python apache-spark dataframe pyspark apache-spark-sql


    【解决方案1】:

    使用一些自定义值或动态值计算添加新列,这些值将基于现有列填充。

    例如

    |ColumnA | ColumnB |
    |--------|---------|
    | 10     | 15      |
    | 10     | 20      |
    | 10     | 30      |
    

    和新的 ColumnC 作为 ColumnA+ColumnB

    |ColumnA | ColumnB | ColumnC|
    |--------|---------|--------|
    | 10     | 15      | 25     |
    | 10     | 20      | 30     |
    | 10     | 30      | 40     |
    

    使用

    #to add new column
    def customColumnVal(row):
    rd=row.asDict()
    rd["ColumnC"]=row["ColumnA"] + row["ColumnB"]
    
    new_row=Row(**rd)
    return new_row
    ----------------------------
    #convert DF to RDD
    df_rdd= input_dataframe.rdd
    
    #apply new fucntion to rdd
    output_dataframe=df_rdd.map(customColumnVal).toDF()
    

    input_dataframe 是要修改的数据框,customColumnVal 函数有代码来添加新列。

    【讨论】:

      【解决方案2】:

      我们可以通过多种方式在 pySpark 中添加新列。

      让我们首先创建一个简单的 DataFrame。

      date = [27, 28, 29, None, 30, 31]
      df = spark.createDataFrame(date, IntegerType())
      

      现在让我们尝试将列值加倍并将其存储在新列中。 PFB 有几种不同的方法可以实现相同的目标。

      # Approach - 1 : using withColumn function
      df.withColumn("double", df.value * 2).show()
      
      # Approach - 2 : using select with alias function.
      df.select("*", (df.value * 2).alias("double")).show()
      
      # Approach - 3 : using selectExpr function with as clause.
      df.selectExpr("*", "value * 2 as double").show()
      
      # Approach - 4 : Using as clause in SQL statement.
      df.createTempView("temp")
      spark.sql("select *, value * 2 as double from temp").show()
      

      更多关于spark DataFrame函数的例子和解释,可以访问我的blog

      我希望这会有所帮助。

      【讨论】:

        【解决方案3】:

        我们可以通过以下步骤直接向 DataFrame 添加额外的列:

        from pyspark.sql.functions import when
        df = spark.createDataFrame([["amit", 30], ["rohit", 45], ["sameer", 50]], ["name", "age"])
        df = df.withColumn("profile", when(df.age >= 40, "Senior").otherwise("Executive"))
        df.show()
        

        【讨论】:

          【解决方案4】:

          添加列的最简单方法是使用“withColumn”。由于数据框是使用 sqlContext 创建的,因此您必须指定架构或默认情况下可以在数据集中使用。如果指定了模式,那么每次更改时工作负载都会变得乏味。

          以下是您可以考虑的示例:

          from pyspark.sql import SQLContext
          from pyspark.sql.types import *
          sqlContext = SQLContext(sc) # SparkContext will be sc by default 
          
          # Read the dataset of your choice (Already loaded with schema)
          Data = sqlContext.read.csv("/path", header = True/False, schema = "infer", sep = "delimiter")
          
          # For instance the data has 30 columns from col1, col2, ... col30. If you want to add a 31st column, you can do so by the following:
          Data = Data.withColumn("col31", "Code goes here")
          
          # Check the change 
          Data.printSchema()
          

          【讨论】:

          • 如果 col31 值是 lke df['keyName'],你会怎么做?
          【解决方案5】:

          我想为一个非常相似的用例提供一个通用示例:

          用例:我有一个包含以下内容的 csv:

          First|Third|Fifth
          data|data|data
          data|data|data
          ...billion more lines
          

          我需要执行一些转换,最终的 csv 需要看起来像

          First|Second|Third|Fourth|Fifth
          data|null|data|null|data
          data|null|data|null|data
          ...billion more lines
          

          我需要这样做,因为这是由某个模型定义的架构,我需要让我的最终数据与 SQL 批量插入和类似的东西互操作。

          所以:

          1) 我使用 spark.read 读取原始 csv 并将其命名为“df”。

          2) 我对数据做一些事情。

          3) 我使用此脚本添加空列:

          outcols = []
          for column in MY_COLUMN_LIST:
              if column in df.columns:
                  outcols.append(column)
              else:
                  outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column)))
          
          df = df.select(outcols)
          

          通过这种方式,您可以在加载 csv 后构建您的架构(如果您必须对许多表执行此操作,也可以用于重新排序列)。

          【讨论】:

            【解决方案6】:
            from pyspark.sql.functions import udf
            from pyspark.sql.types import *
            func_name = udf(
                lambda val: val, # do sth to val
                StringType()
            )
            df.withColumn('new_col', func_name(df.old_col))
            

            【讨论】:

            • 您需要致电StringType()
            【解决方案7】:

            使用 UDF 添加列:

            df = sqlContext.createDataFrame(
                [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
            
            from pyspark.sql.functions import udf
            from pyspark.sql.types import *
            
            def valueToCategory(value):
               if   value == 1: return 'cat1'
               elif value == 2: return 'cat2'
               ...
               else: return 'n/a'
            
            # NOTE: it seems that calls to udf() must be after SparkContext() is called
            udfValueToCategory = udf(valueToCategory, StringType())
            df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
            df_with_cat.show()
            
            ## +---+---+-----+---------+
            ## | x1| x2|   x3| category|
            ## +---+---+-----+---------+
            ## |  1|  a| 23.0|     cat1|
            ## |  3|  B|-23.0|      n/a|
            ## +---+---+-----+---------+
            

            【讨论】:

              【解决方案8】:

              您不能在 Spark 中将任意列添加到 DataFrame。只能使用字面量创建新列(其他字面量类型在How to add a constant column in a Spark DataFrame? 中描述)

              from pyspark.sql.functions import lit
              
              df = sqlContext.createDataFrame(
                  [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
              
              df_with_x4 = df.withColumn("x4", lit(0))
              df_with_x4.show()
              
              ## +---+---+-----+---+
              ## | x1| x2|   x3| x4|
              ## +---+---+-----+---+
              ## |  1|  a| 23.0|  0|
              ## |  3|  B|-23.0|  0|
              ## +---+---+-----+---+
              

              转换现有列:

              from pyspark.sql.functions import exp
              
              df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
              df_with_x5.show()
              
              ## +---+---+-----+---+--------------------+
              ## | x1| x2|   x3| x4|                  x5|
              ## +---+---+-----+---+--------------------+
              ## |  1|  a| 23.0|  0| 9.744803446248903E9|
              ## |  3|  B|-23.0|  0|1.026187963170189...|
              ## +---+---+-----+---+--------------------+
              

              包含使用join:

              from pyspark.sql.functions import exp
              
              lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
              df_with_x6 = (df_with_x5
                  .join(lookup, col("x1") == col("k"), "leftouter")
                  .drop("k")
                  .withColumnRenamed("v", "x6"))
              
              ## +---+---+-----+---+--------------------+----+
              ## | x1| x2|   x3| x4|                  x5|  x6|
              ## +---+---+-----+---+--------------------+----+
              ## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|
              ## |  3|  B|-23.0|  0|1.026187963170189...|null|
              ## +---+---+-----+---+--------------------+----+
              

              或用函数/udf生成:

              from pyspark.sql.functions import rand
              
              df_with_x7 = df_with_x6.withColumn("x7", rand())
              df_with_x7.show()
              
              ## +---+---+-----+---+--------------------+----+-------------------+
              ## | x1| x2|   x3| x4|                  x5|  x6|                 x7|
              ## +---+---+-----+---+--------------------+----+-------------------+
              ## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|
              ## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|
              ## +---+---+-----+---+--------------------+----+-------------------+
              

              映射到 Catalyst 表达式的性能方面的内置函数 (pyspark.sql.functions) 通常优于 Python 用户定义函数。

              如果您想将任意 RDD 的内容添加为列,您可以

              【讨论】:

              • "只能使用字面量创建新列" 在这种情况下字面量究竟是什么意思?
              • Spark 的文档很棒,请参阅 df.withColumn spark.apache.org/docs/2.1.0/api/python/…
              • Spark 文档之所以“很棒”,只是因为它为精明的读者留下了大量的用法。 Spark(和 Pyspark)涵盖了一个名副其实的数据结构动物园,很少或根本没有关于如何在它们之间进行转换的说明。恰当的例子:像这样的问题激增。
              【解决方案9】:

              对于Spark 2.0

              # assumes schema has 'age' column 
              df.select('*', (df.age + 10).alias('agePlusTen'))
              

              【讨论】:

              • 需要为df.select('*', (df.age + 10).alias('agePlusTen'))
              • 谢谢,如果您输入df = df.select('*', (df.age + 10).alias('agePlusTen')),您实际上是在添加任意列,因为@zero323 在上面警告我们是不可能的,除非在Spark 中这样做有问题,在 Pandas 中,这是标准方式..
              • pySpark 有这个版本吗?
              • @Tagar 以上 sn-p 是 python。
              • @GeoffreyAnderson, df.select('*', df.age + 10, df.age + 20)
              【解决方案10】:

              您可以在添加column_name时定义一个新的udf

              u_f = F.udf(lambda :yourstring,StringType())
              a.select(u_f().alias('column_name')
              

              【讨论】:

                猜你喜欢
                • 2016-08-27
                • 1970-01-01
                • 2018-09-24
                • 2018-06-30
                • 1970-01-01
                • 1970-01-01
                • 1970-01-01
                • 1970-01-01
                相关资源
                最近更新 更多