【问题标题】:Parse Micro/Nano Seconds timestamp in spark-csv Dataframe reader : Inconsistent results在 spark-csv 数据帧阅读器中解析微/纳秒时间戳:结果不一致
【发布时间】:2026-01-14 02:45:01
【问题描述】:

我正在尝试读取时间戳到纳秒的 csv 文件。 文件TestTimestamp.csv的样本内容-

spark- 2.4.0,scala - 2.11.11

   /**
     * TestTimestamp.csv -
     * 101,2019-SEP-23 11.42.35.456789123 AM
     *
     */

尝试使用 timestampFormat = "yyyy-MMM-dd hh.mm.ss.SSSSSSSSS aaa" 读取它

val dataSchema = StructType(Array(StructField("ID", DoubleType, true), StructField("Created_TS", TimestampType, true)))

val data = spark.read.format("csv")
      .option("header", "false")
      .option("inferSchema", "false")
      .option("treatEmptyValuesAsNulls", "true")
      //.option("nullValue", "")
      .option("dateFormat", "yyyy-MMM-dd")
      .option("timestampFormat", "yyyy-MMM-dd hh.mm.ss.SSSSSSSSS aaa")
      .schema(dataSchema)
      .load("C:\\TestData\\Raw\\TetraPak\\Shipments\\TestTimeStamp.csv")

    data.select('Created_TS).show

我得到的输出是完全错误的日期时间。 9 月 23 日改为 9 月 28 日

+--------------------+
|          Created_TS|
+--------------------+
|2019-09-28 18:35:...|
+--------------------+

即使我有 24 小时格式的小时数,例如 - “2019-SEP-23 16.42.35.456789123” 我尝试通过给出 timestampFormat = "yyyy-MMM-dd HH.mm.ss.SSS" 来仅使用第二个分数的前几位

类似的错误结果-

val data2 = spark.read.format("csv")
      .option("header", "false")
      .option("inferSchema", "false")
      .option("treatEmptyValuesAsNulls", "true")
      //.option("nullValue", "")
      .option("dateFormat", "yyyy-MMM-dd")
      .option("timestampFormat", "yyyy-MMM-dd hh.mm.ss.SSS")
      .schema(dataSchema)
      .load("C:\\TestData\\Raw\\TetraPak\\Shipments\\TestTimeStamp.csv")

    data2.select('Created_TS).show

+--------------------+
|          Created_TS|
+--------------------+
|2019-09-28 23:35:...|
+--------------------+

在使用 csv reader创建数据帧时,有什么方法可以解析此类时间戳字符串?

【问题讨论】:

    标签: apache-spark spark-csv


    【解决方案1】:

    DataFrameReader 使用SimpleDateFormat 解析日期:

    timestampFormat(默认yyyy-MM-dd'T'HH:mm:ss.SSSXXX):设置表示时间戳格式的字符串。自定义日期格式遵循 java.text.SimpleDateFormat 中的格式。这适用于时间戳类型。

    很遗憾,SimpleDateFormat 不支持纳秒,因此最后一个点之后的日期部分将被解释为 456789123 毫秒,即大约 126 小时。这个时间被添加到您的日期中,这解释了您看到的奇怪结果。有关此主题的更多详细信息,请访问this answer

    因此必须在读取 csv 后在第二步中解析日期,例如使用使用 DateTimeFormatter 的 udf:

    val dataSchema = StructType(Array(StructField("ID", DoubleType, true), StructField("Created_TS_String", StringType, true)))
    
    var df = spark.read.option("header", false)
      .option("inferSchema", "false")
      .option("treatEmptyValuesAsNulls", "true")
      .schema(dataSchema)
      .csv("C:\\TestData\\Raw\\TetraPak\\Shipments\\TestTimeStamp.csv")
    
    val toDate = udf((date: String) => {
      val formatter = new DateTimeFormatterBuilder()
        .parseCaseInsensitive()
        .appendPattern("yyyy-MMM-dd hh.mm.ss.SSSSSSSSS a").toFormatter()
      Timestamp.valueOf(LocalDateTime.parse(date, formatter))
    })
    
    df = df.withColumn("Created_TS", toDate('Created_TS_String))
    

    【讨论】:

    • 谢谢,你是对的。UDF 似乎是唯一的方法。我在输入文件中有大约 280 列。我必须识别所有时间戳并将列数组映射到使用 toDate udf 的表达式,然后从 df 中选择(将所有 ts 列读取为字符串)
    【解决方案2】:

    这是 werner 关于使用 udfs 的回答启发的解决方案..-

    输入 csv -

    101,2019-SEP-23 11.42.35.456789123 AM,2019-SEP-23 11.42.35.456789123 AM,2019-SEP-23 11.42.35.456789123 AM
    

    带有 TimestampType 列的原始架构

    val orig_schema = StructType(Array(StructField("ID", DoubleType, true), StructField("Created_TS", TimestampType, true), StructField("Updated_TS", TimestampType, true), StructField("Modified_TS", TimestampType, true)))
    

    将所有 TimestampType 转换为 StringType

    val dataSchema = StructType(orig_schema.map(x =>
          {
            x.dataType match {
              case TimestampType => StructField(x.name, StringType, x.nullable)
              case _             => x
            }
    
          }))
    

    将字符串转换为时间戳的toDate函数

    //TODO parameterize string formats
    
        def toDate(date: String): java.sql.Timestamp = {
          val formatter = new DateTimeFormatterBuilder()
            .parseCaseInsensitive()
            .appendPattern("yyyy-MMM-dd hh.mm.ss.SSSSSSSSS a").toFormatter()
          Timestamp.valueOf(LocalDateTime.parse(date, formatter))
        }
    
    // register toDate as udf
    val to_timestamp = spark.sqlContext.udf.register("to_timestamp", toDate _)
    

    创建列表达式以从原始数据框中进行选择

    // Array of Column Name & Types
        val nameType: Array[(String, DataType)] = orig_schema.fields.map(f => (f.name, f.dataType))
    
    // Create Column Expression to select from raw Dataframe
    val selectExpr = nameType.map(f => {
          f._2 match {
            case TimestampType => expr(s"CASE WHEN ${f._1} is NULL THEN NULL ELSE to_timestamp(${f._1}) END AS ${f._1}")
            case _             => expr(s"${f._1}")
          }
        })
    

    读取为 StringType ,使用使用 udf 将字符串转换为时间戳的列选择器表达式

    val data = spark.read.format("csv")
          .option("header", "false")
          .option("inferSchema", "false")
          .option("treatEmptyValuesAsNulls", "true")
          //.option("nullValue", "")
          .option("dateFormat", "yyyy-MMM-dd")
          .option("timestampFormat", "yyyy-MMM-dd hh.mm.ss.SSSSSSSSS aaa")
          .schema(dataSchema)
    .load("C:\\TestData\\Raw\\TetraPak\\Shipments\\TestTimestamp_new.csv").select(selectExpr: _*)
    
    data.show
    

    这是所需的输出..所以现在我不必担心列数和手动使用 udf 创建表达式

    +-----+--------------------+--------------------+--------------------+
    |   ID|          Created_TS|          Updated_TS|         Modified_TS|
    +-----+--------------------+--------------------+--------------------+
    |101.0|2019-09-23 11:42:...|2019-09-23 11:42:...|2019-09-23 11:42:...|
    +-----+--------------------+--------------------+--------------------+
    
    

    【讨论】: