【问题标题】:Scala/Spark - Aggregating RDDScala/Spark - 聚合 RDD
【发布时间】:2015-08-10 22:10:14
【问题描述】:

只是想知道我该怎么做:

假设我有一个包含多个用户名的 (username, age, movieBought) 的 RDD,有些行可以有相同的用户名和年龄,但不同的 movieBought。

如何删除重复的行并将其转换为(用户名、年龄、movieBought1、movieBought2...)?

亲切的问候

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:
    val grouped = rdd.groupBy(x => (x._1, x._2)).map(x => (x._1._1, x._1._2, x._2.map(_._3)))
    
    val results = grouped.collect.toList
    

    UPDATE(如果每个元组也有多个电影项):

    val grouped = rdd.groupBy(x => (x._1, x._2)).map(x => (x._1._1, x._1._2, x._2.map(m => (m._3, m._4))))
    
    val results = grouped.collect.toList
    

    【讨论】:

    • 如果我在列表中有一个额外的项目,例如 numberofmoviesbought1,即:(USERNAME, AGE, MOVIEBOUGHT1, NUMBERBOUGHT) 如何更改此代码以适应此参数?我试着弄乱了,没有得到结果
    • 那么这会给我:(用户名,年龄,(movie1,数字),(movie2,数字)等)?
    • 现在,它会给你 (username, age, [(movie1, number), (movie2, number)] )
    【解决方案2】:

    我本来打算建议收集和列出,但 ka4eli 打败了我。

    我猜你也可以使用 groupBy / groupByKey 然后 reduce/reduceByKey 操作。这个 ofc 的缺点是结果 (movie1,movie2,movie3..) 被连接成 1 个字符串(而不是 List 结构,这使得访问变得困难)。

    val group = rdd.map(x=>((x.name,x.age),x.movie))).groupBy(_._1)
    val result =  group.map(x=>(x._1._1,x._1._2,x._2.map(y=>y._2).reduce(_+","+_)
    

    【讨论】:

    • “这个 ofc 的缺点是结果 (movie1,movie2,movie3..) 被连接成 1 个字符串”只是因为你在 reduce 中进行了字符串连接。请改用 aggregateByKey。
    猜你喜欢
    • 2021-04-07
    • 2015-10-31
    • 2015-07-05
    • 1970-01-01
    • 2017-07-11
    • 2015-10-18
    • 2020-08-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多