【发布时间】:2016-12-10 07:55:54
【问题描述】:
所以我正在学习通过 Apache Spark 从 ElasticSearch 获取数据。 假设我已连接到具有“用户”索引的 ElasticSearch。
sqlContext = SQLContext(sc)
usersES=sqlContext.read.format('org.elasticsearch.spark.sql').option('es.nodes','mynode').load('users/user')
explain(usersES) 告诉我这个:
== 物理计划 ==
扫描 ElasticsearchRelation(Map(es.nodes -> mynode, es.resource -> users/user),org.apache.spark.sql.SQLContext@6c78e806,None)[about#145,activities#146,bdate#147, uid#148]
当我使用过滤器时:
usersES.filter(usersES.uid==1566324).explain()
== 物理计划 == 过滤器 (uid#203L = 1566324) +- 扫描 ElasticsearchRelation(Map(es.nodes -> mynode, es.resource -> users/user),org.apache.spark.sql.SQLContext@6c78e806,None)[about#145,activities#146,bdate#147,uid#148] PushedFilters:[EqualTo(uid,1566324)]
如您所见,Spark 优雅地将过滤器推送到 ElasticSearch,使索引搜索快速而舒适。
但是当我尝试将 usersES 与另一个数据框连接时,我总是遇到同样的问题: Spark 扫描整个 ElasticSearch 索引,不推送我给它的任何过滤器。 例如:
a = sc.parallelize([1566324,1566329]).map(Row('id')).toDF()
a.join(usersES, usersES.uid==a.id).explain()
显示:
SortMergeJoin [id#210L], [uid#203L] :- 排序 [id#210L ASC], false, 0 : +- TungstenExchange hashpartitioning(id#210L,200),无:+- ConvertToUnsafe:+- 扫描 ExistingRDD[id#210L] +- 排序 [uid#203L ASC], false, 0 +- TungstenExchange hashpartitioning(uid#203L,200), 无 +- 转换为不安全 +- 扫描 ElasticsearchRelation(Map(es.nodes -> mynode, es.resource -> users/user),org.apache.spark.sql.SQLContext@6c78e806,None)[about#145,activities#146,bdate#147,uid#148]
请告诉我,是否可以将过滤器推送到 Elasticsearch 内的连接内?
【问题讨论】:
标签: elasticsearch apache-spark