【发布时间】:2018-12-19 04:35:52
【问题描述】:
我想遍历数据流,对其运行查询并返回应该写入 ElasticSearch 的结果。我尝试使用 mapPartitions 方法来创建与数据库的连接,但是,我收到这样的错误,这表明分区返回 None 到 rdd(我猜,应该在转换后添加一些操作):
org.elasticsearch.hadoop.EsHadoopException: Could not write all entries for bulk operation [10/10]. Error sample (first [5] error messages)
可以在代码中进行哪些更改以将数据放入 rdd 并将其发送到 ElasticSearch 而不会有任何麻烦?
另外,我在 foreachRDD 中使用 flatMap 解决了这个问题的变体,但是,我在每个 rdd 上创建了到数据库的连接,这在性能方面无效。
这是流式数据处理的代码:
wordsArrays.foreachRDD(rdd => {
rdd.mapPartitions { part => {
val neo4jConfig = neo4jConfigurations.getNeo4jConfig(args(1))
part.map(
data => {
val recommendations = execNeo4jSearchQuery(neo4jConfig, data)
val calendarTime = Calendar.getInstance.getTime
val recommendationsMap = convertDataToMap(recommendations, calendarTime)
recommendationsMap
})
}
}
}.saveToEs("rdd-timed/output")
)
【问题讨论】:
标签: scala apache-spark elasticsearch spark-streaming