【问题标题】:Scala Spark Mongo - filter with "in" clauseScala Spark Mongo - 使用“in”子句过滤
【发布时间】:2021-01-11 18:06:17
【问题描述】:

我有一个可能相当大的 mongoDB 集合。

我正在使用以下连接器,以便使用 spark 从此集合中读取数据:

        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.11</artifactId>
            <version>2.4.2</version>
        </dependency>

我想通过从另一个数据源获取的 id 的封闭列表(也可能很长)过滤集合。

我从文档中了解到,某些过滤可以被下推以在 mongo 端发生。

例如

rdd.filter(doc => doc.getInteger("test") > 5)

我想知道是否有办法执行类似的操作:

val ids = spark.sql("select ids from some_non_mongo_table")
val mongoDocs = MongoSpark.load(spark.sparkContext, mongoConf)
                          .filter(doc => doc.id in ids)

如果不可能,除了从 mongo 获取整个集合并将结果与​​ ids 数据框连接之外,还有其他合理的解决方案吗?

【问题讨论】:

  • 对您的 ID 数据帧执行广播内连接是比使用 filter(...) 方法更好的解决方案。
  • 但在这种情况下,我从 mongo no 获取整个集合?
  • Spark Dataset 只会从源加载所需的数据,这称为谓词下推。据我了解,MangoDB 支持谓词下推,如下面的链接所述。 raphael-brugier.com/blog/introduction-mongodb-spark-connector/…。无论您执行过滤还是执行连接,都会发生谓词过滤。如果您的 ID 预计会从少数增长到数万,那么广播加入将是一种更好的方法。
  • 我正在检查这种方法,但正如我从博客文章中了解到的那样,只有 where 子句和 select 子句被推送到 mongo。 “谓词下推是 Spark SQL 的 Catalyst 优化器的一种优化,用于将 where 过滤器和选择投影下推到数据源”

标签: mongodb scala apache-spark


【解决方案1】:

你能用.filter(doc =&gt; ids.contain(doc.id))吗?

val ids = spark.sql("select ids from some_non_mongo_table").collect.map(r => r(0))

val mongoDocs = MongoSpark.load(spark.sparkContext, mongoConf)
                          .filter(doc => ids.contains(doc.id))

【讨论】:

  • 感谢您的建议,我正在研究这种方法,但在将结果转换为 DataFrame 时遇到了一些麻烦。结果是 RDD[Document] 类型,尽管 IDE (intelij) 完成了 toDF() 方法,但它没有编译。
  • @OdedRosenberg 能否请您在编译时显示任何错误?
  • Error:(41, 8) value toDF is not a member of org.apache.spark.rdd.RDD[org.bson.Document] 可能的原因:可能在 `value toDF 之前缺少分号'? .toDF()
  • @OdedRosenberg 你可以尝试导入import spark.implicits._ 并再次尝试toDF..?
  • 它是在代码范围内导入的,我认为这仅适用于案例类
【解决方案2】:

最后,我将过滤值作为管道发送。 看起来是更好的方式,因为无法以其他方式将过滤器下推,并且获取整个集合太重了。

    // ids is a comma delimited string 
    val pipeline = "{$match: {id : {$in:[" + ids + "]}}}"
    val mongoConf = mongoReadConfig


    val existingSnapshots = MongoSpark
      .load(spark.sparkContext, mongoConf)
      .withPipeline(Seq(Document.parse(pipeline)))
      .toDF()

【讨论】:

    猜你喜欢
    • 2015-09-11
    • 1970-01-01
    • 2016-06-22
    • 1970-01-01
    • 2022-08-06
    • 2017-06-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多