【问题标题】:fast way to process json file in Spark在 Spark 中处理 json 文件的快速方法
【发布时间】:2016-08-12 02:54:48
【问题描述】:

我有一组带有嵌套键值对的大型压缩 json 文件。 json 对象中有大约 70-80 个键(和子键),但是,我只对几个键感兴趣。我想用 Spark SQL 查询 json 文件,只挑出我感兴趣的键值对,并将它们输出到一组 csv 文件中。处理一个大小为 170MB 的压缩 json 文件大约需要 5 分钟。我只是想知道是否有任何方法可以优化这个过程。或者有没有比 Spark 更好的工具来完成这种工作?谢谢!

这是我使用的 scala 代码的快照:

val data = sc.textFile("abcdefg.txt.gz")
// repartition the data
val distdata = data.repartition(10)
val dataDF = sqlContext.read.json(distdata)
// register a temp table
dataDF.registerTempTable("pixels")

// query the json file, grab columns of interest
val query =
"""
  |SELECT col1, col2, col3, col4, col5
  |FROM pixels
  |WHERE col1 IN (col1_v1, col1_v2, ...)
""".stripMargin
val result = sqlContext.sql(query)

// reformat the timestamps
val result2 = result.map(
  row => {
    val timestamp = row.getAs[String](0).stripSuffix("Z").replace("T"," ")
    Row(timestamp, row(1), row(2), row(3), row(4), row(5), row(6), row(7),
      row(8), row(9), row(10), row(11))
  }
)
// output the result to a csv and remove the square bracket in each row
val output_file = "/root/target"
result2.map(row => row.mkString(",")).saveAsTextFile(output_file)

【问题讨论】:

  • 我猜大部分时间都在进行读取/解压缩和写入,这无法并行化。加上分配作业和收集结果的开销,我猜使用 Spark 会减慢你的速度。为什么repartition 的未解析行?
  • 如果您只想转换数据。您不需要所有 SparkSQL 功能。只要坚持RDD。使用像 PlayJson 这样的快速 json 库来解析 json。修改它并转储它。
  • 除非明确要求,否则请不要对 RDD 进行重新分区。

标签: json scala apache-spark apache-spark-sql etl


【解决方案1】:

假设您的 json 数据如下所示,

{ "c1": "timestamp_1", "c2": "12", "c3": "13", "c": "14", "c5": "15", ... }
{ "c1": "timestamp_1", "c2": "22", "c3": "23", "c": "24", "c5": "25", ... }
{ "c1": "timestamp_1", "c2": "32", "c3": "33", "c": "34", "c5": "35", ... }

现在,您可以使用 json 库和 RDD 来进行转换转储。

import play.api.libs.json._

val data = sc.textFile("abcdefg.txt.gz")

val jsonData = data.map(line => Json.parse(line))

// filter the rdd and just keep the values of interest
val filteredData = data
  .filter(json => {
    val c1 = (json \ "c1").as[String]
    List[String]("c1_val1", "c2_val2", ...).contains(c1)
  })

  // reformat the timestamps and transform to tuple
val result2 = filteredData
  .map(json => {
    val ts = (json \ "c1").as[String]
    val tsFormated =  ts.stripSuffix("Z").replace("T"," ")
    (tsFormated, (json \ "c2").as[String], ...)
  })

val output_file = "/root/target"

result2.saveAsTextFile(output_file)

【讨论】:

    【解决方案2】:

    处理json的简单方法:

            val path = "examples/src/main/resources/people.json"
            val peopleDF = spark.read.json(path)
    
            peopleDF.printSchema()
    
            peopleDF.createOrReplaceTempView("people")
    
            val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19") teenagerNamesDF.show()
    
            val otherPeopleRDD = spark.sparkContext.makeRDD(   """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val otherPeople = spark.read.json(otherPeopleRDD) otherPeople.show()
    

    见文档:http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets

    【讨论】:

    • 这个答案有帮助吗?
    猜你喜欢
    • 1970-01-01
    • 2023-03-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-07-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多