【发布时间】:2016-08-12 02:54:48
【问题描述】:
我有一组带有嵌套键值对的大型压缩 json 文件。 json 对象中有大约 70-80 个键(和子键),但是,我只对几个键感兴趣。我想用 Spark SQL 查询 json 文件,只挑出我感兴趣的键值对,并将它们输出到一组 csv 文件中。处理一个大小为 170MB 的压缩 json 文件大约需要 5 分钟。我只是想知道是否有任何方法可以优化这个过程。或者有没有比 Spark 更好的工具来完成这种工作?谢谢!
这是我使用的 scala 代码的快照:
val data = sc.textFile("abcdefg.txt.gz")
// repartition the data
val distdata = data.repartition(10)
val dataDF = sqlContext.read.json(distdata)
// register a temp table
dataDF.registerTempTable("pixels")
// query the json file, grab columns of interest
val query =
"""
|SELECT col1, col2, col3, col4, col5
|FROM pixels
|WHERE col1 IN (col1_v1, col1_v2, ...)
""".stripMargin
val result = sqlContext.sql(query)
// reformat the timestamps
val result2 = result.map(
row => {
val timestamp = row.getAs[String](0).stripSuffix("Z").replace("T"," ")
Row(timestamp, row(1), row(2), row(3), row(4), row(5), row(6), row(7),
row(8), row(9), row(10), row(11))
}
)
// output the result to a csv and remove the square bracket in each row
val output_file = "/root/target"
result2.map(row => row.mkString(",")).saveAsTextFile(output_file)
【问题讨论】:
-
我猜大部分时间都在进行读取/解压缩和写入,这无法并行化。加上分配作业和收集结果的开销,我猜使用 Spark 会减慢你的速度。为什么
repartition的未解析行? -
如果您只想转换数据。您不需要所有 SparkSQL 功能。只要坚持RDD。使用像 PlayJson 这样的快速 json 库来解析 json。修改它并转储它。
-
除非明确要求,否则请不要对 RDD 进行重新分区。
标签: json scala apache-spark apache-spark-sql etl