【问题标题】:Spark 2.0.1: split JSON Array Column into ArrayType(StringType)Spark 2.0.1:将 JSON 数组列拆分为 ArrayType(StringType)
【发布时间】:2016-12-20 15:46:04
【问题描述】:

我有一个这样的数据框

root
 |-- sum_id: long (nullable = true)
 |-- json: string (nullable = true)

+-------+------------------------------+
|sum_id |json                          |
+-------+------------------------------+
|8124455|[{"itemId":11},{"itemId":12}] |
|8124457|[{"itemId":53}]               |
|8124458|[{"itemId":11},{"itemId":33}] |
+-------+------------------------------+

我想用 Scala 来实现这一点

root
 |-- sum_id: long (nullable = true)
 |-- itemId: int(nullable = true)

+-------+--------+
|sum_id |itemId  |
+-------+--------+
|8124455|11      |
|8124455|12      |
|8124457|53      |
|8124458|11      |
|8124458|33      |
+-------+--------+

我尝试了什么:

  1. 使用get_json_object,但是列是JSON对象数组,所以我认为应该先分解成对象,但是怎么做呢?

  2. 尝试将列 jsonStringType 转换为 ArrayType(StringType),但出现 data type mismatch 异常。

请指导我如何解决这个问题。

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    以下代码将准确地完成您的工作。

    val toItemArr = udf((jsonArrStr:String) => {
          jsonArrStr.replace("[","").replace("]","").split(",")
       })
    
    inputDataFrame.withColumn("itemId",explode(toItemArr(get_json_object(col("json"),"$[*].itemId")))).drop("json").show
    
    
    +-------+------+
    |     id|itemId|
    +-------+------+ 
    |8124455|    11|
    |8124455|    12|
    |8124457|    53|
    |8124458|    11|
    |8124458|    33|
    +-------+------+
    

    【讨论】:

    • 再问一个问题,@SanthoshPrasad,如果我的 json 元素有多个属性(例如,bought_time)并且我也想得到它。我该怎么做?
    • 您可以使用相同的函数将信息放入另一列,(例如 get_json_object(col("json"),"$[*].built_time")) 通过提供属性 name ,或者如果您愿意,您可以获取两个属性值并将它们连接/合并到单个列中。
    【解决方案2】:

    因为您使用的是 Json ,所以这可能是最好的方法:

    请看一下:

    import org.apache.spark._
    import com.fasterxml.jackson.module.scala.DefaultScalaModule
    import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.databind.DeserializationFeature
    
    val df = sc.parallelize(Seq((8124455,"""[{"itemId":11},{"itemId":12}]"""),(8124457,"""[{"itemId":53}]"""),(8124458,"""[{"itemId":11},{"itemId":33}]"""))).toDF("sum_id","json")
    val result = df.rdd.mapPartitions(records => {
            val mapper = new ObjectMapper with ScalaObjectMapper
            mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
            mapper.registerModule(DefaultScalaModule)
          val values=records.flatMap(record => {
              try {
                Some((record.getInt(0),mapper.readValue(record.getString(1), classOf[List[Map[String,Int]]]).map(_.map(_._2).toList).flatten))
              } catch {
                case e: Exception => None
              }
            })
    values.flatMap(listOfList=>listOfList._2.map(a=>(listOfList._1,a)))
        }, true)
    
    result.toDF.show()
    

    【讨论】:

      猜你喜欢
      • 2017-02-20
      • 1970-01-01
      • 2019-12-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-12-06
      • 1970-01-01
      • 2020-04-24
      相关资源
      最近更新 更多