【问题标题】:Spark RDD to new MongoDB collection with index in ScalaSpark RDD 到新的 MongoDB 集合,在 Scala 中具有索引
【发布时间】:2017-10-31 22:38:36
【问题描述】:

在 spark-submit 作业(用 Scala 编写的 .JAR)中,我需要访问现有的 MongoDB,在数据库中创建新集合,添加索引,从分布在 1000 多个执行程序的 RDD 写入数据收藏。

我找不到一个图书馆可以做到这一切。现在,我正在使用 mongo-spark-connector 从 RDD 写入,然后我使用 casbah 创建索引。

mongo spark 连接器(scaladoc 在哪里?)- https://docs.mongodb.com/spark-connector/current/scala-api/

casbah - http://mongodb.github.io/casbah/3.1/scaladoc/#package

流程是这样的……

  • 创建 RDD
  • 从 RDD 写入新集合(使用 mongo spark 连接器)
  • 写入后在集合上创建索引(使用 casbah)

这种方法会加快速度吗?任何想法如何完成它?

  • 创建空集合
  • 创建索引
  • 构建 RDD 并写入此集合
  • 使用一个库来完成

这就是我现在的做法,但我怀疑还有更好的方法。

进口

// casbah - used to create index after new collection is created 
import com.mongodb.casbah.Imports.{MongoClient,MongoCollection,MongoClientURI}

// mongo-spark-connector used to write to Mongo from Spark cluster (and create new collection in process)
import com.mongodb.spark.MongoSpark 
import com.mongodb.spark.config.{WriteConfig,ReadConfig}
import org.bson.Document 

连接信息

object MyConnect {
  // mongodb connect
  val host       = "128.128.128.128"
  val port       = 12345
  val db         = "db"
  val collection = "collection"
  val user       = "user"
  val password   = "password"

  // casbah - to create index 
  val casbah_db_uri = MongoClientURI(
    s"mongodb://${user}:${password}@${host}:${port}/${db}"
  )

  // mongodb spark connector - to write from RDD 
  val collection_uri = s"mongodb://${user}:${password}@${host}:${port}/${db}.${collection}"
  val writeConfig: WriteConfig = WriteConfig(Map("uri"->collection_uri))
}

做事

object sparkSubmit {

  def main(args: Array[String]): Unit = {

    // dummy dataset - RDD[(id, cnt)]
    val rdd_dummy: RDD[(String, Int)] = ???

    // data as Mongo docs - as per mongo spark connector
    val rdd_bson: RDD[Document] = {
      rdd_dummy
      .map(tup => s"""{"hex":"${tup._1}", "cnt":${tup._2}}""")
      .map(str => Document.parse(str))
    }

    // save to mongo / create new collection in process - using mongo spark connector
    MongoSpark.save(rdd_bson, MyConnect.writeConfig)

    // create index on new collection - using casbah
    val new_table: MongoCollection = MongoClient(MyConnect.casbah_db_uri)(MyConnect.db)(MyConnect.collection)
    new_table.createIndex("hex")
  }
}

【问题讨论】:

    标签: mongodb scala apache-spark rdd


    【解决方案1】:

    这种方法会加快速度吗?

    通常对于任何数据库(包括 MongoDB),索引构建操作都会有成本。如果在空集合上创建索引,则在(每个)插入操作期间将产生索引构建操作成本。如果在所有插入之后创建索引,那么之后也会产生索引构建成本,这可能会锁定集合直到索引构建完成。

    您可以根据您的用例进行选择,即如果您想在集合完成后立即访问它,请在空集合上创建索引。

    请注意,MongoDB 有两种索引构建操作类型:前台和后台。请参阅MongoDB: Index Creation 了解更多信息。

    scaladoc 在哪里?

    它没有 scaladoc,但是有一个 javadoc:https://www.javadoc.io/doc/org.mongodb.spark/mongo-spark-connector_2.11/2.2.1

    这是因为 MongoDB Spark 连接器使用了下面的 MongoDB Java 驱动程序 jar。

    您应该尝试使用官方的MongoDB Scala driver,而不是使用传统的 Scala 驱动程序 Casbah 创建索引。例如Create An Index

    collection.createIndex(ascending("i"))
    

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2015-05-17
    • 1970-01-01
    • 2015-10-18
    • 2015-01-29
    • 1970-01-01
    • 2022-11-02
    • 1970-01-01
    • 2015-10-31
    相关资源
    最近更新 更多