【问题标题】:Explode Deeply Nested JSON returning duplicates in Spark Scala（在 Spark Scala 中展开（explode）深度嵌套的 JSON 时返回重复行）
【发布时间】:2019-07-04 09:45:14
【问题描述】:

我有一个实用程序，可以很好地解析简单的 JSON，但如果 JSON 中存在多个 array[struct] 类型的字段，就会产生交叉连接（笛卡尔积）

我也尝试过 distinct() 或 dropDuplicates() 来删除由于我在代码中包含的交叉连接而发生的重复项,但那会返回空 DF..

def flattenDataFrame(df: DataFrame): DataFrame = {

// Recursively flattens a nested DataFrame: simple (scalar) columns are kept
// as-is, array columns are exploded, and the result is re-flattened until no
// nested types remain.
var flattenedDf: DataFrame = df
if (isNested(df)) {
  // flattenSchema tags each produced column: true = complex (an array wrapped
  // in explode_outer), false = simple leaf column aliased with underscores.
  val flattenedSchema: Array[(Column, Boolean)] = flattenSchema(df.schema)
  var simpleColumns: List[Column] = List.empty[Column]
  var complexColumns: List[Column] = List.empty[Column]

  // Partition the flattened columns into simple vs. complex buckets.
  flattenedSchema.foreach {
    case (col, isComplex) => {
      if (isComplex) {
        complexColumns = complexColumns :+ col
      } else {
        simpleColumns = simpleColumns :+ col
      }
    }
  }

  // BUG (the duplicates reported in the question): each exploded array is
  // cross-joined against the accumulated result, so rows are combined across
  // ALL source rows instead of only within the row the array came from —
  // `df.select(col)` is an independent projection that has lost the row
  // alignment with `crossJoinedDataFrame`. With N source rows, every exploded
  // element is paired with every other row, producing a Cartesian product.
  // The fix is to explode each array on the SAME DataFrame (one generator per
  // select pass), not to join two independent selects.
  var crossJoinedDataFrame = df.select(simpleColumns: _*)
  complexColumns.foreach(col => {
    crossJoinedDataFrame = crossJoinedDataFrame.crossJoin(df.select(col))
    crossJoinedDataFrame = flattenDataFrame(crossJoinedDataFrame)
  })
  crossJoinedDataFrame
} else {
  // Nothing nested: return the input unchanged.
  flattenedDf
}
  }

/**
 * Recursively flattens a (possibly nested) schema into column expressions,
 * each tagged with whether it is "complex".
 *
 * @param schema the struct to flatten
 * @param prefix dotted path of the enclosing struct; null at the top level
 * @return (column, isComplex) pairs:
 *         - arrays  -> explode_outer(column) aliased with dots replaced by
 *           underscores, tagged true (needs another flatten pass);
 *         - structs -> recursed into, contributing their leaves;
 *         - leaves  -> the column aliased with underscores (with an "encoding"
 *           metadata hint), tagged false.
 */
private def flattenSchema(schema: StructType, prefix: String = null): Array[(Column, Boolean)] = {

  schema.fields.flatMap { field =>
    // Dotted path of this field relative to the root.
    val columnName = if (prefix == null) field.name else prefix + "." + field.name
    field.dataType match {
      case _: ArrayType =>
        // Arrays get exploded; the alias flattens the dotted path.
        Array((explode_outer(col(columnName)).as(columnName.replace(".", "_")), true))
      case structType: StructType =>
        // Recurse, carrying the dotted prefix down.
        flattenSchema(structType, columnName)
      case _ =>
        val columnNameWithUnderscores = columnName.replace(".", "_")
        val metadata = new MetadataBuilder().putString("encoding", "ZSTD").build()
        Array((col(columnName).as(columnNameWithUnderscores, metadata), false))
    }
  }
  // Fix: the original ended with `.filter(field => field != None)`. The
  // elements are (Column, Boolean) tuples, which are never equal to None,
  // so the filter was dead code and has been removed.
}

/** Returns true when any top-level field of the DataFrame's schema is a
  * nested type (array, map, or struct); false for purely flat schemas. */
def isNested(df: DataFrame): Boolean =
  df.schema.fields.exists { field =>
    field.dataType match {
      case _: ArrayType | _: MapType | _: StructType => true
      case _                                         => false
    }
  }

我遇到问题的 JSON 示例:

[
    {
        "id": "0001",
        "type": "donut",
        "name": "Cake",
        "ppu": 0.55,
        "batters":
            {
                "batter":
                    [
                        { "id": "1001", "type": "Regular" },
                        { "id": "1002", "type": "Chocolate" },
                        { "id": "1003", "type": "Blueberry" },
                        { "id": "1004", "type": "Devil's Food" }
                    ]
            },
        "topping":
            [
                { "id": "5001", "type": "None" },
                { "id": "5002", "type": "Glazed" },
                { "id": "5005", "type": "Sugar" },
                { "id": "5007", "type": "Powdered Sugar" },
                { "id": "5006", "type": "Chocolate with Sprinkles" },
                { "id": "5003", "type": "Chocolate" },
                { "id": "5004", "type": "Maple" }
            ]
    },
    {
        "id": "0002",
        "type": "donut",
        "name": "Raised",
        "ppu": 0.55,
        "batters":
            {
                "batter":
                    [
                        { "id": "1001", "type": "Regular" }
                    ]
            },
        "topping":
            [
                { "id": "5001", "type": "None" },
                { "id": "5002", "type": "Glazed" },
                { "id": "5005", "type": "Sugar" },
                { "id": "5003", "type": "Chocolate" },
                { "id": "5004", "type": "Maple" }
            ]
    }
]

