【问题标题】:Update date format in spark dataframe for multiple spark columns更新火花数据框中多个火花列的日期格式
【发布时间】:2020-06-21 18:16:15
【问题描述】:

我有一个 Spark 数据框,其中几列具有不同类型的日期格式。
为了解决这个问题,我编写了下面的代码,以保持所有日期列的格式类型一致。
由于日期列的日期格式每次都可能发生变化,因此我在dt_formats 中定义了一组日期格式。

def to_timestamp_multiple(s: Column, formats: Seq[String]): Column = {
    coalesce(formats.map(fmt => to_timestamp(s, fmt)):_*)
}

val dt_formats= Seq("dd-MMM-yyyy", "MMM-dd-yyyy", "yyyy-MM-dd","MM/dd/yy","dd-MM-yy","dd-MM-yyyy","yyyy/MM/dd","dd/MM/yyyy")

val newDF =  df.withColumn("ETD1", date_format(to_timestamp_multiple($"ETD",Seq("dd-MMM-yyyy", dt_formats)).cast("date"), "yyyy-MM-dd")).drop("ETD").withColumnRenamed("ETD1","ETD")

但是在这里我必须创建一个新列然后我必须删除旧列然后重命名新列。 这使代码变得不必要非常笨拙,因此我想从此代码中覆盖。

我正在尝试通过在函数下面编写一个 Scala 来实现类似的功能,但它抛出异常 org.apache.spark.sql.catalyst.parser.ParseException:,但我无法确定我应该进行哪些更改才能使其工作..


val CleansedData= rawDF.selectExpr(rawDF.columns.map( 
x => { x match {
  case "ETA" => s"""date_format(to_timestamp_multiple($x, dt_formats).cast("date"), "yyyy-MM-dd") as ETA"""
  case _ => x
}  }   ) : _*)

因此寻求帮助。 提前致谢。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql simpledateformat


    【解决方案1】:

    创建一个UDF 以便与select 一起使用。 The select method 获取列并生成另一个 DataFrame。

    此外,与其使用coalesce,不如简单地构建一个处理所有格式的解析器可能更直接。您可以为此使用DateTimeFormatterBuilder

    import java.time.format.DateTimeFormatter
    import java.time.format.DateTimeFormatterBuilder
    import org.apache.spark.sql.functions.udf
    import java.time.LocalDate
    import scala.util.Try
    import java.sql.Date
    
    val dtFormatStrings:Seq[String] = Seq("dd-MMM-yyyy", "MMM-dd-yyyy", "yyyy-MM-dd","MM/dd/yy","dd-MM-yy","dd-MM-yyyy","yyyy/MM/dd","dd/MM/yyyy")
    
    // use foldLeft with appendOptional method, which for each format,
    // returns a new builder with that additional possible format
    
    val initBuilder = new DateTimeFormatterBuilder()
    val builder: DateTimeFormatterBuilder = dtFormatStrings.foldLeft(initBuilder)(
      (b: DateTimeFormatterBuilder, s:String) => b.appendOptional(DateTimeFormatter.ofPattern(s)))
    val formatter = builder.toFormatter()
    
    // Create the UDF, which just takes
    // any function returning a sql-compatible type (java.sql.Date, here)
    
    def toTimeStamp2(dateString:String): Date = {
      val dateTry: Try[Date] = Try(java.sql.Date.valueOf(LocalDate.parse(dateString, formatter)))
      dateTry.toOption.getOrElse(null)
    }
    
    val timeConversionUdf = udf(toTimeStamp2 _)
    
    // example DF and new DF
    val df = Seq(("05/08/20"), ("2020-04-03"), ("unparseable")).toDF("ETD")
    df.select(timeConversionUdf(col("ETD"))).toDF("ETD2").show
    
    

    输出:

    +----------+
    |      ETD2|
    +----------+
    |2020-05-08|
    |2020-04-03|
    |      null|
    +----------+
    

    请注意,不可解析的值最终为 null,如图所示。

    【讨论】:

    • 我收到以下异常-java.io.NotSerializableException: java.time.format.DateTimeFormatterBuilder
    • 尝试将任何不可序列化的项目移动到静态类(即 Scala 对象)或在 UDF 中/在执行器上实例化它们。
    【解决方案2】:

    尝试withColumn(...) 同名并合并如下-

    val dt_formats= Seq("dd-MMM-yyyy", "MMM-dd-yyyy", "yyyy-MM-dd","MM/dd/yy","dd-MM-yy","dd-MM-yyyy","yyyy/MM/dd","dd/MM/yyyy")
    
    val newDF =  df.withColumn("ETD", coalesce(dt_formats.map(fmt => to_date($"ETD", fmt)):_*))
    

    【讨论】:

    • 但是这会再次添加新列并且必须执行删除和重命名列:(
    • 不,这不会添加新列,您尝试过吗? Doc 说 withColumn - 通过添加列或替换具有相同名称的现有列来返回新数据集。
    • 我刚刚看到withcolumn,因此没有尝试..但肯定会尝试实施。
    • 你能帮我解决这个问题吗-stackoverflow.com/questions/62521079/…
    • @BlueStar,这个解决方案对你有用吗?如果您遇到任何问题,请告诉我
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-11-06
    • 1970-01-01
    • 2018-03-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多