【问题标题】:How to convert json into RDD[json]如何将 json 转换为 RDD[json]
【发布时间】:2018-02-28 23:18:02
【问题描述】:

我想在 spark 中编写 json 对象,但是当我尝试使用 sc.parallelize 将其转换为 RDD 时,它再次将其转换回字符串

import scala.util.parsing.json._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.lit
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

val df = Seq((2012, 8, "Batman", 9.8), 
             (2012, 9, "Batman", 10.0), 
             (2012, 8, "Hero", 8.7),
             (2012, 10, "Hero", 5.7), 
             (2012, 2, "Robot", 5.5), 
             (2011, 7, "Git", 2.0),
             (2010, 1, "Dom", 2.0),
             (2019, 3, "Sri", 2.0)).toDF("year", "month", "title", "rating")

case class Rating(year:Int, month:Int, title:String, rating:Double)


import scala.collection.JavaConversions._
val ratingList = df.as[Rating].collectAsList

import java.io._
val output = for (c <- ratingList) yield
{
      val json = ("record" ->
              ("year" -> c.year) ~
              ("month" -> c.month) ~
              ("title" -> c.title) ~
              ("rating" -> c.rating))
      compact(render(json))
}

output.foreach(println)    

在这个阶段它是一个json对象,一切都很好。但是当我将其转换为 RDD 时,spark 将其视为字符串

val outputDF = sc.parallelize(output).toDF("json")
outputDF.show()
outputDF.write.mode("overwrite").json("s3://location/")

输出是:

{"test":{"json":"{\"record\":{\"year\":2012,\"month\":8,\"title\":\"Batman\",\"rating\":9.8}}"}}

【问题讨论】:

    标签: json scala apache-spark


    【解决方案1】:

    当您调用 compact 时 - 您从呈现的 json 中创建字符串。 见:

    scala> val json = ("name" -> "joe") ~ ("age" -> 35)
    scala> compact(render(json))
    res2: String = {"name":"joe","age":35}
    

    这意味着您的output 是字符串的集合。当你并行化它时 - 你会得到 RDD[String]。

    您可能希望返回 render 函数的结果以获取 JSON 对象的集合。但是你需要检查文档。

    当然,Spark 不知道如何使用 toDF() 函数将 JSON 对象从第三方库转换为 DataFrame。也许你可以这样做:

    val anotherPeopleRDD = sc.parallelize(
      """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
    val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
    

    所以基本上有 RDD[String] 然后将其读取为 JSON。

    顺便说一句

    你为什么要先这样做:

    val ratingList = df.as[Rating].collectAsList
    val output = for (c <- ratingList) yield
    {
          val json = ("record" ->
                  ("year" -> c.year) ~
                  ("month" -> c.month) ~
                  ("title" -> c.title) ~
                  ("rating" -> c.rating))
          compact(render(json))
    }
    

    然后:

    val outputDF = sc.parallelize(output).toDF("json")
    

    为什么不像这样处理集群中的所有数据:

    df.as[Rating].map{c =>
      val json = ("record" ->
        ("year" -> c.year) ~
          ("month" -> c.month) ~
          ("title" -> c.title) ~
          ("rating" -> c.rating))
      compact(render(json))
    }
    

    这样会更有效率。

    【讨论】:

    • 感谢 Vladislav,我遇到了问题,现在我可以并行化我的输出,但不能使用 toDF 使用 spark 将数据写入文件
    • 我已经扩展了我的答案。率和接受表示赞赏,顺便说一句:)
    • 非常感谢,你让我很开心。
    猜你喜欢
    • 2018-03-30
    • 2015-08-25
    • 2019-08-26
    • 1970-01-01
    • 1970-01-01
    • 2018-05-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多