【Question Title】:Extract a specific JSON structure from a JSON string in a Spark RDD - Scala
【Posted】:2017-10-19 15:34:03
【Question】:

I have a JSON string such as:

{"sequence":89,"id":8697344444103393,"trackingInfo":{"location":"Browse","row":0,"trackId":14170286,"listId":"cd7c2c7a-00f6-4035-867f-d1dd7d89972d_6625365X3XX1505943605585","videoId":80000778,"rank":0,"requestId":"ac12f4e1-5644-46af-87d1-ec3b92ce4896-4071171"},"type":["Play","Action","Session"],"time":527636408955},1],
{"sequence":155,"id":8697389381205360,"trackingInfo":{"location":"Browse","row":0,"trackId":14170286,"listId":"cd7c2c7a-00f6-4035-867f-d1dd7d89972d_6625365X3XX1505943605585","videoId":80000778,"rank":0,"requestId":"ac12f4e1-5644-46af-87d1-ec3b92ce4896-4071171"},"type":["Play","Action","Session"],"time":527637858607},1],
{"sequence":136,"id":8697374208897843,"trackingInfo":{"location":"Browse","row":0,"trackId":14170286,"listId":"cd7c2c7a-00f6-4035-867f-d1dd7d89972d_6625365X3XX1505943605585","videoId":80000778,"rank":0,"requestId":"ac12f4e1-5644-46af-87d1-ec3b92ce4896-4071171"},"type":["Play","Action","Session"],"time":527637405129},1],
{"sequence":189,"id":8697413135394406,"trackingInfo":{"row":0,"trackId":14272744,"requestId":"284929d9-6147-4924-a19f-4a308730354c-3348447","rank":0,"videoId":80075830,"location":"PostPlay\/Next"},"type":["Play","Action","Session"],"time":527638558756},1],
{"sequence":130,"id":8697373887446384,"trackingInfo":{"location":"Browse","row":0,"trackId":14170286,"listId":"cd7c2c7a-00f6-4035-867f-d1dd7d89972d_6625365X3XX1505943605585","videoId":80000778,"rank":0,"requestId":"ac12f4e1-5644-46af-87d1-ec3b92ce4896-4071171"},"type":["Play","Action","Session"],"time":527637394083}]

What is the best approach here? I tried:

val rdd = sc.parallelize(Seq(jsonString)).flatMap(_.split("}"))
val trackingRdd = rdd.filter(_.contains("trackingInfo"))

Sample output from this attempt:

,{"sequence":89,"id":8697344444103393,"trackingInfo":{"location":"Browse","row":0,"trackId":14170286,"listId":"cd7c2c7a-00f6-4035-867f-d1dd7d89972d_6625365X3XX1505943605585","videoId":80000778,"rank":0,"requestId":"ac12f4e1-5644-46af-87d1-ec3b92ce4896-4071171"

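As you can see, I get almost all the data I want, except for "type":["Play","Action","Session"],"time":527636408955},1], because I am splitting on }. The first } in each record closes the nested trackingInfo object, so everything after it ends up in a separate piece that the filter discards.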
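To make the failure concrete, here is a minimal plain-Scala sketch (no Spark needed) that contrasts the naive split with a splitter that tracks brace depth. The object name `BraceSplitSketch`, the helper `topLevelObjects`, and the shortened sample string with made-up values are all hypothetical, and the helper assumes that `{` and `}` never occur inside string values, which holds for the data shown in the question but not for JSON in general.

```scala
object BraceSplitSketch {

  /** Returns every top-level {...} object found in `text`.
    * Assumption: braces never appear inside JSON string values. */
  def topLevelObjects(text: String): Vector[String] = {
    val out = Vector.newBuilder[String]
    var depth = 0   // current nesting level of braces
    var start = -1  // index where the current top-level object began
    for (i <- text.indices) {
      text(i) match {
        case '{' =>
          if (depth == 0) start = i
          depth += 1
        case '}' if depth > 0 =>
          depth -= 1
          // Back at depth 0: a complete record has been closed.
          if (depth == 0) out += text.substring(start, i + 1)
        case _ => ()
      }
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    // Shortened, made-up sample in the same shape as the question's data.
    val sample =
      """{"sequence":89,"trackingInfo":{"row":0},"type":["Play"],"time":1},1],""" +
        """{"sequence":155,"trackingInfo":{"row":0},"type":["Play"],"time":2}]"""

    // Naive split: each kept piece stops inside trackingInfo, so "type" and "time" are lost.
    sample.split("}").filter(_.contains("trackingInfo")).foreach(println)

    // Depth-aware split: each record is kept whole.
    topLevelObjects(sample).foreach(println)
  }
}
```

With an RDD, the helper could presumably replace the split, e.g. `sc.parallelize(Seq(jsonString)).flatMap(BraceSplitSketch.topLevelObjects)`. A real JSON parser is still the more robust choice.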

Any help is appreciated.

【Question Discussion】:

    Tags: scala apache-spark apache-spark-sql spark-dataframe scala-collections


    【Solution 1】:

    We can read the data as structured JSON, for example:

    scala> val df=spark.read.json(sc.parallelize(Seq(jsonString))).select(explode(col("reverseDeltas"))).select(explode(col("col"))).map(_.getString(0)).filter(_.indexOf('{')>=0)
    warning: there was one deprecation warning; re-run with -deprecation for details
    df: org.apache.spark.sql.Dataset[String] = [value: string]
    
    scala> spark.read.json(df).filter(col("trackingInfo").isNotNull).select("trackingInfo").toJSON.show(false)
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |value                                                                                                                                                                                                                            |
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |{"trackingInfo":{"listId":"cd7c2c7a-00f6-4035-867f-d1dd7d89972d_6625365X3XX1505943605585","location":"Browse","rank":0,"requestId":"ac12f4e1-5644-46af-87d1-ec3b92ce4896-4071171","row":0,"trackId":14170286,"videoId":80000778}}|
    |{"trackingInfo":{"listId":"cd7c2c7a-00f6-4035-867f-d1dd7d89972d_6625365X3XX1505943605585","location":"Browse","rank":0,"requestId":"ac12f4e1-5644-46af-87d1-ec3b92ce4896-4071171","row":0,"trackId":14170286,"videoId":80000778}}|
    |{"trackingInfo":{"listId":"cd7c2c7a-00f6-4035-867f-d1dd7d89972d_6625365X3XX1505943605585","location":"Browse","rank":0,"requestId":"ac12f4e1-5644-46af-87d1-ec3b92ce4896-4071171","row":0,"trackId":14170286,"videoId":80000778}}|
    |{"trackingInfo":{"listId":"cd7c2c7a-00f6-4035-867f-d1dd7d89972d_6625365X3XX1505943605585","location":"Browse","rank":0,"requestId":"ac12f4e1-5644-46af-87d1-ec3b92ce4896-4071171","row":0,"trackId":14170286,"videoId":80000778}}|
    |{"trackingInfo":{"location":"PostPlay/Next","rank":0,"requestId":"284929d9-6147-4924-a19f-4a308730354c-3348447","row":0,"trackId":14272744,"videoId":80075830}}                                                                  |
    |{"trackingInfo":{"listId":"cd7c2c7a-00f6-4035-867f-d1dd7d89972d_6625365X3XX1505943605585","location":"Browse","rank":0,"requestId":"ac12f4e1-5644-46af-87d1-ec3b92ce4896-4071171","row":0,"trackId":14170286,"videoId":80000778}}|
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    
    
    

    【Discussion】:

    • Thanks for your answer. Is it possible to get the outer values surrounding the records that have trackingInfo? I need sequence, id, type (just the first element), and time.
    • Actually, I can get these with a simple filter after running the first line and keeping the result as an RDD, by doing df.filter(_.contains("trackingInfo")). A sample result record looks like this: {"sequence":89,"id":8697344444103393,"trackingInfo":{"location":"Browse","row":0,"trackId":14170286,"listId":"cd7c2c7a-00f6-4035-867f-d1dd7d89972d_6625365X3XX1505943605585","videoId":80000778,"rank":0,"requestId":"ac12f4e1-5644-46af-87d1-ec3b92ce4896-4071171"},"type":["Play","Action","Session"],"time":527636408955}
    • Would you mind explaining the first line of your code? sqlContext.read.json(sc.parallelize(Seq(jsonString))) .select(explode(col("reverseDeltas"))) .select(explode(col("col"))) .map(_.getString(0)) .filter(_.indexOf('{') >= 0)
    • Once the JSON data is loaded as a Dataset, we can query it with Spark SQL functions. FYI: spark.apache.org/docs/latest/api/java/org/apache/spark/sql/…
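As far as can be told from the code, the first line of the answer reads the whole string as JSON, explodes the nested `reverseDeltas` array twice so that each inner element becomes its own row, and keeps only the string elements that contain `{`, i.e. the per-record JSON objects. Starting from such per-record strings, the outer fields asked about in the comments could be selected along with `trackingInfo` roughly as sketched below. This is not the answerer's code: the object name `OuterFieldsSketch`, the method `extract`, the column alias `firstType`, and the second sample record are hypothetical, and the sketch assumes Spark 2.2 or later, where `spark.read.json` accepts a `Dataset[String]`.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.col

object OuterFieldsSketch {

  /** Parses one JSON record per string and keeps the outer fields
    * of the records that carry a trackingInfo object. */
  def extract(spark: SparkSession, records: Dataset[String]): DataFrame =
    spark.read.json(records)                    // schema is inferred from the records
      .filter(col("trackingInfo").isNotNull)    // drop records without trackingInfo
      .select(
        col("sequence"),
        col("id"),
        col("type").getItem(0).as("firstType"), // only the first element of "type"
        col("time"),
        col("trackingInfo"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("outer-fields-sketch")
      .getOrCreate()
    import spark.implicits._

    val records = Seq(
      // Record copied from the discussion above.
      """{"sequence":89,"id":8697344444103393,"trackingInfo":{"location":"Browse","row":0,"trackId":14170286,"listId":"cd7c2c7a-00f6-4035-867f-d1dd7d89972d_6625365X3XX1505943605585","videoId":80000778,"rank":0,"requestId":"ac12f4e1-5644-46af-87d1-ec3b92ce4896-4071171"},"type":["Play","Action","Session"],"time":527636408955}""",
      // Hypothetical record without trackingInfo, added only to exercise the filter.
      """{"sequence":1,"id":2,"type":["Other"],"time":3}"""
    ).toDS()

    extract(spark, records).show(false)
    spark.stop()
  }
}
```

Note that if no input record contains `trackingInfo` at all, the inferred schema has no such column and the filter fails at analysis time, so a fixed schema may be preferable for production use.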