【问题标题】:Parsing epoch milliseconds from json with Spark 2使用 Spark 2 从 json 解析纪元毫秒
【发布时间】:2017-11-02 23:53:43
【问题描述】:

有人在 Spark 2+ 中使用 from_json 解析了毫秒时间戳吗?效果如何?

所以Spark changed TimestampType 将纪元数值解析为以秒为单位而不是 v2 中的毫秒。

我的输入是一个 hive 表,其中有一列中有一个 json 格式的字符串,我试图像这样解析:

val spark = SparkSession
  .builder
  .appName("Problematic Timestamps")
  .enableHiveSupport()
  .getOrCreate()
import spark.implicits._
val schema = StructType(
  StructField("categoryId", LongType) ::
  StructField("cleared", BooleanType) ::
  StructField("dataVersion", LongType) ::
  StructField("details", DataTypes.createArrayType(StringType)) ::
  …
  StructField("timestamp", TimestampType) ::
  StructField("version", StringType) :: Nil
)
val item_parsed =
    spark.sql("select * FROM source.jsonStrInOrc")
    .select('itemid, 'locale,
            from_json('internalitem, schema)
                as 'internalitem,
            'version, 'createdat, 'modifiedat)
val item_flattened = item_parsed
    .select('itemid, 'locale,
            $"internalitem.*",
            'version as'outer_version, 'createdat, 'modifiedat)

这可以解析一行,其列包含:

{"timestamp": 1494790299549, "cleared": false, "version": "V1", "dataVersion": 2, "categoryId": 2641, "details": [], ...}

这给了我timestamp 字段,如49338-01-08 00:39:09.0,来自值1494790299549,我宁愿读作:2017-05-14 19:31:39.549

现在我可以将时间戳的架构设置为长,然后将该值除以 1000 并强制转换为时间戳,但我会得到 2017-05-14 19:31:39.000 而不是 2017-05-14 19:31:39.549。我无法弄清楚我该怎么做:

  • 告诉from_json 解析一个毫秒 时间戳(可能通过以某种方式子类化TimestampType 以在架构中使用)
  • 在架构中使用 LongType 并将其转换为 保留毫秒的时间戳

关于 UDF 的附录

我发现尝试在选择中进行除法然后进行转换对我来说看起来并不干净,尽管这是一种完全有效的方法。我选择了一个使用 java.sql.timestamp 的 UDF,它实际上是以纪元毫秒为单位指定的。

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, from_json, udf}
import org.apache.spark.sql.types.
{BooleanType, DataTypes, IntegerType, LongType,
StringType, StructField, StructType, TimestampType}

val tsmillis = udf { t: Long => new Timestamp (t) }

val spark = SparkSession
  .builder
  .appName("Problematic Timestamps")
  .enableHiveSupport()
  .getOrCreate()
import spark.implicits._
val schema = StructType(
  StructField("categoryId", LongType) ::
  StructField("cleared", BooleanType) ::
  StructField("dataVersion", LongType) ::
  StructField("details", DataTypes.createArrayType(StringType)) ::
  …
  StructField("timestamp", LongType) ::
  StructField("version", StringType) :: Nil
)
val item_parsed =
    spark.sql("select * FROM source.jsonStrInOrc")
    .select('itemid, 'locale,
            from_json('internalitem, schema)
                as 'internalitem,
            'version, 'createdat, 'modifiedat)
val item_flattened = item_parsed
    .select('itemid, 'locale,
            $"internalitem.categoryId", $"internalitem.cleared",
            $"internalitem.dataVersion", $"internalitem.details",
            tsmillis($"internalitem.timestamp"),
            $"internalitem.version",
            'version as'outer_version, 'createdat, 'modifiedat)

查看选择中的内容。 我认为值得做一个性能测试,看看使用withcolumn除法和强制转换是否比udf快。

【问题讨论】:

    标签: json parsing apache-spark


    【解决方案1】:

    现在我可以将时间戳的架构设置为长,然后将该值除以 1000

    实际上这正是您所需要的,只需保持正确的类型即可。假设您只有 Long timestamp 字段:

    val df = spark.range(0, 1).select(lit(1494790299549L).alias("timestamp"))
    // df: org.apache.spark.sql.DataFrame = [timestamp: bigint]
    

    如果除以 1000:

    val inSeconds = df.withColumn("timestamp_seconds", $"timestamp" / 1000)
    // org.apache.spark.sql.DataFrame = [timestamp: bigint, timestamp_seconds: double]
    

    您将以秒为单位获得双倍时间戳(请注意,这是 SQL,而不是 Scala 行为)。

    剩下的就是cast (Spark )

    inSeconds.select($"timestamp_seconds".cast("timestamp")).show(false)
    // +-----------------------+
    // |timestamp_seconds      |
    // +-----------------------+
    // |2017-05-14 21:31:39.549|
    // +-----------------------+
    

    或(Spark >= 3.1timestamp_seconds(或直接timestamp_millis

    import org.apache.spark.sql.functions.{expr, timestamp_seconds}
    
    inSeconds.select(timestamp_seconds($"timestamp_seconds")).show(false)
    
    // +------------------------------------+
    // |timestamp_seconds(timestamp_seconds)|
    // +------------------------------------+
    // |2017-05-14 21:31:39.549             |
    // +------------------------------------+
    
    df.select(expr("timestamp_millis(timestamp)")).show(false)
    // +---------------------------+
    // |timestamp_millis(timestamp)|
    // +---------------------------+
    // |2017-05-14 21:31:39.549    |
    // +---------------------------+
    

    【讨论】:

    • 哦,所以它不是将除法四舍五入到另一个长?太好了!
    猜你喜欢
    • 2012-02-19
    • 2020-01-11
    • 1970-01-01
    • 1970-01-01
    • 2015-10-17
    • 2019-07-24
    • 1970-01-01
    • 2015-07-07
    • 2013-11-11
    相关资源
    最近更新 更多