【问题标题】:Sorting an RDD in Spark在 Spark 中对 RDD 进行排序
【发布时间】:2021-06-04 22:58:06
【问题描述】:

我有一个数据集,列出了客户购买的一般商品。 csv 中的每条记录从左到右列出客户购买的商品。例如(缩短样本):

Bicycle, Helmet, Gloves
Shoes, Jumper, Gloves
Television, Hat, Jumper, Playstation 5

我希望将其放入 scala 中的 RDD 中,并对它们执行计数。

case class SalesItemSummary(SalesItemDesc: String, SalesItemCount: String)
val rdd_1 = sc.textFile("Data/SalesItems.csv")
val rdd_2 = rdd_1.flatMap(line => line.split(",")).countByValue();

以上是一个简短的代码示例。第一行是案例类(尚未使用)。 第二行从 csv 中抓取数据并将其放入 rdd_1 中。很容易。 第三行做平面图,用逗号分割数据,然后对每个数据进行计数。因此,例如,上面的“手套”和“跳线”旁边会有数字 2。其他 1. 在看起来像元组的集合中。 到目前为止一切顺利。

接下来,我想对 rdd_2 进行排序以列出购买次数最多的前 3 项。 我可以用 RDD 做到这一点吗?还是我需要将 RDD 转移到数据框中来实现排序? 如果是这样,我该怎么做?

如何将第 1 行中的案例类应用到 rdd_2,这似乎是一个元组列表?我应该采用这种方法吗?

提前致谢

【问题讨论】:

    标签: scala apache-spark rdd


    【解决方案1】:

    案例类中的计数应该是一个整数......如果你想将结果保留为 RDD,我建议使用 reduceByKey 而不是 countByValue 它返回一个 Map[String, Long] 而不是RDD。

    我还建议使用, 而不是, 进行拆分,以避免项目名称中出现前导空格。

    case class SalesItemSummary(SalesItemDesc: String, SalesItemCount: Int)
    
    val rdd_1 = sc.textFile("Data/SalesItems.csv")
    
    val rdd_2 = rdd_1.flatMap(_.split(", "))
                     .map((_, 1))
                     .reduceByKey(_ + _)
                     .map(line => SalesItemSummary(line._1, line._2))
    
    rdd_2.collect()
    // Array[SalesItemSummary] = Array(SalesItemSummary(Gloves,2), SalesItemSummary(Shoes,1), SalesItemSummary(Television,1), SalesItemSummary(Bicycle,1), SalesItemSummary(Helmet,1), SalesItemSummary(Hat,1), SalesItemSummary(Jumper,2), SalesItemSummary(Playstation 5,1))
    

    要对RDD进行排序,可以使用sortBy:

    val top3 = rdd_2.sortBy(_.SalesItemCount, false).take(3)
    
    top3
    // Array[SalesItemSummary] = Array(SalesItemSummary(Gloves,2), SalesItemSummary(Jumper,2), SalesItemSummary(Shoes,1))
    

    【讨论】:

      猜你喜欢
      • 2014-07-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-07-13
      • 2016-02-19
      • 2015-08-23
      相关资源
      最近更新 更多