【问题讨论】:

    标签: json scala apache-spark explode


    【解决方案1】:

    没有连接的解决方案,除此之外,没有交叉连接,这是你的问题:

    很抱歉格式有问题，没法在 Stack Overflow 上把这段代码正确排版

    def flattenDataFrame(df: DataFrame): DataFrame = {
    // Join-free flattening: instead of cross-joining each exploded array (the
    // questioner's bug), every array is exploded in its own select pass over
    // the SAME DataFrame, so exploded rows stay aligned with their source row.
    val flattenedDf: DataFrame = df

    if (isNested(df)) {
      val flattenedSchema: Array[(Column, Boolean)] = flattenSchema(flattenedDf.schema)

      var simpleColumns: List[Column] = List.empty[Column]
      var complexColumns: List[Column] = List.empty[Column]

      // Partition: isComplex = true marks array columns needing an explode.
      flattenedSchema.foreach {
        case (col, isComplex) =>
          if (isComplex) {
            complexColumns = complexColumns :+ col
          } else {
            simpleColumns = simpleColumns :+ col
          }
      }

      // For each complex column, precompute:
      //   _1 backtick-free name, _2 pass-through projection, _3 explode projection.
      // NOTE(review): the cast assumes flattenSchema emitted a bare column
      // reference (UnresolvedAttribute) for arrays, not a wrapped expression —
      // confirm against the matching flattenSchema implementation.
      val complexUnderlyingCols = complexColumns.map { column =>
        val name = column.expr.asInstanceOf[UnresolvedAttribute].name
        val unquotedColName = s"${name.replaceAll("`","")}"
        val explodeSelectColName = s"`${name.replaceAll("`","")}`"
        (unquotedColName, col(name).as(unquotedColName), explode_outer(col(explodeSelectColName)).as(unquotedColName))
      }

      // Start with the simple columns plus the (not yet exploded) arrays.
      var joinDataFrame = flattenedDf.select(simpleColumns ++ complexUnderlyingCols.map(_._2): _*)

      // One select per array: Spark allows only one generator (explode) per
      // select, so each pass replaces exactly one array column with its
      // exploded form while carrying every other column through unchanged.
      complexUnderlyingCols.foreach { case (name, tempCol, column) =>
        val nonTransformedColumns = joinDataFrame.schema.fieldNames.diff(List(name)).map(fieldName => s"`${fieldName.replaceAll("`", "")}`").map(col)
        joinDataFrame = joinDataFrame.select(nonTransformedColumns :+ column :_*)
      }
      // Recurse: exploded structs/arrays may expose further nesting.
      flattenDataFrame(joinDataFrame)
    } else {
      flattenedDf
    }


    }

    /**
     * Recursively flattens a nested schema into column expressions tagged with
     * whether they are complex (array) columns.
     *
     * NOTE(review): the original text of this block was garbled by machine
     * translation (Scala keywords rendered as Chinese, backticks stripped by
     * the site's formatter, whole body collapsed onto one line). The code
     * below is a reconstruction of the apparent intent and should be checked
     * against the original answer.
     *
     * @param schema the struct to flatten
     * @param prefix dotted path of the enclosing struct; null at the top level
     * @param level  nesting depth; 0 at the top level
     * @return (column, isComplex) pairs; isComplex = true marks array columns
     *         that still need an explode pass in flattenDataFrame
     */
    private def flattenSchema(schema: StructType, prefix: String = null, level: Int = 0): Array[(Column, Boolean)] = {
      val unquotedPrefix = if (prefix != null) prefix.replaceAll("`", "") else null
      schema.fields.flatMap { field =>
        val fieldName = field.name
        // Dotted path of this field relative to the root of the schema.
        val columnName = if (level == 0) fieldName else s"$unquotedPrefix.$fieldName"
        val unquotedColumnName = columnName.replaceAll("`", "")
        field.dataType match {
          case _: ArrayType =>
            // Pass the raw column through untouched; the explode function is
            // generated later, when flattenDataFrame unwraps the DataFrame.
            Array((col(columnName), true))
          case structType: StructType =>
            flattenSchema(structType, columnName, level + 1)
          case _ =>
            val metadata = new MetadataBuilder().putString("encoding", "ZSTD").build()
            Array((col(columnName).as(unquotedColumnName, metadata), false))
        }
      }
    }

    /**
     * True when any top-level field of the DataFrame is a nested type
     * (array, map, or struct).
     *
     * Fix: the original used `Array(x = true)` — `Array.apply`'s vararg
     * parameter is named `xs`, so `x = true` is an unresolved assignment and
     * does not compile. A plain `exists` over the fields is sufficient.
     */
    def isNested(df: DataFrame): Boolean =
      df.schema.fields.exists { field =>
        field.dataType match {
          case _: ArrayType | _: MapType | _: StructType => true
          case _ => false
        }
      }

    【讨论】:

      猜你喜欢
      • 2022-07-10
      • 2016-11-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-01-16
      • 2017-08-08
      • 2017-01-09
      • 2018-06-05
      相关资源
      最近更新 更多