【问题标题】:Spark: optimized join with ElasticSearch indexSpark:使用 ElasticSearch 索引优化连接
【发布时间】:2016-12-10 07:55:54
【问题描述】:

所以我正在学习通过 Apache Spark 从 ElasticSearch 获取数据。 假设我已连接到具有“用户”索引的 ElasticSearch。

sqlContext = SQLContext(sc)
usersES=sqlContext.read.format('org.elasticsearch.spark.sql').option('es.nodes','mynode').load('users/user')

explain(usersES) 告诉我这个:

== 物理计划 ==

扫描 ElasticsearchRelation(Map(es.nodes -> mynode, es.resource -> users/user),org.apache.spark.sql.SQLContext@6c78e806,None)[about#145,activities#146,bdate#147, uid#148]

当我使用过滤器时:

usersES.filter(usersES.uid==1566324).explain()

== 物理计划 == 过滤器 (uid#203L = 1566324) +- 扫描 ElasticsearchRelation(Map(es.nodes -> mynode, es.resource -> users/user),org.apache.spark.sql.SQLContext@6c78e806,None)[about#145,activities#146,bdate#147,uid#148] PushedFilters:[EqualTo(uid,1566324)]

如您所见,Spark 优雅地将过滤器推送到 ElasticSearch,使索引搜索快速而舒适。

但是当我尝试将 usersES 与另一个数据框连接时,我总是遇到同样的问题: Spark 扫描整个 ElasticSearch 索引,不推送我给它的任何过滤器。 例如:

a = sc.parallelize([1566324,1566329]).map(Row('id')).toDF()
a.join(usersES, usersES.uid==a.id).explain()

显示:

SortMergeJoin [id#210L], [uid#203L] :- 排序 [id#210L ASC], false, 0 : +- TungstenExchange hashpartitioning(id#210L,200),无:+- ConvertToUnsafe:+- 扫描 ExistingRDD[id#210L] +- 排序 [uid#203L ASC], false, 0 +- TungstenExchange hashpartitioning(uid#203L,200), 无 +- 转换为不安全 +- 扫描 ElasticsearchRelation(Map(es.nodes -> mynode, es.resource -> users/user),org.apache.spark.sql.SQLContext@6c78e806,None)[about#145,activities#146,bdate#147,uid#148]

请告诉我,是否可以将过滤器推送到 Elasticsearch 内的连接内?

【问题讨论】:

    标签: elasticsearch apache-spark


    【解决方案1】:

    这是一个预期的行为,是的 elaticsearch-hadoop 连接器支持下推谓词,但是当您加入时没有推送。

    这是因为连接操作对数据帧中的键如何分区一无所知。

    默认情况下,此操作会对两个数据帧的所有键进行哈希处理,将具有相同键哈希的所有元素通过网络发送到同一台机器,然后在该机器上将具有相同键的元素连接在一起。

    这就是为什么您可以在没有下推谓词的情况下获得执行计划的原因。

    编辑:似乎连接器从 2.1 版开始就支持 IN 子句。如果您的 DataFrame a 不大,您应该使用它。

    【讨论】:

    • 谢谢!似乎这是唯一的方法,虽然不符合我的需求(df 中的数千条记录到 IN 中的 py)
    • 好吧,唯一的方法是使用 spark 加入而不是 In 子句,或者您甚至可以使用广播变量。
    • 能否请您至少接受答案以关闭问题?
    • 我也有类似的需求。我想我可以将其中一个数据帧放入广播变量中。如果是这种情况(广播哈希连接),Spark 会使用 Elastic 的索引吗?
    • @Avision 你所说的 « spark using elastic's indices » 是什么意思?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-07-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多