【问题标题】:scala-spark: How to filter RDD after groupbyscala-spark:如何在 groupby 之后过滤 RDD
【发布时间】:2015-07-15 02:28:42
【问题描述】:

我已经开始使用具有管道分隔字符串的 RDD。我已经处理了数据并进入了以下格式:

((0001F46468,239394055),(7665710590658745,-414963169),0,1420276980302)
((0001F46468,239394055),(8016905020647641,183812619),1,1420347885727)
((0001F46468,239394055),(6633110906332136,294201185),1,1420398323110)
((0001F46468,239394055),(6633110906332136,294201185),0,1420451687525)
((0001F46468,239394055),(7722056727387069,1396896294),1,1420537469065)
((0001F46468,239394055),(7722056727387069,1396896294),1,1420623297340)
((0001F46468,239394055),(8045651092287275,-4814845),1,1420720722185)
((0001F46468,239394055),(5170029699836178,-1332814297),0,1420750531018)
((0001F46468,239394055),(7722056727387069,1396896294),0,1420807545137)
((0001F46468,239394055),(4784119468604853,1287554938),1,1421050087824) 

只是为了对数据的描述给出一个高层次的看法。您可以将主元组中的第一个元素(第一个元组)视为用户标识,第二个元组作为产品标识,第三个元素是用户对产品的偏好。 (为了将来参考我将上面的数据集标记为val userData

我的目标是,如果用户对产品同时投了正面 (1) 和负面 (0) 偏好,则只记录正面的记录。例如:

((0001F46468,239394055),(6633110906332136,294201185),1,1420398323110)
((0001F46468,239394055),(6633110906332136,294201185),0,1420451687525)

我只想保留

((0001F46468,239394055),(6633110906332136,294201185),1,1420398323110) 

所以我按用户产品元组对用户进行分组 (0001F46468,239394055),(6633110906332136,294201185

val groupedFiltered = userData.groupBy(x => (x._1, x._2)).map(u => {
      for(k <- u._2) {
        if(k._3 > 0)
          u
      }
    })

但这会返回空元组。

所以我采取了以下方法:

val groupedFiltered = userData. groupBy(x => (x._1, x._2)).flatMap(u => u._2).filter(m => m._3 > 0)

((47734739656882457,-1782798434),(7585453414177905,-461779195),1,1422013413082)
((47734739656882457,-1782798434),(7585453414177905,-461779195),1,1422533237758)
((55218449094787901,-1374432022),(6227831620534109,1195766703),1,1420410603596)
((71212122719822610,-807015489),(6769904840922490,1642054117),1,1422549467554)
((75414197560031509,1830213715),(6724015489416254,-1389654186),1,1420196951100)
((60422797294995441,734266951),(6335216393920738,1528026712),1,1421161253600)
((35091051395844216,451349158),(8135854751464083,-1751839326),1,1422083101033)
((16647193023519619,990937787),(5384884550662007,-910998857),1,1420659873572)
((43355867025936022,-945669937),(7336240855866885,518993644),1,1420880078266)
((12188366927481231,-2007889717),(5336507724485344,363519858),1,1420827788022)

这是有希望的,但它似乎只在用户对同一项目有 1 和 0 的情况下才获取所有为零的记录,只保留带有 1 的记录。

【问题讨论】:

  • 彼得的答案可能是最好的,因为它会产生一个项目,而如果有多个正面评价,其他方式会产生多个。但是,你已经接近了,只需将第一个更改为 flatMap 并将 for 更改为 a for yield...并且 yield k,而不是 u

标签: scala apache-spark


【解决方案1】:

您只能保留分组结果中的最大用户偏好。

userData
 // group by user and product
 .groupBy(x => (x._1, x._2))
 // only keep the maximum user preference per user/product
 .mapValues(_.maxBy(_._3))
 // only keep the values
 .values

【讨论】:

  • toList 无法从values 获得,我想知道为什么会这样..? toList 是必要的,因为在我做 .values 之后它已经 rdd 对吗?
  • values 在你的情况下就可以了,是的。
  • 这在我尝试时确实有效,但由于某种原因,当我执行groupBy 时,我得到了java.lang.ArrayIndexOutOfBoundsException (null) [duplicate 1]。在创建一个新问题之前,我想我会先发表评论。我试图绕过groupBy 怎么会抛出这样的错误。
  • @Null-Hypothesis 我认为您应该打开一个新问题,其中包含有关您在收到此错误时使用的输入的更多信息。
猜你喜欢
  • 2015-06-27
  • 2016-09-04
  • 1970-01-01
  • 2017-06-26
  • 1970-01-01
  • 2016-12-08
  • 1970-01-01
  • 1970-01-01
  • 2015-10-26
相关资源
最近更新 更多