【问题标题】:Write BsonDocument into MongoDB Spark Scala将 BsonDocument 写入 MongoDB Spark Scala
【发布时间】:2020-06-14 21:44:30
【问题描述】:

我想在从 Twitter 流式传输数据时将数据保存到 MongoDB。 DStream 中的每个 RDD 都包含带有值的 Array[String],因此我为这些值设置了键并将它们包装到 org.bson.document 中。当我尝试将 Seq of Documents 写入 MongoDB 时,会出现这样的异常:

ERROR Executor: Exception in task 1.0 in stage 8.0 (TID 9)
java.lang.IllegalArgumentException: clusterListener can not be null

我使用了 Spark MongoDB 连接器,所以这里是我的 build.sbt 文件中的依赖项:

val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  "org.apache.kafka" %% "kafka" % "1.1.0",
  "org.apache.bahir" %% "spark-streaming-twitter" % sparkVersion,
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
  "com.typesafe" % "config" % "1.3.0",
  "org.twitter4j" % "twitter4j-core" % "4.0.6",
  "org.twitter4j" % "twitter4j-stream" % "4.0.6",
  "com.twitter" %% "bijection-avro" % "0.9.6",
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.2.2",
  "org.mongodb.scala" %% "mongo-scala-driver" % "2.2.0",
  "org.json4s" %% "json4s-native" % "3.5.3"
)

另外,我在我的 docker-compose 文件中使用了 MongoDB docker 镜像:

version: '3.3'
services:
  kafka:
      image: spotify/kafka
      ports:
        - "9092:9092"
      environment:
      - ADVERTISED_HOST=localhost
  mongo:
      image: mongo
      restart: always
      environment:
        MONGO_INITDB_ROOT_USERNAME: admin
        MONGO_INITDB_ROOT_PASSWORD: pwd
  mongo-express:
      image: mongo-express
      restart: always
      ports:
        - 8081:8081
      environment:
        ME_CONFIG_MONGODB_ADMINUSERNAME: admin
        ME_CONFIG_MONGODB_ADMINPASSWORD: pwd

这是用于流式传输和写入数据库的代码。这里的 WordArrays 的类型是 DStream[Array[String]]

wordsArrays.foreachRDD(rdd => rdd.collect.foreach(
        record => {
          val docs = sparkContext.parallelize(Seq(new Document("tweetId", record(0)),
            new Document("text", record(1)),
            new Document("favoriteCount", record(1)),
            new Document("retweetCount", record(1)),
            new Document("geoLocation", record(1)),
            new Document("language", record(1)),
            new Document("createdAt", record(1))
          ))
          MongoSpark.save(docs)
        }
    ))

【问题讨论】:

  • 该错误似乎表明 mongo 集群存在身份验证问题。您的 docker compose 文件是否正确写入?我认为环境变量前面需要有一个-。见docs.docker.com/compose/environment-variables/…
  • 你的意思是这样? - MONGO_INITDB_ROOT_USERNAME= admin
  • 是的。这解决了您的问题吗?
  • 没有。它没有。

标签: mongodb scala apache-spark


【解决方案1】:

由于 DStream 中的每个元素,一个 Array[String] 类型的 RDD MongoDB-Spark 连接器都有一个隐式方法可以将 RDD 直接写入 MongoDB

从 SparkConf() 使用的数据库和集合在创建 SparkSession 时提供给它

wordsArrays.foreachRDD(rdd => rdd.saveToMongoDB())

您还可以将 Map 中的数据库、集合和连接 URI 配置传递给 WriteConfig 对象,并将其与 saveToMongoDB() 辅助方法一起使用,如下所示(假设您的 SparkSession 对象称为 spark ):

import com.mongodb.spark.config._

val writeConfig = WriteConfig(
                             Map("uri":"mongodb://","database":"db_name",
                                 "collection" -> "collectionname",
                                 "writeConcern.w" -> "majority"), 
                                  Some(WriteConfig(spark.sparkContext)
                                )
                             )
wordsArrays.foreachRDD(rdd => rdd.saveToMongoDB(writeConfig))

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多