【发布时间】:2016-09-01 09:48:40
【问题描述】:
我正在运行一个 Spark 作业,并且在某个时候我想连接到弹性搜索服务器以获取一些数据并将它们添加到 RDD。所以我使用的代码是这样的
input.mapParitions(records=>{
val elcon=new ElasticSearchConnection
val client:TransportClient=elcon.openConnection()
val newRecs=records.flatMap(record=>{
val response = client.prepareGet("index" "indexType",
record.id.toString).execute().actionGet()
val newRec=processRec(record,reponse)
newRec
})//end of flatMap
client.close()
newRecs
})//end of mapPartitions
我的问题是在flatMap 操作完成之前调用了client.close() 命令,这当然会导致异常。如果我在 flatMap 中移动连接的生成和关闭,代码就可以工作,但这会产生大量的连接。是否可以确保在flatMap操作完成后会调用client.close?
【问题讨论】:
-
您的问题解决了吗?
-
感谢您的建议和帮助。我已经考虑过你提出的替代方案,但我也调用了另一个服务,所以我不确定我将如何使用你建议的框架。目前,我发现了一个次优的解决方法,使用 while 循环而不是 mapPartitions 中的映射。虽然这通常很慢,但我的瓶颈是网络调用,所以这个阶段的并行性并不重要。
-
这似乎解决了这个问题:stackoverflow.com/questions/36545579/…
标签: scala elasticsearch apache-spark network-connection