【问题标题】:Creating UDF compatible with DataFrame and SQL API创建与 DataFrame 和 SQL API 兼容的 UDF
【发布时间】:2016-11-08 21:44:38
【问题描述】:

我正在尝试编写可以在 Spark SQL 中的 Dataframes 中工作的 UDF。

这里是代码

def Timeformat (timecol1: Int) = {
    if (timecol1 >= 1440)  
        ("%02d:%02d".format((timecol1-1440)/60, (timecol1-1440)%60))  
    else 
        ("%02d:%02d".format((timecol1)/60, (timecol1)%60))
}

sqlContext.udf.register("Timeformat", Timeformat _)

此方法非常适用于 sqlcontext

val dataa = sqlContext.sql("""select Timeformat(abc.time_band) from abc""")

使用 DF - 获取错误 val fcstdataa = abc.select(Timeformat(abc("time_band_start")))

此方法会引发类型不匹配错误。

<console>:41: error: type mismatch;
 found   : org.apache.spark.sql.Column
 required: Int

当我如下重写 UDF 时,它非常适合 DF,但在 Sqlcontext 中不起作用。有没有办法解决这个问题而不创建多个 UDF 来做同样的事情

val Timeformat = udf((timecol1: Int) => 
    if (timecol1 >= 1440)  
        ("%02d:%02d".format((timecol1-1440)/60, (timecol1-1440)%60))  
    else 
        ("%02d:%02d".format((timecol1)/60, (timecol1)%60))
)

我对 scala 和 spark 很陌生,两个声明之间有什么区别。一种方法比另一种更好吗?

【问题讨论】:

    标签: scala apache-spark dataframe apache-spark-sql user-defined-functions


    【解决方案1】:

    在这里使用 UDF 并没有什么意义,但如果你真的想要这个,就不要使用匿名函数。获取你已经拥有的函数(Int =&gt; String)并使用 UDF 包装它:

    def Timeformat(timecol1: Int): String = ??? 
    sqlContext.udf.register("Timeformat", Timeformat _)
    val timeformat_ = udf(Timeformat _)
    

    您也可以callUDF:

    import org.apache.spark.sql.functions.callUDF
    
    abc.select(callUDF("Timeformat", $"time_band_start"))
    

    也就是说,大多数时候应该首选非 UDF 解决方案:

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions.{when, format_string}
    
    def timeformatExpr(col: Column) = {
      val offset = when(col >= 1440, 1440).otherwise(0)
      val x = ((col - offset) / 60).cast("int")
      val y = (col - offset) % 60
      format_string("%02d:%02d", x, y)
    }
    

    相当于下面的SQL:

    val expr = """CASE
      WHEN time_band >= 1440 THEN
          FORMAT_STRING(
              '%02d:%02d', 
              CAST((time_band - 1440) / 60 AS INT),
              (time_band - 1440) % 60
          )
      ELSE 
          FORMAT_STRING(
              '%02d:%02d', 
              CAST(time_band / 60 AS INT),
              time_band % 60
          )
    END"""
    

    可用于原始 SQL 以及带有 selectExprexpr 函数的 DataFrame

    示例

    val df = Seq((1L, 123123), (2L, 42132), (3L, 99)).toDF("id", "time_band")
    
    df.select($"*", timeformatExpr($"time_band").alias("tf")).show
    // +---+---------+-------+
    // | id|time_band|     tf|
    // +---+---------+-------+
    // |  1|   123123|2028:03|
    // |  2|    42132| 678:12|
    // |  3|       99|  01:39|
    // +---+---------+-------+
    
    df.registerTempTable("df")
    
    sqlContext.sql(s"SELECT *, $expr AS tf FROM df").show
    // +---+---------+-------+
    // | id|time_band|     tf|
    // +---+---------+-------+
    // |  1|   123123|2028:03|
    // |  2|    42132| 678:12|
    // |  3|       99|  01:39|
    // +---+---------+-------+
    
    df.selectExpr("*", s"$expr AS tf").show
    // +---+---------+-------+
    // | id|time_band|     tf|
    // +---+---------+-------+
    // |  1|   123123|2028:03|
    // |  2|    42132| 678:12|
    // |  3|       99|  01:39|
    // +---+---------+-------+
    

    【讨论】:

      猜你喜欢
      • 2017-04-22
      • 1970-01-01
      • 2016-12-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-07-22
      • 1970-01-01
      相关资源
      最近更新 更多