【发布时间】:2017-11-02 23:53:43
【问题描述】:
有人在 Spark 2+ 中使用 from_json 解析了毫秒时间戳吗?效果如何?
所以Spark changed TimestampType 将纪元数值解析为以秒为单位而不是 v2 中的毫秒。
我的输入是一个 hive 表,其中有一列中有一个 json 格式的字符串,我试图像这样解析:
val spark = SparkSession
.builder
.appName("Problematic Timestamps")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
val schema = StructType(
StructField("categoryId", LongType) ::
StructField("cleared", BooleanType) ::
StructField("dataVersion", LongType) ::
StructField("details", DataTypes.createArrayType(StringType)) ::
…
StructField("timestamp", TimestampType) ::
StructField("version", StringType) :: Nil
)
val item_parsed =
spark.sql("select * FROM source.jsonStrInOrc")
.select('itemid, 'locale,
from_json('internalitem, schema)
as 'internalitem,
'version, 'createdat, 'modifiedat)
val item_flattened = item_parsed
.select('itemid, 'locale,
$"internalitem.*",
'version as'outer_version, 'createdat, 'modifiedat)
这可以解析一行,其列包含:
{"timestamp": 1494790299549, "cleared": false, "version": "V1", "dataVersion": 2, "categoryId": 2641, "details": [], ...}
这给了我timestamp 字段,如49338-01-08 00:39:09.0,来自值1494790299549,我宁愿读作:2017-05-14 19:31:39.549
现在我可以将时间戳的架构设置为长,然后将该值除以 1000 并强制转换为时间戳,但我会得到 2017-05-14 19:31:39.000 而不是 2017-05-14 19:31:39.549。我无法弄清楚我该怎么做:
- 告诉
from_json解析一个毫秒 时间戳(可能通过以某种方式子类化TimestampType 以在架构中使用) - 在架构中使用
LongType并将其转换为 保留毫秒的时间戳。
关于 UDF 的附录
我发现尝试在选择中进行除法然后进行转换对我来说看起来并不干净,尽管这是一种完全有效的方法。我选择了一个使用 java.sql.timestamp 的 UDF,它实际上是以纪元毫秒为单位指定的。
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, from_json, udf}
import org.apache.spark.sql.types.
{BooleanType, DataTypes, IntegerType, LongType,
StringType, StructField, StructType, TimestampType}
val tsmillis = udf { t: Long => new Timestamp (t) }
val spark = SparkSession
.builder
.appName("Problematic Timestamps")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
val schema = StructType(
StructField("categoryId", LongType) ::
StructField("cleared", BooleanType) ::
StructField("dataVersion", LongType) ::
StructField("details", DataTypes.createArrayType(StringType)) ::
…
StructField("timestamp", LongType) ::
StructField("version", StringType) :: Nil
)
val item_parsed =
spark.sql("select * FROM source.jsonStrInOrc")
.select('itemid, 'locale,
from_json('internalitem, schema)
as 'internalitem,
'version, 'createdat, 'modifiedat)
val item_flattened = item_parsed
.select('itemid, 'locale,
$"internalitem.categoryId", $"internalitem.cleared",
$"internalitem.dataVersion", $"internalitem.details",
tsmillis($"internalitem.timestamp"),
$"internalitem.version",
'version as'outer_version, 'createdat, 'modifiedat)
查看选择中的内容。
我认为值得做一个性能测试,看看使用withcolumn除法和强制转换是否比udf快。
【问题讨论】:
标签: json parsing apache-spark