【问题标题】:Spark, mapPartitions, network connection is closed before map operation is finishedSpark、mapPartitions、网络连接在地图操作完成前关闭
【发布时间】:2016-09-01 09:48:40
【问题描述】:

我正在运行一个 Spark 作业,并且在某个时候我想连接到弹性搜索服务器以获取一些数据并将它们添加到 RDD。所以我使用的代码是这样的

 input.mapParitions(records=>{
  val elcon=new ElasticSearchConnection
  val client:TransportClient=elcon.openConnection()
 val newRecs=records.flatMap(record=>{
      val response = client.prepareGet("index" "indexType",
      record.id.toString).execute().actionGet()
       val newRec=processRec(record,reponse)
       newRec
   })//end of flatMap
   client.close()
   newRecs
 })//end of mapPartitions

我的问题是在flatMap 操作完成之前调用了client.close() 命令,这当然会导致异常。如果我在 flatMap 中移动连接的生成和关闭,代码就可以工作,但这会产生大量的连接。是否可以确保在flatMap操作完成后会调用client.close

【问题讨论】:

  • 您的问题解决了吗?
  • 感谢您的建议和帮助。我已经考虑过你提出的替代方案,但我也调用了另一个服务,所以我不确定我将如何使用你建议的框架。目前,我发现了一个次优的解决方法,使用 while 循环而不是 mapPartitions 中的映射。虽然这通常很慢,但我的瓶颈是网络调用,所以这个阶段的并行性并不重要。
  • 这似乎解决了这个问题:stackoverflow.com/questions/36545579/…

标签: scala elasticsearch apache-spark network-connection


【解决方案1】:

对 RDD 中的每个项目进行阻塞调用以获取相应的 ElasticSearch 文档会导致问题。通常建议避免阻塞调用。

还有另一种使用 ElasticSearch-for-Hadoop's Spark support 的替代方法。

将 ElasticSearch 索引/类型作为另一个 RDD 读取并将其与您的 RDD 连接。

包括ESHadoop dependency 的正确版本。

import org.elasticsearch.spark._
val esRdd = sc.esRDD("index/indexType")   //This returns a pair RDD of (_id, Map of all key value pairs for all fields]
input.map(record => (record.id, record))  //Convert your RDD of records to a pair rdd of (id, record) as we want to join based on the id
input.join(esRdd).map(rec => processResponse(rec._2._1, rec._2._2)) // Join the two RDDs based on id column it returns a pair RDD with key=id & value=Pair of matching records (id,(inputrddrecord,esrddrecord))

希望这会有所帮助。

PS: 仍然无法缓解缺乏主机代管的问题。 (即每个带有 _id 的文档将来自索引的不同分片)。更好的方法是在创建 ES 索引时实现输入 RDD 和 ES 索引文档的协同定位。

【讨论】:

    猜你喜欢
    • 2017-11-06
    • 2022-12-06
    • 2012-02-17
    • 2015-02-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多