【问题标题】:Flatten out nested Json Document in Spark2 with scala使用 scala 在 Spark2 中展平嵌套的 Json 文档
【发布时间】:2018-08-13 05:23:57
【问题描述】:

我正在尝试将 json 文件解析为 csv 文件。

结构有点复杂,我在scala中写了一个spark程序来完成这个任务。 就像文档每行不包含一个 json 对象一样,我决定按照我发现的一些答案和帖子中的建议使用 wholeTextFiles 方法。

val jsonRDD  = spark.sparkContext.wholeTextFiles(fileInPath).map(x => x._2)

然后我读取数据框中的 json 内容

val dwdJson = spark.read.json(jsonRDD)

然后我想导航 json 并展平数据。 这是来自 dwdJson 的架构

root
 |-- meta: struct (nullable = true)
 |    |-- dimensions: struct (nullable = true)
 |    |    |-- lat: long (nullable = true)
 |    |    |-- lon: long (nullable = true)
 |    |-- directory: string (nullable = true)
 |    |-- filename: string (nullable = true)
 |-- records: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- grids: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- gPt: array (nullable = true)
 |    |    |    |    |    |-- element: double (containsNull = true)
 |    |    |-- time: string (nullable = true)

这是我最好的方法:

val dwdJson_e1 = dwdJson.select($"meta.filename", explode($"records").as("records_flat"))
val dwdJson_e2 = dwdJson_e1.select($"filename", $"records_flat.time",explode($"records_flat.grids").as("gPt"))
val dwdJson_e3 = dwdJson_e2.select($"filename", $"time", $"gPt.gPt")
val dwdJson_flat = dwdJson_e3.select($"filename"
      ,$"time"
      ,$"gPt".getItem(0).as("lat1")
      ,$"gPt".getItem(1).as("long1")
      ,$"gPt".getItem(2).as("lat2")
      ,$"gPt".getItem(3).as("long2")
      ,$"gPt".getItem(4).as("value"))

我是 Scala 新手,我想知道是否可以避免创建似乎效率低下且程序运行非常缓慢的中间数据帧(dwdJson_e1、dwdJson_e2、dwdJson_e3)(与在笔记本电脑上运行的 java 解析器相比)。

另一方面,我找不到解绑这些嵌套数组的方法。

spark 版本: 2.0.0 斯卡拉: 2.11.8 java: 1.8

**

编辑 1:示例 Json 文件和 csv 输出

**

这是我要转换的示例 Json 文件:

{
  "meta" : {
    "directory" : "weather/cosmo/de/grib/12/aswdir_s",
    "filename" : "COSMODE_single_level_elements_ASWDIR_S_2018022312_000.grib2.bz2",
    "dimensions" : {
      "lon" : 589,
      "time" : 3,
      "lat" : 441
    }
   },
  "records" : [ {
    "grids" : [ {
      "gPt" : [ 45.175, 13.55, 45.2, 13.575, 3.366295E-7 ]
    }, {
      "gPt" : [ 45.175, 13.575, 45.2, 13.6, 3.366295E-7 ]
    }, {
      "gPt" : [ 45.175, 13.6, 45.2, 13.625, 3.366295E-7 ]
    } ],
    "time" : "2018-02-23T12:15:00Z"
  }, {
    "grids" : [ {
      "gPt" : [ 45.175, 13.55, 45.2, 13.575, 4.545918E-7 ]
    }, {
      "gPt" : [ 45.175, 13.575, 45.2, 13.6, 4.545918E-7 ]
    }, {
      "gPt" : [ 45.175, 13.6, 45.2, 13.625, 4.545918E-7 ]
    }
    ],
    "time" : "2018-02-23T12:30:00Z"
    }
    ]
}

这是上面 json 的示例输出:

