【发布时间】:2018-08-13 05:23:57
【问题描述】:
我正在尝试将 json 文件解析为 csv 文件。
结构有点复杂,我在scala中写了一个spark程序来完成这个任务。 就像文档每行不包含一个 json 对象一样,我决定按照我发现的一些答案和帖子中的建议使用 wholeTextFiles 方法。
val jsonRDD = spark.sparkContext.wholeTextFiles(fileInPath).map(x => x._2)
然后我读取数据框中的 json 内容
val dwdJson = spark.read.json(jsonRDD)
然后我想导航 json 并展平数据。 这是来自 dwdJson 的架构
root
|-- meta: struct (nullable = true)
| |-- dimensions: struct (nullable = true)
| | |-- lat: long (nullable = true)
| | |-- lon: long (nullable = true)
| |-- directory: string (nullable = true)
| |-- filename: string (nullable = true)
|-- records: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- grids: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- gPt: array (nullable = true)
| | | | | |-- element: double (containsNull = true)
| | |-- time: string (nullable = true)
这是我最好的方法:
val dwdJson_e1 = dwdJson.select($"meta.filename", explode($"records").as("records_flat"))
val dwdJson_e2 = dwdJson_e1.select($"filename", $"records_flat.time",explode($"records_flat.grids").as("gPt"))
val dwdJson_e3 = dwdJson_e2.select($"filename", $"time", $"gPt.gPt")
val dwdJson_flat = dwdJson_e3.select($"filename"
,$"time"
,$"gPt".getItem(0).as("lat1")
,$"gPt".getItem(1).as("long1")
,$"gPt".getItem(2).as("lat2")
,$"gPt".getItem(3).as("long2")
,$"gPt".getItem(4).as("value"))
我是 Scala 新手,我想知道是否可以避免创建似乎效率低下且程序运行非常缓慢的中间数据帧(dwdJson_e1、dwdJson_e2、dwdJson_e3)(与在笔记本电脑上运行的 java 解析器相比)。
另一方面,我找不到解绑这些嵌套数组的方法。
spark 版本: 2.0.0 斯卡拉: 2.11.8 java: 1.8
**
编辑 1:示例 Json 文件和 csv 输出
**
这是我要转换的示例 Json 文件:
{
"meta" : {
"directory" : "weather/cosmo/de/grib/12/aswdir_s",
"filename" : "COSMODE_single_level_elements_ASWDIR_S_2018022312_000.grib2.bz2",
"dimensions" : {
"lon" : 589,
"time" : 3,
"lat" : 441
}
},
"records" : [ {
"grids" : [ {
"gPt" : [ 45.175, 13.55, 45.2, 13.575, 3.366295E-7 ]
}, {
"gPt" : [ 45.175, 13.575, 45.2, 13.6, 3.366295E-7 ]
}, {
"gPt" : [ 45.175, 13.6, 45.2, 13.625, 3.366295E-7 ]
} ],
"time" : "2018-02-23T12:15:00Z"
}, {
"grids" : [ {
"gPt" : [ 45.175, 13.55, 45.2, 13.575, 4.545918E-7 ]
}, {
"gPt" : [ 45.175, 13.575, 45.2, 13.6, 4.545918E-7 ]
}, {
"gPt" : [ 45.175, 13.6, 45.2, 13.625, 4.545918E-7 ]
}
],
"time" : "2018-02-23T12:30:00Z"
}
]
}
这是上面 json 的示例输出:
filename, time, lat1, long1, lat2, long2, value
ASWDIR_S_...,2018-02-23T12:15:00Z,45.175,13.55, 45.2, 13.575,3.366295E-7
ASWDIR_S_...,2018-02-23T12:15:00Z,45.175,13.575, 45.2, 13.6,3.366295E-7
ASWDIR_S_...,2018-02-23T12:15:00Z,45.175,13.6, 45.2, 13.625,3.366295E-7
ASWDIR_S_...,2018-02-23T12:30:00Z,45.175,45.175, 13.55, 45.2,13.575,4.545918E-7
ASWDIR_S_...,2018-02-23T12:30:00Z,45.175,45.175,13.575,45.2,13.6,4.545918E-7
ASWDIR_S_...,2018-02-23T12:30:00Z,45.175,45.175,13.6,45.2,13.625,4.545918E-7
任何帮助将不胜感激。 亲切的问候,
【问题讨论】:
-
大概可以使用
explode函数,比如here -
您好,感谢您的评论。实际上这是我的问题,我使用的是嵌套数组爆炸。我希望有一个表达式来导航到最深层次,而无需创建中间 DataFrame。
-
你期望的输出模式是什么?
-
嗨@nabongs。我编辑了我的问题并添加了示例输入和输出。希望能帮助你帮助我;-)
标签: json scala csv apache-spark spark-dataframe