【问题标题】:Sort by a key, but value has more than one element using Scala使用 Scala 按键排序,但值具有多个元素
【发布时间】:2016-10-13 03:22:08
【问题描述】:

我对 Spark 上的 Scala 非常陌生,想知道如何创建键值对,其中键具有多个元素。例如,我有这个婴儿名字的数据集:

年、名、县、号

2000 年,约翰,国王,50 岁

2000,鲍勃,国王,40

2000 年,玛丽,拿骚,60 岁

2001 年,约翰·金斯,14 岁

2001,简,金斯,30 岁

2001 年,鲍勃,拿骚,45 岁

而且我想找到每个县最常发生的事件,无论年份如何。我该怎么做呢?

我确实使用循环来完成此操作。请参阅下文。但我想知道是否有更短的方法可以利用 Spark 和 Scala 对偶性。 (即我可以减少计算时间吗?)

val names = sc.textFile("names.csv").map(l => l.split(","))

val uniqueCounty = names.map(x => x(2)).distinct.collect

for (i <- 0 to uniqueCounty.length-1) {
    val county = uniqueCounty(i).toString;
    val eachCounty = names.filter(x => x(2) == county).map(l => (l(1),l(4))).reduceByKey((a,b) => a + b).sortBy(-_._2);
    println("County:" + county + eachCounty.first)
}

【问题讨论】:

  • @maasg 请参考上文。我是由原始帖子编辑的。

标签: arrays scala sorting apache-spark


【解决方案1】:

这是使用 RDD 的解决方案。我假设您需要每个县出现最多的名称。

val data = Array((2000, "JOHN", "KINGS", 50),(2000, "BOB", "KINGS", 40),(2000, "MARY", "NASSAU", 60),(2001, "JOHN", "KINGS", 14),(2001, "JANE", "KINGS", 30),(2001, "BOB", "NASSAU", 45))
val rdd = sc.parallelize(data)
//Reduce the uniq values for county/name as combo key
val uniqNamePerCountyRdd = rdd.map(x => ((x._3,x._2),x._4)).reduceByKey(_+_)
// Group names per county.
val countyNameRdd = uniqNamePerCountyRdd.map(x=>(x._1._1,(x._1._2,x._2))).groupByKey()
// Sort and take the top name alone per county
countyNameRdd.mapValues(x => x.toList.sortBy(_._2).take(1)).collect

输出:

res8: Array[(String, List[(String, Int)])] = Array((KINGS,List((JANE,30))), (NASSAU,List((BOB,45))))

【讨论】:

  • 这成功了!谢谢你。我只需要添加一行:
  • babyNames.map(x => (x(1), x(2), toInt(x(4))))
  • 这里我还在sortBy()中加了一个'-':countyNameRdd.mapValues(x => x.toList.sortBy(-_._2).take(1)).collect按降序获取出现次数最多的名称。
【解决方案2】:

您可以使用spark-csv 和Dataframe API。如果您使用的是新版本的 Spark (2.0),则略有不同。 Spark 2.0 有一个基于 spark-csv 的原生 csv 数据源。

使用 spark-csv 将 csv 文件加载到 Dataframe 中。

 val df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(new File(getClass.getResource("/names.csv").getFile).getAbsolutePath)
df.show

给出输出:

+----+----+------+------+
|Year|Name|County|Number|
+----+----+------+------+
|2000|JOHN| KINGS|    50|
|2000| BOB| KINGS|    40|
|2000|MARY|NASSAU|    60|
|2001|JOHN| KINGS|    14|
|2001|JANE| KINGS|    30|
|2001| BOB|NASSAU|    45|
+----+----+------+------+

DataFrames 使用一组操作进行结构化数据操作。您可以使用一些基本操作来成为您的结果。

import org.apache.spark.sql.functions._
df.select("County","Number").groupBy("County").agg(max("Number")).show

给出输出:

+------+-----------+
|County|max(Number)|
+------+-----------+
|NASSAU|         60|
| KINGS|         50|
+------+-----------+

这是你想要达到的目标吗?

注意agg() 函数所需的import org.apache.spark.sql.functions._

更多 information 关于 Dataframes API

编辑

为了正确的输出:

df.registerTempTable("names")

//there is probably a better query for this
sqlContext.sql("SELECT * FROM (SELECT Name, County,count(1) as Occurrence FROM names GROUP BY Name, County ORDER BY " +
  "count(1) DESC) n").groupBy("County", "Name").max("Occurrence").limit(2).show

给出输出:

+------+----+---------------+
|County|Name|max(Occurrence)|
+------+----+---------------+
| KINGS|JOHN|              2|
|NASSAU|MARY|              1|
+------+----+---------------+

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2015-10-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-05-24
    • 1970-01-01
    • 1970-01-01
    • 2023-03-26
    相关资源
    最近更新 更多