【Question Title】: Define a Spark scala UDF with Option as input param
【Posted】: 2019-01-16 11:20:40
【Question Description】:

The following UDF was written so that it can handle the case where one of its parameters is undefined. Here is the code:

val addTimeFromCols: UserDefinedFunction = udf((year: String, month: String, day: String, hour: String) => {
  Option(hour) match {
    case None    => List(year, month, day).mkString(DASH_SEP).concat(SPACE).concat(defaultHour)
    case Some(x) => List(year, month, day).mkString(DASH_SEP).concat(SPACE).concat(x)
  }
})
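The body of this UDF can be exercised as plain Scala, independent of Spark. The sketch below assumes placeholder values for `DASH_SEP`, `SPACE`, and `defaultHour`, whose definitions are not shown in the question; it also illustrates why the body uses `Option(hour)` rather than `Some(hour)`: `Option(null)` normalizes a null reference to `None`, so the default branch fires when Spark passes in a null.

```scala
// Standalone sketch of the UDF body. DASH_SEP, SPACE and defaultHour are
// assumed values, since the question does not show their definitions.
val DASH_SEP = "-"
val SPACE = " "
val defaultHour = "00"

def addTime(year: String, month: String, day: String, hour: String): String =
  Option(hour) match {              // Option(null) == None; Some(null) would not be
    case None    => List(year, month, day).mkString(DASH_SEP) + SPACE + defaultHour
    case Some(h) => List(year, month, day).mkString(DASH_SEP) + SPACE + h
  }
```

For example, `addTime("2019", "01", "10", null)` yields `"2019-01-10 00"`, while passing `"09"` for the hour yields `"2019-01-10 09"`.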

 def addTimestampFromFileCols(): DataFrame = df
  .withColumn(COLUMN_TS, addTimeFromCols(col(COLUMN_YEAR), col(COLUMN_MONTH), col(COLUMN_DAY), col(COLUMN_HOUR)).cast(TimestampType))

My goal is to make this function work for every use case: DataFrames that have an HOUR column, and others that do not, in which case I substitute a default value. Unfortunately, when I test it against a DataFrame without that column, I get the following error:

cannot resolve '`HOUR`' given input columns

Any idea how to fix this?

【Question Comments】:

    标签: scala apache-spark user-defined-functions option


【Solution 1】:

If the column does not exist, you have to supply the default value through the lit() function, otherwise Spark throws an error. The following works for me:

    scala> defaultHour
    res77: String = 00
    
    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
    def addTimestampFromFileCols(df: DataFrame) = {
      val hr = if (df.columns.contains("hour")) col(COLUMN_HOUR) else lit(defaultHour)
      df.withColumn(COLUMN_TS, addTimeFromCols(col(COLUMN_YEAR), col(COLUMN_MONTH), col(COLUMN_DAY), hr).cast(TimestampType))
    }
    
    // Exiting paste mode, now interpreting.
    
    addTimestampFromFileCols: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
    
    scala> 
    

Positive case

    scala> val df = Seq(("2019","01","10","09")).toDF("year","month","day","hour")
    df: org.apache.spark.sql.DataFrame = [year: string, month: string ... 2 more fields]
    
    scala> addTimestampFromFileCols(df).show(false)
    +----+-----+---+----+-------------------+
    |year|month|day|hour|tstamp             |
    +----+-----+---+----+-------------------+
    |2019|01   |10 |09  |2019-01-10 09:00:00|
    +----+-----+---+----+-------------------+
    

Negative case

    scala> val df = Seq(("2019","01","10")).toDF("year","month","day")
    df: org.apache.spark.sql.DataFrame = [year: string, month: string ... 1 more field]
    
    scala> addTimestampFromFileCols(df).show(false)
    +----+-----+---+-------------------+
    |year|month|day|tstamp             |
    +----+-----+---+-------------------+
    |2019|01   |10 |2019-01-10 00:00:00|
    +----+-----+---+-------------------+
    
    scala>
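The fix hinges on checking `df.columns` before referencing the column. Stripped of Spark, the same guard is just a membership test with a fallback; the sketch below is illustrative only (a `Map` stands in for the DataFrame's columns, and `defaultHour` is assumed to be `"00"` as in the answer):

```scala
// Illustrative stand-in for:
//   if (df.columns.contains("hour")) col(COLUMN_HOUR) else lit(defaultHour)
// Look the hour up among the available columns, falling back to the default.
val defaultHour = "00"

def hourOrDefault(row: Map[String, String]): String =
  row.getOrElse("hour", defaultHour)
```

The important point is that the decision happens on the driver, from the frame's schema, before any column expression is resolved, which is why the "cannot resolve" error no longer occurs.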
    

【Discussion】:
