【发布时间】:2021-01-11 18:06:17
【问题描述】:
我有一个可能相当大的 mongoDB 集合。
我正在使用以下连接器,以便使用 spark 从此集合中读取数据:
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>2.4.2</version>
</dependency>
我想通过从另一个数据源获取的 id 的封闭列表(也可能很长)过滤集合。
我从文档中了解到,某些过滤可以被下推以在 mongo 端发生。
例如
rdd.filter(doc => doc.getInteger("test") > 5)
我想知道是否有办法执行类似的操作:
val ids = spark.sql("select ids from some_non_mongo_table")
val mongoDocs = MongoSpark.load(spark.sparkContext, mongoConf)
.filter(doc => doc.id in ids)
如果不可能,除了从 mongo 获取整个集合并将结果与 ids 数据框连接之外,还有其他合理的解决方案吗?
【问题讨论】:
-
对您的 ID 数据帧执行广播内连接是比使用 filter(...) 方法更好的解决方案。
-
但在这种情况下,我从 mongo no 获取整个集合?
-
Spark Dataset 只会从源加载所需的数据,这称为谓词下推。据我了解,MangoDB 支持谓词下推,如下面的链接所述。 raphael-brugier.com/blog/introduction-mongodb-spark-connector/…。无论您执行过滤还是执行连接,都会发生谓词过滤。如果您的 ID 预计会从少数增长到数万,那么广播加入将是一种更好的方法。
-
我正在检查这种方法,但正如我从博客文章中了解到的那样,只有 where 子句和 select 子句被推送到 mongo。 “谓词下推是 Spark SQL 的 Catalyst 优化器的一种优化,用于将 where 过滤器和选择投影下推到数据源”
标签: mongodb scala apache-spark