【问题标题】:Spark SQL - Scala - Aggregate Function as Parameter to Create DF ColumnSpark SQL - Scala - 聚合函数作为创建 DF 列的参数
【发布时间】:2021-04-13 13:02:16
【问题描述】:

我正在尝试创建一个作为主要参数传递的函数:

  • 一个数据帧
  • 另一个函数(聚合:count、countDistinct、max 等)

我的目标是根据提供的函数返回一个带有新列的 DataFrame。

不过,我在打字时遇到了麻烦。我一直在这里搜索,我发现的大部分内容都指向 UDF,并且需要创建它以便在“withColumn”中应用它。

当我运行这样的事情时:

    val DF1 = Seq(
  ("asd", "1", "search", "otpx"),
  ("asd", "1", "xpto", "otpx"),
  ("asd", "2", "xpto", "otpx"),
  ("asd", "3", "xpto", "otpx"),
  ("asd", "3", "search", "otpx"),
  ("asd", "4", "search", "otpx"),

  ("zxc", "1", "search", "otpx"),
  ("zxc", "1", "search", "otpx"),
  ("zxc", "1", "search", "otpx"),
  ("zxc", "1", "search", "otpx"),
  ("zxc", "2", "xpto", "otpx"),
  ("zxc", "3", "xpto", "otpx"),
  ("zxc", "3", "xpto", "otpx"),
  ("zxc", "3", "xpto", "otpx"),

  ("qwe", "1", "xpto", "otpx"),
  ("qwe", "2", "xpto", "otpx"),
  ("qwe", "3", "xpto", "otpx"),
  ("qwe", "4", "xpto", "otpx"),
  ("qwe", "5", "xpto", "otpx")

).toDF("cid", "cts", "type", "subtype")

DF1.show(100)

val canList = List("cid", "cts")

def test[T](df: DataFrame, fn: Column => T, newColName: String, colToFn: String, partitionByColumns: List[String]): DataFrame = {

  val window = Window.partitionBy(partitionByColumns.head, partitionByColumns.tail:_*)

  val fun: (Column => T) = (arg: Column) => fn(arg) // or right away udfFun = udf(fn)

  val udfFun = udf(fun)

  val ret = df.withColumn(newColName, udfFun(df(colToFn)).over(window))

  ret
}

val DF2 = test(DF1, countDistinct, "count_type", "type", canList)

DF2.orderBy(canList.head, canList.tail:_*).show(100)

我收到如下错误:

没有可用于 T 的 TypeTag

val udfFun = udf(fun)

我在这里错过了什么?

提前致谢,干杯!

【问题讨论】:

    标签: scala apache-spark apache-spark-sql higher-order-functions


    【解决方案1】:

    首先请注意 countDistinct 不受窗口支持。如果你想定义一个在窗口上接受其他聚合函数的函数(比如count),你可以将fn定义为一个接受一列并返回一列的函数。 UDF 在这里不合适,因为您调用的是 Spark SQL 函数,而不是自定义 Scala 函数。

    def test(df: DataFrame,
             fn: Column => Column,
             newColName: String,
             colToFn: String,
             partitionByColumns: List[String]
    ): DataFrame = {
      val window = Window.partitionBy(partitionByColumns.head, partitionByColumns.tail:_*)
      val ret = df.withColumn(newColName, fn(col(colToFn)).over(window))
      ret
    }
    
    // calling the function
    test(DF1, count, "count_type", "type", canList)
    

    【讨论】:

      猜你喜欢
      • 2021-04-07
      • 1970-01-01
      • 1970-01-01
      • 2016-02-26
      • 1970-01-01
      • 2019-02-22
      • 2017-04-09
      • 2015-10-16
      • 2015-10-15
      相关资源
      最近更新 更多