【发布时间】:2018-04-16 22:36:37
【问题描述】:
有一个表,其中包含两列 books 和 readers 这些书籍,其中 books 和 readers 分别是书籍和读者 ID。
我需要从这个表中删除阅读超过 10 本书的读者。
首先我按读者对书籍进行分组并获得这些组的大小:
val byReader = data.map(r => (r.reader,r.book))
val booksByReader = byReader.groupByKey()
val booksByReaderCnts = booksByReader.map(tuple => tuple match {
case (reader, bookIter) => (reader, bookIter.size)
})
我在单个节点上运行它,并试图欺骗计算的分布式性质,并在 Scala 哈希映射中存储具有大量书籍的读者 ID,作为副作用。我还以“标准 Spark 方式”过滤掉拥有大量书籍的读者:
val maxBookCnt = 10
var hugeBookCntsMap: Map[Int, Int] = Map() // map to store reader id's with huge book counts
//Get readers with huge book counts
val hugeBookCnts = booksByReaderCnts.filter(tuple => tuple match {
case (reader: Int, cnt: Int) => hugeBookCntsMap += (reader -> cnt); cnt > maxBookCnt
})
Spark 过滤按预期工作,并创建具有大量图书数量的读者对的 RDD:
println("*** Huge cnts has: "+hugeBookCnts.count() + " elements")
hugeBookCnts.take(100).foreach(println)
另一方面,地图仍然是空的:
println("*** Map:")
hugeBookCntsMap.map(tuple => tuple match {
case (reader: Int, cnt: Int) => println("Reader: " + reader + " Cnt: " + cnt)
})
问题:
-
我的想法是创建一个 Scala 哈希映射来存储具有大量图书数量的用户的 ID。接下来我想通过检查用户是否在哈希中来过滤原始数据。那些在散列中的应该被过滤掉。显然,local hash map 没有获取数据,不能用于此目的。 * 主要问题:如何过滤掉阅读量大的读者记录? *
如果一切都在单个节点上运行,为什么本地哈希映射仍然为空?
Spark 是否提供任何机制来组织不同进程之间共享的哈希映射?
【问题讨论】:
标签: scala hash apache-spark filtering