[Posted]: 2018-08-30 03:23:14
[Problem description]:
I am trying to write to MongoDB from a Spark RDD using mongo-spark-connector.
I am facing two issues:
- [main issue] If I define the hosts according to the documentation (listing every instance in the mongo replica set), I cannot connect to Mongo.
- [secondary/related issue] If I connect to the primary only, I can write... but writing the first collection usually crashes the primary.
Environment:
- mongo-spark-connector 1.1
- spark 1.6
- scala 2.10.5
First I'll set up a dummy example to demonstrate...
import org.bson.Document
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.rdd.RDD
/**
* fake json data
*/
val recs: List[String] = List(
"""{"a": 123, "b": 456, "c": "apple"}""",
"""{"a": 345, "b": 72, "c": "banana"}""",
"""{"a": 456, "b": 754, "c": "cat"}""",
"""{"a": 876, "b": 43, "c": "donut"}""",
"""{"a": 432, "b": 234, "c": "existential"}"""
)
val rdd_json_str: RDD[String] = sc.parallelize(recs, 5)
val rdd_hex_bson: RDD[Document] = rdd_json_str.map(json_str => Document.parse(json_str))
Some values that won't change...
// credentials
val user = ???
val pwd = ???
// fixed values
val db = "db_name"
val replset = "replset_name"
val collection_name = "collection_name"
Here is what does NOT work... in these examples, "url" looks like machine.unix.domain.org and "ip" looks like... well, an IP address.
This is how the documentation says to define the hosts... with every machine in the replica set:
val host = "url1:27017,url2:27017,url3:27017"                       // hostnames...
val host = "ip_address1:27017,ip_address2:27017,ip_address3:27017"  // ...or IP addresses
I couldn't get either of these to work, using every permutation of the uri I could think of...
val uri = s"mongodb://${user}:${pwd}@${host}/${db}?replicaSet=${replset}"
val uri = s"mongodb://${user}:${pwd}@${host}/?replicaSet=${replset}"
val uri = s"mongodb://${user}:${pwd}@${replset}/${host}/${db}"
val uri = s"mongodb://${user}:${pwd}@${replset}/${host}/${db}.${collection_name}"
val uri = s"mongodb://${user}:${pwd}@${host}" // setting db, collection, replica set in WriteConfig
val uri = s"mongodb://${user}:${pwd}@${host}/${db}" // this works IF HOST IS PRIMARY ONLY; not for hosts as defined above
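For reference, the connection-string shape documented for the underlying Java driver is `mongodb://[user:pwd@]host1:port1[,host2:port2,...][/database][?option=value&...]` — i.e. the first permutation above. A minimal builder sketch (the function name and parameters are illustrative, not part of the connector API):

```scala
// Sketch: assemble a replica-set connection string in the documented form
// mongodb://user:pwd@host1,host2,host3/db?replicaSet=rs
def buildMongoUri(user: String,
                  pwd: String,
                  hosts: Seq[String],   // e.g. Seq("url1:27017", "url2:27017")
                  db: String,
                  options: Map[String, String]): String = {
  val hostList = hosts.mkString(",")
  val opts =
    if (options.isEmpty) ""
    else "?" + options.map { case (k, v) => s"$k=$v" }.mkString("&")
  s"mongodb://$user:$pwd@$hostList/$db$opts"
}

val uri = buildMongoUri("user", "pwd",
  Seq("url1:27017", "url2:27017", "url3:27017"),
  "db_name", Map("replicaSet" -> "replset_name"))
// => mongodb://user:pwd@url1:27017,url2:27017,url3:27017/db_name?replicaSet=replset_name
```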
EDIT: more detail on the error messages. The errors come in two forms...
Form 1
Typically includes java.net.UnknownHostException: machine.unix.domain.org.
Note also that the server addresses come back in url form even when the hosts were defined as IP addresses:
com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting
for a server that matches WritableServerSelector. Client view of cluster
state is {type=REPLICA_SET, servers=[{address=machine.unix.domain.org:27017,
type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException:
machine.unix.domain.org}, caused by {java.net.UnknownHostException:
machine.unix.domain.org}}, {address=machine.unix.domain.org:27017,
type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException:
machine.unix.domain.org}, caused by {java.net.UnknownHostException:
machine.unix.domain.org}}, {address=machine.unix.domain.org:27017,
type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException:
machine.unix.domain.org}, caused by {java.net.UnknownHostException:
machine.unix.domain.org}}]
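The UnknownHostException in Form 1 points at a DNS problem: the driver likely discovers replica-set members by the hostnames the servers themselves report (from the replica-set config), which would also explain why url-form addresses show up even when the URI lists IPs. So every Spark executor, not just the driver, must be able to resolve those names. A quick check, runnable on the driver or inside a mapPartitions on each executor (the member hostname is a placeholder):

```scala
import java.net.InetAddress
import scala.util.Try

// Returns true if this JVM can resolve the given hostname to an address.
// Run it on the driver AND inside rdd.mapPartitions on the executors to
// confirm every node can resolve the replica-set member names.
def canResolve(host: String): Boolean =
  Try(InetAddress.getByName(host)).isSuccess

// e.g. canResolve("machine.unix.domain.org")  // placeholder member name
canResolve("localhost")  // should be true on any working node
```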
Form 2
(an authentication error... even though connecting to the primary only, with the same credentials, works fine):
com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting
for a server that matches WritableServerSelector. Client view of cluster
state is {type=REPLICA_SET, servers=[{address=xx.xx.xx.xx:27017,
type=UNKNOWN, state=CONNECTING, exception=
{com.mongodb.MongoSecurityException: Exception authenticating
MongoCredential{mechanism=null, userName='xx', source='admin', password=
<hidden>, mechanismProperties={}}}, caused by
{com.mongodb.MongoCommandException: Command failed with error 18:
'Authentication failed.' on server xx.xx.xx.xx:27017. The full response is {
"ok" : 0.0, "errmsg" : "Authentication failed.", "code" : 18, "codeName" :
"AuthenticationFailed", "operationTime" : { "$timestamp" : { "t" :
1534459121, "i" : 1 } }, "$clusterTime" : { "clusterTime" : { "$timestamp" :
{ "t" : 1534459121, "i" : 1 } }, "signature" : { "hash" : { "$binary" :
"xxx=", "$type" : "0" }, "keyId" : { "$numberLong" : "123456" } } } }}}...
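One possible explanation for Form 2: error 18 can mean the driver is authenticating against the wrong database. With no authSource option, the driver authenticates against the database named in the URI path, so if the user was actually created in admin, adding authSource=admin may help. A sketch with placeholder credentials, assuming (unverified) that the user lives in admin:

```scala
// Placeholder values standing in for the real credentials above.
val user = "user"; val pwd = "pwd"
val db = "db_name"; val replset = "replset_name"
val host = "url1:27017,url2:27017,url3:27017"

// Same URI as before, but explicitly authenticating against 'admin'.
// 'authSource=admin' is an assumption -- use whichever database the
// user was actually created in.
val uri = s"mongodb://${user}:${pwd}@${host}/${db}?replicaSet=${replset}&authSource=admin"
```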
END EDIT
Here is what DOES work... on dummy data only... more on that below...
val host = s"${primary_ip_address}:27017" // primary only
val uri = s"mongodb://${user}:${pwd}@${host}/${db}"
val writeConfig: WriteConfig =
WriteConfig(Map(
"uri" -> uri,
"database" -> db,
"collection" -> collection_name,
"replicaSet" -> replset))
// write data to mongo
MongoSpark.save(rdd_hex_bson, writeConfig)
This... connecting to the primary only... works great for dummy data, but crashes the primary on real data (50-100 GB in an RDD with 2700 partitions). My guess is that it opens too many connections at once... it looks like it opens about 900 connections for the write (which is plausible, since the default parallelism of 2700 is based on 900 virtual cores and a parallelism factor of 3x).
I'm guessing I'd have better luck if I repartitioned so it opened fewer connections... but I suspect this is also tied to writing only to the primary instead of spreading the writes across all instances.
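The too-many-connections theory is cheap to test: each output partition writes with its own connection during the save, so coalescing the 2700 partitions down to a bounded count bounds the concurrent connections. A sketch (the cap of 64 is an arbitrary assumption to tune):

```scala
// Cap the number of concurrent writer partitions: fewer partitions means
// fewer simultaneous MongoDB connections during MongoSpark.save.
def cappedPartitions(current: Int, maxWriters: Int): Int =
  math.min(current, maxWriters)

val numWriters = cappedPartitions(2700, 64)  // 64 is a guess to tune
// coalesce avoids a shuffle when only reducing the partition count:
// val rdd_capped = rdd_hex_bson.coalesce(numWriters)
// MongoSpark.save(rdd_capped, writeConfig)
```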
I've read everything I can find... but most examples cover single-instance connections... https://docs.mongodb.com/spark-connector/v1.1/configuration/#output-configuration
[Question comments]:
Tags: mongodb scala apache-spark