【发布时间】:2017-10-31 22:38:36
【问题描述】:
在 spark-submit 作业(用 Scala 编写的 .JAR)中,我需要访问现有的 MongoDB,在数据库中创建新集合,添加索引,从分布在 1000 多个执行程序的 RDD 写入数据收藏。
我找不到一个图书馆可以做到这一切。现在,我正在使用 mongo-spark-connector 从 RDD 写入,然后我使用 casbah 创建索引。
mongo spark 连接器(scaladoc 在哪里?)- https://docs.mongodb.com/spark-connector/current/scala-api/
casbah - http://mongodb.github.io/casbah/3.1/scaladoc/#package
流程是这样的……
- 创建 RDD
- 从 RDD 写入新集合(使用 mongo spark 连接器)
- 写入后在集合上创建索引(使用 casbah)
这种方法会加快速度吗?任何想法如何完成它?
- 创建空集合
- 创建索引
- 构建 RDD 并写入此集合
- 使用一个库来完成
这就是我现在的做法,但我怀疑还有更好的方法。
进口
// casbah - used to create index after new collection is created
import com.mongodb.casbah.Imports.{MongoClient,MongoCollection,MongoClientURI}
// mongo-spark-connector used to write to Mongo from Spark cluster (and create new collection in process)
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.{WriteConfig,ReadConfig}
import org.bson.Document
连接信息
object MyConnect {
// mongodb connect
val host = "128.128.128.128"
val port = 12345
val db = "db"
val collection = "collection"
val user = "user"
val password = "password"
// casbah - to create index
val casbah_db_uri = MongoClientURI(
s"mongodb://${user}:${password}@${host}:${port}/${db}"
)
// mongodb spark connector - to write from RDD
val collection_uri = s"mongodb://${user}:${password}@${host}:${port}/${db}.${collection}"
val writeConfig: WriteConfig = WriteConfig(Map("uri"->collection_uri))
}
做事
object sparkSubmit {
def main(args: Array[String]): Unit = {
// dummy dataset - RDD[(id, cnt)]
val rdd_dummy: RDD[(String, Int)] = ???
// data as Mongo docs - as per mongo spark connector
val rdd_bson: RDD[Document] = {
rdd_dummy
.map(tup => s"""{"hex":"${tup._1}", "cnt":${tup._2}}""")
.map(str => Document.parse(str))
}
// save to mongo / create new collection in process - using mongo spark connector
MongoSpark.save(rdd_bson, MyConnect.writeConfig)
// create index on new collection - using casbah
val new_table: MongoCollection = MongoClient(MyConnect.casbah_db_uri)(MyConnect.db)(MyConnect.collection)
new_table.createIndex("hex")
}
}
【问题讨论】:
标签: mongodb scala apache-spark rdd