【问题标题】:Reducing with a bloom filter使用布隆过滤器减少
【发布时间】:2015-10-25 15:27:29
【问题描述】:

我希望基于应用于大型 Spark RDD 字符串向量(约 1B 条记录)的字符串值函数来获得快速的近似集合成员资格。基本上这个想法是减少到Bloom filter。然后可以将此布隆过滤器广播给工作人员以供进一步使用。

更具体地说,我目前有

rdd: RDD[Vector[String]]
f: Vector[String] => String
val uniqueVals = rdd.map(f).distinct().collect()
val uv = sc.broadcast(uniqueVals)

但是uniqueVals 太大而无法实用,我想用更小(且已知)尺寸的东西来代替它,即布隆过滤器。

我的问题:

  • 是否可以reduce成布隆过滤器,还是必须先收集,再在驱动中构造?

  • 是否有成熟的 Scala/Java Bloom 过滤器实现可以适用于此?

【问题讨论】:

    标签: scala apache-spark bloom-filter


    【解决方案1】:

    是的,布隆过滤器可以减少,因为它们有一些很好的属性(它们是monoids)。这意味着您可以并行执行所有聚合操作,只需对数据进行一次有效的传递即可为每个分区构造 BloomFilter,然后将这些 BloomFilter 一起减少以获得可以查询 contains 的单个 BloomFilter。

    在 Scala 中至少有两个 BloomFilter 实现,而且两个看起来都是成熟的项目(实际上并没有在生产中使用它们)。第一个是Breeze,第二个是Twitter's Algebird。两者都包含不同草图的实现等等。

    这是一个如何使用 Breeze 执行此操作的示例:

    import breeze.util.BloomFilter
    
    val nums = List(1 to 20: _*).map(_.toString)
    val rdd = sc.parallelize(nums, 5)
    
    val bf = rdd.mapPartitions { iter =>
      val bf = BloomFilter.optimallySized[String](10000, 0.001)
      iter.foreach(i => bf += i)
      Iterator(bf)
    }.reduce(_ | _)
    
    println(bf.contains("5")) // true
    println(bf.contains("31")) // false
    

    【讨论】:

    • 此解决方案的一个问题:它将所有分区的所有布隆过滤器发送到驱动程序,然后再合并它们,这很容易导致驱动程序耗尽内存。 treeReduce(_ | _, depth=DEPTH) 通过以树状方式减少来帮助解决这个问题。
    • 很好的解决方案。您还应该在 map 和 reduce 之间添加一个合并,以获得更好的性能。由于分区只有一个布隆过滤器,因此reduce直接将所有布隆过滤器发送给驱动程序进行最终合并。如果有很多分区,它可能会很慢,甚至会出现 OOM。与分区数 k 合并,使得 k*k ~= 初始分区数将是最佳的,即使不使用某些执行器。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-07-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-08-29
    • 2010-10-12
    • 2011-09-22
    相关资源
    最近更新 更多