【Posted】: 2020-12-10 09:39:22
【Problem Description】:
I have a schema defined for JSON data:
import org.apache.spark.sql.types._

val gpsSchema: StructType =
  StructType(Array(
    StructField("Name", StringType, true),
    StructField("GPS", ArrayType(
      StructType(Array(
        StructField("TimeStamp", DoubleType, true),
        StructField("Longitude", DoubleType, true),
        StructField("Latitude", DoubleType, true)
      )), true), true)))
The data:
{"Name":"John","GPS":[{"TimeStamp": 1605449171.259277, "Longitude": -76.463684, "Latitude": 40.787052},
{"TimeStamp": 1605449175.743052, "Longitude": -76.464046, "Latitude": 40.787038},
{"TimeStamp": 1605449180.932659, "Longitude": -76.464465, "Latitude": 40.787022},
{"TimeStamp": 1605449187.288478, "Longitude": -76.464977, "Latitude": 40.787054}]}
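For reference, the question does not show how the data is loaded; a minimal sketch of applying gpsSchema when reading the JSON (the path gps.json and the local-mode SparkSession are assumptions for illustration) could look like:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .appName("gps-schema-example")
  .master("local[*]")   // local mode for illustration only
  .getOrCreate()

// gpsSchema is the StructType defined above
val df = spark.read
  .schema(gpsSchema)    // enforce the declared schema instead of inferring one
  .json("gps.json")     // hypothetical path to the JSON shown above

df.printSchema()
```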
How can I add a new StructField "ID" (a uid) to each element of the GPS array, so that:
Before:
[{"TimeStamp": 1605449171.259277, "Longitude": -76.463684, "Latitude": 40.787052},
{"TimeStamp": 1605449175.743052, "Longitude": -76.464046, "Latitude": 40.787038},
{"TimeStamp": 1605449180.932659, "Longitude": -76.464465, "Latitude": 40.787022},
{"TimeStamp": 1605449187.288478, "Longitude": -76.464977, "Latitude": 40.787054}]
After:
[{"ID": 123,"TimeStamp": 1605449171.259277, "Longitude": -76.463684, "Latitude": 40.787052},
{"ID": 123, "TimeStamp": 1605449175.743052, "Longitude": -76.464046, "Latitude": 40.787038},
{"ID": 123,"TimeStamp": 1605449180.932659, "Longitude": -76.464465, "Latitude": 40.787022},
{"ID": 123,"TimeStamp": 1605449187.288478, "Longitude": -76.464977, "Latitude": 40.787054}]
One approach is to flatten the nested field, add the new "ID" column, build struct("ID", "TimeStamp", "Longitude", "Latitude"), and then collect_list, as follows:
Dataframe
  .withColumn("ID", expr("uuid()"))  // uuid() is a SQL built-in, not in functions._, hence expr
  .withColumn("GPS", explode($"GPS"))
  .select($"ID", $"Name", $"GPS.*")
  .select($"Name", struct("ID", "TimeStamp", "Longitude", "Latitude").alias("field"))
  .groupBy("Name").agg(collect_list($"field").alias("GPS"))
This is an expensive operation if the array holds a large number of elements, and it can crash the Spark driver.
Is there another way to add an "ID" field to the GPS array within the existing schema?
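Since the comments below confirm Spark 2.4.5, which supports higher-order array functions, one alternative worth sketching (my own suggestion, not part of the original question; it assumes df holds the data above) is transform via expr, which rewrites each array element in place without explode/groupBy:

```scala
import org.apache.spark.sql.functions.expr

// Generate one uuid per row, then prepend it to every element of the GPS
// array with transform; no explode or re-aggregation is needed.
val withId = df
  .withColumn("ID", expr("uuid()"))  // same ID for all elements of a row
  .withColumn(
    "GPS",
    expr("""transform(GPS, x -> struct(
              ID as ID,
              x.TimeStamp as TimeStamp,
              x.Longitude as Longitude,
              x.Latitude as Latitude))""")
  )
  .drop("ID")
```

Because transform maps each element locally, the array is never exploded into separate rows, avoiding the shuffle and driver pressure of the collect_list approach.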
【Comments】:
-
Spark version??
-
Spark 2.4.5、Scala 2.11
Tags: json scala apache-spark struct schema