【问题标题】:Pass a ArrayType column to UDF in Spark Scala在 Spark Scala 中将 ArrayType 列传递给 UDF
【发布时间】:2021-12-06 16:33:41
【问题描述】:

我在 Scala 中的 Spark 数据框中有一列是由于使用聚合多个列而生成的

 agg(collect_list(struct(col(abc), col(aaa)).as(def)

我想将此列传递给 UDF 以进行进一步处理,以处理此聚合列中的一个索引。

当我将参数传递给我的 UDF 时:

.withColumn(def, remove
            (col(xyz), col(def)))

UDF- 类型为 Seq[Row]: val removeUnstableActivations: UserDefinedFunction = udf((xyz: java.util.Date, def: Seq[Row])

我得到错误:

Exception encountered when invoking run on a nested suite - Schema for type org.apache.spark.sql.Row is not supported

我应该如何传递这些列以及UDF中列的数据类型应该是什么?

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    确实不支持 Row 类型的架构,但您可以返回一个案例类。 Spark 会将返回的案例类视为 StructType。例如:

    import org.apache.spark.sql.functions._
    import spark.implicits._
    import org.apache.spark.sql.expressions.UserDefinedFunction
    import org.apache.spark.sql.Row
    
    val df = Seq(
      (1, "a"),
      (2, "b"),
      (3, "c")
    ).toDF("number", "word")
    
    val aggDf = df.agg(
      collect_list(struct(col("number"), col("word"))) as "aggColumn"
    )
    
    aggDf.printSchema()
    // |-- aggColumn: array (nullable = true)
    // |    |-- element: struct (containsNull = true)
    // |    |    |-- number: string (nullable = true)
    // |    |    |-- word: integer (nullable = false)
    
    case class ReturnSchema(word: String, number: Int)
    
    val myUdf: UserDefinedFunction =
      udf((collection: Seq[Row]) => {
        collection.map(r => {
          val word   = r.getAs[String]("word")
          val newNumber = r.getAs[Int]("number") * 100
    
          new ReturnSchema(word, newNumber)
        })
      })
      
    val finalDf = aggDf.select(myUdf(col("aggColumn")).as("udfTranformedColumn"))
    
    finalDf.printSchema
    // root
    //  |-- udfTranformedColumn: array (nullable = true)
    //  |    |-- element: struct (containsNull = true)
    //  |    |    |-- word: string (nullable = true)
    //  |    |    |-- number: integer (nullable = false)
    
    finalDf.show(false)
    // +------------------------------+
    // |udfTranformedColumn           |
    // +------------------------------+
    // |[[a, 100], [b, 200], [c, 300]]|
    // +------------------------------+
    

    【讨论】:

      猜你喜欢
      • 2017-12-11
      • 2020-06-16
      • 2017-05-23
      • 1970-01-01
      • 1970-01-01
      • 2018-02-02
      • 1970-01-01
      • 1970-01-01
      • 2017-08-01
      相关资源
      最近更新 更多