【问题标题】:Spark withColumn - add column using non-Column type variable [duplicate]Spark withColumn - 使用非列类型变量添加列
【发布时间】:2024-04-26 06:20:01
【问题描述】:

如何从变量值向数据框中添加列?

我知道我可以使用.toDF(colName) 创建一个数据框,而.withColumn 是添加列的方法。但是,当我尝试以下操作时,出现类型不匹配错误:

val myList = List(1,2,3)
val myArray = Array(1,2,3)

myList.toDF("myList")
  .withColumn("myArray", myArray)

类型不匹配,预期:Column,实际:Array[Int]

此编译错误出现在.withColumn 调用中的myArray 上。如何将其从 Array[Int] 转换为 Column 类型?

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    不确定withColumn 是您真正想要的。您可以应用lit() 使 myArray 符合方法规范,但结果将是 DataFrame 中每一行的相同数组值:

    myList.toDF("myList").withColumn("myArray", lit(myArray)).
      show
    // +------+---------+
    // |myList|  myArray|
    // +------+---------+
    // |     1|[1, 2, 3]|
    // |     2|[1, 2, 3]|
    // |     3|[1, 2, 3]|
    // +------+---------+
    

    如果您尝试按列合并两个集合,则它与withColumn 提供的转换不同。在这种情况下,您需要将它们中的每一个都转换为 DataFrame 并通过 join 组合它们。

    现在,如果两个集合的元素是行标识的并且像您的示例中那样成对地相互匹配,并且您想以这种方式加入它们,您可以简单地加入转换后的 DataFrame:

    myList.toDF("myList").join(
        myArray.toSeq.toDF("myArray"), $"myList" === $"myArray"
      ).show
    // +------+-------+
    // |myList|myArray|
    // +------+-------+
    // |     1|      1|
    // |     2|      2|
    // |     3|      3|
    // +------+-------+
    

    但如果这两个集合包含不可连接的元素,而您只想按列合并它们,则需要使用两个数据框中兼容的行标识列来连接它们。如果没有这样的行标识列,一种方法是创建自己的rowIds,如下例所示:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    
    val df1 = List("a", "b", "c").toDF("myList")
    val df2 = Array("x", "y", "z").toSeq.toDF("myArray")
    
    val rdd1 = df1.rdd.zipWithIndex.map{
      case (row: Row, id: Long) => Row.fromSeq(row.toSeq :+ id)
    }
    val df1withId = spark.createDataFrame( rdd1,
      StructType(df1.schema.fields :+ StructField("rowId", LongType, false))
    )
    
    val rdd2 = df2.rdd.zipWithIndex.map{
      case (row: Row, id: Long) => Row.fromSeq(row.toSeq :+ id)
    }
    val df2withId = spark.createDataFrame( rdd2, 
      StructType(df2.schema.fields :+ StructField("rowId", LongType, false))
    )
    
    df1withId.join(df2withId, Seq("rowId")).show
    // +-----+------+-------+
    // |rowId|myList|myArray|
    // +-----+------+-------+
    // |    0|     a|      x|
    // |    1|     b|      y|
    // |    2|     c|      z|
    // +-----+------+-------+
    

    【讨论】:

      【解决方案2】:

      错误信息有确切的内容,您需要输入一列(或lit())作为withColumn()的第二个参数

      试试这个

      import org.apache.spark.sql.functions.typedLit
      
      val myList = List(1,2,3)
      val myArray = Array(1,2,3)
      
      myList.toDF("myList")
        .withColumn("myArray", typedLit(myArray))
      

      :)

      【讨论】: