【问题标题】:How to Convert Spark RDD into JSON using Scala Language如何使用 Scala 语言将 Spark RDD 转换为 JSON
【发布时间】:2019-08-01 05:32:27
【问题描述】:

我正在使用 MongoDB Spark 连接器 来获取集合。目的是我们想要返回集合中存在的所有文档。我们希望将所有这些文档作为 JSON 文档数组返回。

我能够获取该集合,但我不确定如何将包含文档列表的 customRDD 对象转换为 JSON 格式。正如您在代码中看到的那样,我可以转换第一个文档,但是如何转换从集合中读取的所有文档,然后生成一条 JSON 消息并发送它。

预期输出:

这可以是文档数组。

{
   "objects":[
      {
         ...
      },
      {
         ....
      }
   ]
} 

现有代码:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import com.mongodb.spark.config._
import com.mongodb.spark._
import org.json4s.native.JsonMethods._
import org.json4s.JsonDSL.WithDouble._

 var conf = new SparkConf()
    conf.setAppName("MongoSparkConnectorIntro")
      .setMaster("local")
      .set("spark.hadoop.validateOutputSpecs", "false")
      .set("spark.mongodb.input.uri","mongodb://127.0.0.1/mystore.mycollection?readPreference=primaryPreferred")
      .set("spark.mongodb.output.uri","mongodb://127.0.0.1/mystore.mycollection?readPreference=primaryPreferred")

    sc = new SparkContext(conf)
    val spark = SparkSession.builder().master("spark://192.168.137.103:7077").appName("MongoSparkConnectorIntro").config("spark.mongodb.input.uri", "mongodb://127.0.0.1/mystore.mycollection?readPreference=primaryPreferred").config("spark.mongodb.output.uri", "mongodb://127.0.0.1/mystore.mycollection?readPreference=primaryPreferred").getOrCreate()


    //val readConfig = ReadConfig(Map("collection" -> "metadata_collection", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
    val readConfig = ReadConfig(Map("uri" -> "mongodb://127.0.0.1/mystore.mycollection?readPreference=primaryPreferred"))
    val customRdd = MongoSpark.load(sc, readConfig)

    //println("Before Printing the value" + customRdd.toString())
    println("The Count: "+customRdd.count)
    println("The First Document: " + customRdd.first.toString())

    val resultJSOn = "MetaDataFinalResponse" -> customRdd.collect().toList

    val stringResponse = customRdd.first().toJson()
    println("Final Response: " +stringResponse)
    return stringResponse

注意:

我不想进一步将 JSON 文档映射到另一个模型。我希望他们保持现状。我只想将它们聚合到一条 JSON 消息中。

Spark 版本: 2.4.0

SBT 文件:

name := "Test"

version := "0.1"

scalaVersion := "2.12.8"

libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.7.0"
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"

【问题讨论】:

    标签: json scala apache-spark apache-spark-sql


    【解决方案1】:

    这个答案生成没有转义字符的json字符串,效率更高,但是你需要收集RDD来执行这个(你可以从我之前的答案中删除代码);

    // We will create a new Document with the documents that are fetched from MongoDB
    import scala.collection.JavaConverters._
    import org.bson.Document
    // Collect customRdd and convert to java array 
    // (we can only create new Document with java collections)
    val documents = customRdd.collect().toSeq.asJava
    // Create new document with the field name you want
    val stringResponse = new Document().append("objects", documents).toJson()
    

    【讨论】:

    • 感谢您发布答案。我尝试了这个并且它有效,但是我在消息中使用每个键值“\”获取转义字符。我认为这是因为序列化。你能告诉我如何避免这种情况吗?因为我不想在最后的消息中出现转义字符。
    • 喜欢 ({ \"date\" : \"27-04-2019\", \"sourceAddress\" : \"65.32.25.10\"...) 这就是我得到的JSON。如何删除转义字符
    • 嗨@OldWolfs 感谢您再次更新答案。你正在创建新文档的最后一行,scala 无法识别“文档”我是否需要在项目中包含任何其他库。
    • 嗨@omerkhalid 我已经使用maven 创建了你的项目,它可能有一些依赖差异。文档类来自 mongo-java-driver 包,因此您可以将以下依赖项添加到 sbt 并尝试 "org.mongodb" % "mongodb-java-driver" % "3.9.0"
    • 兄弟你确定没有其他依赖,我添加了你提到的但它仍然没有加载文档包。
    猜你喜欢
    • 1970-01-01
    • 2018-03-05
    • 2017-06-13
    • 1970-01-01
    • 1970-01-01
    • 2016-04-13
    • 2015-02-27
    • 1970-01-01
    • 2017-12-29
    相关资源
    最近更新 更多