【发布时间】:2019-08-01 05:32:27
【问题描述】:
我正在使用 MongoDB Spark 连接器 来获取集合。目的是我们想要返回集合中存在的所有文档。我们希望将所有这些文档作为 JSON 文档数组返回。
我能够获取该集合,但我不确定如何将包含文档列表的 customRDD 对象转换为 JSON 格式。正如您在代码中看到的那样,我可以转换第一个文档,但是如何转换从集合中读取的所有文档,然后生成一条 JSON 消息并发送它。
预期输出:
这可以是文档数组。
{
"objects":[
{
...
},
{
....
}
]
}
现有代码:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import com.mongodb.spark.config._
import com.mongodb.spark._
import org.json4s.native.JsonMethods._
import org.json4s.JsonDSL.WithDouble._
var conf = new SparkConf()
conf.setAppName("MongoSparkConnectorIntro")
.setMaster("local")
.set("spark.hadoop.validateOutputSpecs", "false")
.set("spark.mongodb.input.uri","mongodb://127.0.0.1/mystore.mycollection?readPreference=primaryPreferred")
.set("spark.mongodb.output.uri","mongodb://127.0.0.1/mystore.mycollection?readPreference=primaryPreferred")
sc = new SparkContext(conf)
val spark = SparkSession.builder().master("spark://192.168.137.103:7077").appName("MongoSparkConnectorIntro").config("spark.mongodb.input.uri", "mongodb://127.0.0.1/mystore.mycollection?readPreference=primaryPreferred").config("spark.mongodb.output.uri", "mongodb://127.0.0.1/mystore.mycollection?readPreference=primaryPreferred").getOrCreate()
//val readConfig = ReadConfig(Map("collection" -> "metadata_collection", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val readConfig = ReadConfig(Map("uri" -> "mongodb://127.0.0.1/mystore.mycollection?readPreference=primaryPreferred"))
val customRdd = MongoSpark.load(sc, readConfig)
//println("Before Printing the value" + customRdd.toString())
println("The Count: "+customRdd.count)
println("The First Document: " + customRdd.first.toString())
val resultJSOn = "MetaDataFinalResponse" -> customRdd.collect().toList
val stringResponse = customRdd.first().toJson()
println("Final Response: " +stringResponse)
return stringResponse
注意:
我不想进一步将 JSON 文档映射到另一个模型。我希望他们保持现状。我只想将它们聚合到一条 JSON 消息中。
Spark 版本: 2.4.0
SBT 文件:
name := "Test"
version := "0.1"
scalaVersion := "2.12.8"
libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.7.0"
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
【问题讨论】:
标签: json scala apache-spark apache-spark-sql