【问题标题】:writing to Mongo replica set from Spark (in scala)从 Spark 写入 Mongo 副本集(在 Scala 中)
【发布时间】:2018-08-30 03:23:14
【问题描述】:

我正在尝试使用 mongo-spark-connector 从 Spark RDD 写入 MongoDB。

我面临两个问题

  • [主要问题]如果我根据文档定义主机(使用mongo副本集中的所有实例),我无法连接到Mongo
  • [次要/相关问题]如果我只连接到主要,我可以写...但是我通常会在写入第一个集合时使主要崩溃

环境:

  • mongo-spark-connector 1.1
  • 火花1.6
  • scala 2.10.5

首先我将设置一个虚拟示例来演示...

import org.bson.Document 
import com.mongodb.spark.MongoSpark 
import com.mongodb.spark.config.WriteConfig

import org.apache.spark.rdd.RDD

/** 
  * fake json data
  */

val recs: List[String] = List(
  """{"a": 123, "b": 456, "c": "apple"}""",
  """{"a": 345, "b":  72, "c": "banana"}""",
  """{"a": 456, "b": 754, "c": "cat"}""",
  """{"a": 876, "b":  43, "c": "donut"}""",
  """{"a": 432, "b": 234, "c": "existential"}"""
)

val rdd_json_str: RDD[String] = sc.parallelize(recs, 5)
val rdd_hex_bson: RDD[Document] = rdd_json_str.map(json_str => Document.parse(json_str))

一些不会改变的值...

// credentials
val user = ???
val pwd  = ???

// fixed values
val db              = "db_name"
val replset         = "replset_name"
val collection_name = "collection_name"

这是行不通的……在这种情况下,“url”看起来像 machine.unix.domain.org,而“ip”看起来像……嗯,一个 IP 地址。

这就是文档所说的定义主机的方式......与副本集中的每台机器。

val host = "url1:27017,url2:27017,url3:27017"
val host = "ip_address1:27017,ip_address2:27017,ip_address3:27017"

我无法让其中任何一个工作。对 uri 使用我能想到的所有排列...

val uri = s"mongodb://${user}:${pwd}@${host}/${db}?replicaSet=${replset}"
val uri = s"mongodb://${user}:${pwd}@${host}/?replicaSet=${replset}"
val uri = s"mongodb://${user}:${pwd}@${replset}/${host}/${db}"
val uri = s"mongodb://${user}:${pwd}@${replset}/${host}/${db}.${collection_name}"
val uri = s"mongodb://${user}:${pwd}@${host}"       // setting db, collection, replica set in WriteConfig
val uri = s"mongodb://${user}:${pwd}@${host}/${db}" // this works IF HOST IS PRIMARY ONLY; not for hosts as defined above

编辑 有关错误消息的更多详细信息.. 错误出现在表单中...

表格 1

通常包括java.net.UnknownHostException: machine.unix.domain.org

此外,即使定义为 IP 地址,也会以 url 形式返回服务器地址

com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting 
for a server that matches WritableServerSelector. Client view of cluster 
state is {type=REPLICA_SET, servers=[{address=machine.unix.domain.org:27017, 
type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException: 
machine.unix.domain.org}, caused by {java.net.UnknownHostException: 
machine.unix.domain.org}}, {address=machine.unix.domain.org:27017, 
type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException: 
machine.unix.domain.org}, caused by {java.net.UnknownHostException: 
machine.unix.domain.org}}, {address=machine.unix.domain.org:27017, 
type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException: 
machine.unix.domain.org}, caused by {java.net.UnknownHostException: 
machine.unix.domain.org}}]

表格2

(身份验证错误...虽然使用相同的凭据连接到主节点只能正常工作)

com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting 
for a server that matches WritableServerSelector. Client view of cluster  
state is {type=REPLICA_SET, servers=[{address=xx.xx.xx.xx:27017,  
type=UNKNOWN, state=CONNECTING, exception= 
{com.mongodb.MongoSecurityException: Exception authenticating  
MongoCredential{mechanism=null, userName='xx', source='admin', password= 
<hidden>, mechanismProperties={}}}, caused by  
{com.mongodb.MongoCommandException: Command failed with error 18:  
'Authentication failed.' on server xx.xx.xx.xx:27017. The full response is {  
"ok" : 0.0, "errmsg" : "Authentication failed.", "code" : 18, "codeName" :  
"AuthenticationFailed", "operationTime" : { "$timestamp" : { "t" :  
1534459121, "i" : 1 } }, "$clusterTime" : { "clusterTime" : { "$timestamp" :  
{ "t" : 1534459121, "i" : 1 } }, "signature" : { "hash" : { "$binary" :  
"xxx=", "$type" : "0" }, "keyId" : { "$numberLong" : "123456" } } } }}}...

结束编辑

这就是它的工作原理......仅在虚拟数据上......更多关于下面......

val host = s"${primary_ip_address}:27017" // primary only
val uri = s"mongodb://${user}:${pwd}@${host}/${db}"

val writeConfig: WriteConfig = 
  WriteConfig(Map(
    "uri"        -> uri, 
    "database"   -> db, 
    "collection" -> collection_name, 
    "replicaSet" -> replset))

// write data to mongo
MongoSpark.save(rdd_hex_bson, writeConfig)

这...仅连接到主数据库...对于虚拟数据非常有效,但对于真实数据(50 - 100GB 和具有 2700 个分区的 RDD)的主数据库崩溃。我的猜测是它一次打开了太多的连接......看起来它打开了大约 900 个连接来写入(这是因为默认并行度 2700 基于 900 个虚拟内核和 3 倍的并行度系数)。

我猜如果我重新分区以便它打开更少的连接,我会有更好的运气......但我猜这也与仅写入主节点有关,而不是将其传播到所有实例。

我已经阅读了我可以在这里找到的所有内容...但大多数示例都是针对单实例连接的...https://docs.mongodb.com/spark-connector/v1.1/configuration/#output-configuration

【问题讨论】:

    标签: mongodb scala apache-spark


    【解决方案1】:

    原来这里有两个问题。从原始问题中,这些被引用为“表格 1”和“表格 2”的错误。

    “表格 1”错误 - 解决方案

    问题的本质原来是 mongo-spark-connector 中的一个错误。事实证明,它无法使用 IP 地址连接到副本集……它需要 URI。由于我们云中的 DNS 服务器没有这些查找,我通过在每个执行程序上修改 /etc/hosts 然后使用如下连接字符串格式来使其工作:

    val host = "URI1:27017,URI2:27017,URI3:27017"
    
    val uri  = s"mongodb://${user}:${pwd}@${host}/${db}?replicaSet=${replset}&authSource=${db}"
    
    val writeConfig: WriteConfig = 
      WriteConfig(Map(
        "uri"->uri, 
        "database"->db, 
        "collection"->collection, 
        "replicaSet"->replset, 
        "writeConcern.w"->"majority"))
    

    这需要首先在每台机器上将以下内容添加到/etc/hosts

    IP1 URI1
    IP2 URI2
    IP3 URI3
    

    当然,我现在不知道如何在集群启动时使用 AWS EMR 中的引导操作来更新 /etc/hosts。但这是另一个问题。 (AWS EMR bootstrap action as sudo)

    “表格 2”错误 - 解决方案

    在 uri 中添加 &amp;authSource=${db} 解决了这个问题。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2023-04-04
      • 1970-01-01
      • 2013-03-30
      • 2016-06-05
      • 1970-01-01
      • 2017-02-01
      • 2016-12-09
      • 1970-01-01
      相关资源
      最近更新 更多