filename, time, lat1, long1, lat2, long2, value
ASWDIR_S_...,2018-02-23T12:15:00Z,45.175,13.55, 45.2, 13.575,3.366295E-7
ASWDIR_S_...,2018-02-23T12:15:00Z,45.175,13.575, 45.2, 13.6,3.366295E-7
ASWDIR_S_...,2018-02-23T12:15:00Z,45.175,13.6, 45.2, 13.625,3.366295E-7
ASWDIR_S_...,2018-02-23T12:30:00Z,45.175,45.175, 13.55, 45.2,13.575,4.545918E-7
ASWDIR_S_...,2018-02-23T12:30:00Z,45.175,45.175,13.575,45.2,13.6,4.545918E-7
ASWDIR_S_...,2018-02-23T12:30:00Z,45.175,45.175,13.6,45.2,13.625,4.545918E-7

任何帮助将不胜感激。 亲切的问候,

【问题讨论】:

  • 大概可以使用explode函数,比如here
  • 您好,感谢您的评论。实际上这是我的问题,我使用的是嵌套数组爆炸。我希望有一个表达式来导航到最深层次,而无需创建中间 DataFrame。
  • 你期望的输出模式是什么?
  • 嗨@nabongs。我编辑了我的问题并添加了示例输入和输出。希望能帮助你帮助我;-)

标签: json scala csv apache-spark spark-dataframe


【解决方案1】:

你可以试试下面的代码。它对我来说适用于复杂的 json 文档

def flattenDataframe(df: DataFrame): DataFrame = {

val fields = df.schema.fields
val fieldNames = fields.map(x => x.name)
val length = fields.length

for(i <- 0 to fields.length-1){
  val field = fields(i)
  val fieldtype = field.dataType
  val fieldName = field.name
  fieldtype match {
    case arrayType: ArrayType =>
      val fieldNamesExcludingArray = fieldNames.filter(_!=fieldName)
      val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode_outer($fieldName) as $fieldName")
     // val fieldNamesToSelect = (fieldNamesExcludingArray ++ Array(s"$fieldName.*"))
      val explodedDf = df.selectExpr(fieldNamesAndExplode:_*)
      return flattenDataframe(explodedDf)
    case structType: StructType =>
      val childFieldnames = structType.fieldNames.map(childname => fieldName +"."+childname)
      val newfieldNames = fieldNames.filter(_!= fieldName) ++ childFieldnames
      val renamedcols = newfieldNames.map(x => (col(x.toString()).as(x.toString().replace(".", "_"))))
     val explodedf = df.select(renamedcols:_*)
      return flattenDataframe(explodedf)
    case _ =>
  }
}
df

}

【讨论】:

    【解决方案2】:

    我认为您的方法完全正确。 关于avoid create the intermediate dataframes,您实际上可以连续编写语句而无需将其分解为中间数据帧,例如

     val df = dwdJson.select($"meta.filename", explode($"records").as("record")).
        select($"filename", $"record.time", explode($"record.grids").as("grids")).
        select($"filename", $"time", $"grids.gpt").
        select($"filename", $"time", 
                  $"gpt"(0).as("lat1"), 
                  $"gpt"(1).as("long1"), 
                  $"gpt"(2).as("lat2"),
                  $"gpt"(3).as("long2"), 
                  $"gpt"(4).as("value"))
    

    我对性能问题有所考虑。 Spark 在内部使用 Jackson lib 来解析 json,它必须通过对输入的记录进行采样来干扰模式本身(默认采样率为 1.0,即所有记录)。所以,如果你有大的输入、大的文件(wholeTextFiles 操作)和复杂的架构,它会影响 Spark 程序的性能。

    【讨论】:

    • 非常感谢您提供的有用答案。我正在将此 spark 解析器与同事使用 jackson 编写的 java 解析器进行比较。他在本地将 java 解析器作为常规 java 应用程序运行,解析 350 个文件大约需要 8 秒。对于相同的输入,我的 spark 解析器需要大约 5 分钟。我认为存在一些并行化问题,因为我在提交作业时使用“--master local [8]”与使用“--master local [1]”具有相同的性能。
    猜你喜欢
    • 1970-01-01
    • 2022-01-13
    • 1970-01-01
    • 2015-11-12
    • 1970-01-01
    • 2018-10-28
    • 2020-03-08
    • 2016-09-29
    相关资源
    最近更新 更多