【问题标题】:Spark: Filtering out aggregated data?Spark:过滤掉聚合数据?
【发布时间】:2018-04-16 22:36:37
【问题描述】:

有一个表,其中包含两列 booksreaders 这些书籍,其中 booksreaders 分别是书籍和读者 ID。 我需要从这个表中删除阅读超过 10 本书的读者。

首先我按读者对书籍进行分组并获得这些组的大小:

  val byReader = data.map(r => (r.reader,r.book))
    val booksByReader = byReader.groupByKey()
    val booksByReaderCnts = booksByReader.map(tuple => tuple match {
      case (reader, bookIter) => (reader, bookIter.size)
    })

我在单个节点上运行它,并试图欺骗计算的分布式性质,并在 Scala 哈希映射中存储具有大量书籍的读者 ID,作为副作用。我还以“标准 Spark 方式”过滤掉拥有大量书籍的读者:

    val maxBookCnt = 10
    var hugeBookCntsMap: Map[Int, Int] = Map() // map to store reader id's with huge book counts 
    //Get readers with huge book counts
    val hugeBookCnts = booksByReaderCnts.filter(tuple => tuple match {
      case (reader: Int, cnt: Int) => hugeBookCntsMap += (reader -> cnt); cnt > maxBookCnt
    })

Spark 过滤按预期工作,并创建具有大量图书数量的读者对的 RDD:

    println("*** Huge cnts has: "+hugeBookCnts.count() + " elements")
    hugeBookCnts.take(100).foreach(println)

另一方面,地图仍然是空的:

    println("*** Map:")
    hugeBookCntsMap.map(tuple => tuple match {
      case (reader: Int, cnt: Int) => println("Reader: " + reader + " Cnt: " + cnt) 
    })

问题:

  1. 我的想法是创建一个 Scala 哈希映射来存储具有大量图书数量的用户的 ID。接下来我想通过检查用户是否在哈希中来过滤原始数据。那些在散列中的应该被过滤掉。显然,local hash map 没有获取数据,不能用于此目的。 * 主要问题:如何过滤掉阅读量大的读者记录? *

    1. 如果一切都在单个节点上运行,为什么本地哈希映射仍然为空?

    2. Spark 是否提供任何机制来组织不同进程之间共享的哈希映射?

【问题讨论】:

    标签: scala hash apache-spark filtering


    【解决方案1】:

    有一种方法可以用 Spark 做到这一点,而无需将计数带到单台机器上:

    //count books by reader using reduceByKey transformation (thus no action yet)
    // and filter readers with books count > 10
    val readersWithLotsOfBooksRDD = data.map(r => (r.reader, 1)).reduceByKey((x, y) => x + y).filter{ case (_, x) => x > 10 }
    // produces PairRDD
    val readersWithBooksRDD = data.map( r => (r.reader, r.book))
    //result
    readersWithBooksRDD.subtractByKey(readersWithLotsOfBooksRDD).collect
    

    回答您的其他问题:

    1. 您更新hugeBookCntsMap 的代码在不同JVM 进程的worker 上执行。 Spark 不会将工作人员的价值带回驱动程序

    2. Spark 提供了多种机制,用于将值从驱动程序发送到工作人员以及从工作人员发送回驱动程序。

      • 从驱动程序到工作人员 - 闭包 - hugeBookCntsMap 的值被序列化并发送给工作人员。但是对 worker 上的 hugeBookCntsMap 的更改不会发送回驱动程序。这旨在用于小对象和功能。只读。
      • 从驱动程序到工作程序 - 广播变量 - 这旨在用于大型对象。只读。
      • 从工人到司机 - 工作收集操作或减少操作 - 就像在我的计数代码中一样。 Worker 生成元组,然后将它们聚合并收集回驱动程序
      • 从工人到司机 - 使用蓄能器。但同样应该可以汇总结果

    一般来说,如果您的应用程序的输出太大而无法放入单节点内存 - 保存到 HDFS 或 S3 或其他一些分布式存储。

    【讨论】:

    • 你的意思是br100_rdd.subtractByKey(br100_gt_10).collect 不是less
    • 是的。从那时起,我已经将变量名更新为不那么 Python
    • 非常感谢!在测试您的解决方案时,我发现仅阅读一本书的读者在使用条件x > 1 时也会从结果中删除。试图找出原因......
    • 我尝试了带有玩具示例表的代码 - 10 条记录,一对读者拥有 >1 本书,其余读者 - 只有 1 本书 - 结果只有读者拥有 1 本书。跨度>
    • 尝试在每一个 rdd 上调用 .collect 来调试你的数据集——在subtractByKey之前。
    猜你喜欢
    • 2016-06-26
    • 2023-01-09
    • 1970-01-01
    • 2023-02-14
    • 1970-01-01
    • 2021-07-26
    • 2021-04-22
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多