【问题标题】:explode json array in schema rdd在模式rdd中爆炸json数组
【发布时间】:2015-04-27 17:25:23
【问题描述】:

我有一个像这样的json:

{"name":"Yin", "address":[{"city":"Columbus","state":"Ohio"},{"city":"Columbus","state":"Ohio"}]} 
{"name":"Michael", "address":[{"city":null, "state":"California"},{"city":null, "state":"California"}]}

这里的地址是一个数组,如果我使用sqlContext.jsonfile,我会在模式 rdd 中获取数据,如下所示:

[Yin , [(Columbus , Ohio) , (Columbus , Ohio)] 
[Micheal , [(null, California) , (null, California)] 

我想分解存在的数组,并希望模式 rdd 中的数据格式如下:

[Yin, Columbus, Ohio] 
[Yin, Columbus, Ohio] 
[Micheal, null, California] 
[Micheal, null, California]

我正在使用 Spark SQL

【问题讨论】:

    标签: json scala apache-spark-sql


    【解决方案1】:

    典型的建议是为此退出 sql,但如果您想留在 SQL 中,这是我在邮件列表中询问此问题时得到的答案(由于某种原因,nabble 未显示响应):

    来自迈克尔·阿姆布鲁斯特

    您可以使用横向视图爆炸(使用HiveContext),但似乎缺少的是 jsonRDD 将 json 对象转换为结构(具有固定顺序的固定键)并且使用 @ 访问结构中的字段987654322@

    val myJson = sqlContext.jsonRDD(sc.parallelize("""{"foo":[{"bar":1},{"baz":2}]}""" :: Nil))
    myJson.registerTempTable("JsonTest")​
    val result = sql("SELECT f.bar FROM JsonTest LATERAL VIEW explode(foo) a AS f").collect()
    
    myJson: org.apache.spark.sql.DataFrame = [foo: array<struct<bar:bigint,baz:bigint>>]
    result: Array[org.apache.spark.sql.Row] = Array([1], [null])
    

    在 Spark 1.3 中,您还可以通过手动指定 JSON 的架构来向 jsonRDD 提示您希望将 json 对象转换为 Maps(非统一键)而不是结构。

    import org.apache.spark.sql.types._
    val schema =
      StructType(
        StructField("foo", ArrayType(MapType(StringType, IntegerType))) :: Nil)
    ​
    sqlContext.jsonRDD(sc.parallelize("""{"foo":[{"bar":1},{"baz":2}]}""" :: Nil), schema).registerTempTable("jsonTest")
    ​
    val withSql = sql("SELECT a FROM jsonTest LATERAL VIEW explode(foo) a AS a WHERE a['bar'] IS NOT NULL").collect()
    ​
    val withSpark = sql("SELECT a FROM jsonTest LATERAL VIEW explode(foo) a AS a").rdd.filter  {
      case Row(a: Map[String, Int]) if a.contains("bar") => true
      case _: Row => false
    }.collect()
    schema: org.apache.spark.sql.types.StructType = StructType(StructField(foo,ArrayType(MapType(StringType,IntegerType,true),true),true))
    withSql: Array[org.apache.spark.sql.Row] = Array([Map(bar -> 1)])
    withSpark: Array[org.apache.spark.sql.Row] = Array([Map(bar -> 1)])
    

    【讨论】:

    • 嘿贾斯汀,我尝试了你的解决方案,它给了我以下错误:线程“main”中的异常 java.lang.RuntimeException:[1.36] 失败:预期为“UNION”,但发现标识符 VIEW SELECT f .bar FROM JsonTest LATERAL VIEW explode(foo) a AS f ^
    • P.S.,我使用的是 spark 1.2 版,而您使用的是 spark 1.3 中引入的 DataFrame
    • 您需要使用 HiveContext 才能使用 LATERAL VIEW
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-02-18
    • 2017-08-08
    • 2019-07-26
    • 1970-01-01
    • 1970-01-01
    • 2016-11-09
    • 1970-01-01
    相关资源
    最近更新 更多