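Here `mycsv.csv` is your CSV file. What you need is `groupByKey(_._1.toLowerCase)`, which groups the rows by their first field (the team name), ignoring case.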
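To see what that key function does without starting Spark, here is a minimal sketch using plain Scala collections that mirrors `groupByKey(_._1.toLowerCase)` followed by `count()`. The object and method names are made up for illustration, the rows are copied from the sample output further down, and the tie-break by team name in the sort is an added assumption (the Spark code only sorts by count).

```scala
object GroupByKeySketch {
  // A few rows copied from the sample output: (team, year, country, id)
  val rows: Seq[(String, String, String, String)] = Seq(
    ("Arsenal", "2014", "Germany", "7745"),
    ("Arsenal", "2014", "Germany", "7750"),
    ("Arsenal", "2014", "Germany", "7758"),
    ("Schalke 04", "2014", "Germany", "7741"),
    ("Schalke 04", "2014", "Germany", "7753"),
    ("Chelsea", "2014", "Germany", "7751")
  )

  // Same key function as groupByKey(_._1.toLowerCase): the lower-cased first field
  def countByTeam(data: Seq[(String, String, String, String)]): Seq[(String, Long)] =
    data
      .groupBy(_._1.toLowerCase)                               // Map[key, rows with that key]
      .map { case (team, group) => (team, group.size.toLong) } // rows per key, like count()
      .toSeq
      .sortBy { case (team, n) => (-n, team) }                 // count descending, then name (assumed tie-break)

  def main(args: Array[String]): Unit =
    countByTeam(rows).foreach(println)
}
```

Running `main` prints `(arsenal,3)`, `(schalke 04,2)` and `(chelsea,1)`, one per line; the Spark job performs the same grouping, only distributed over a Dataset.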
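Note:
With big data, the RDD approach is a performance bottleneck, because RDDs use Java serialization by default, whereas DataFrames and Datasets use Tungsten as their internal memory format. So always prefer the Dataset and DataFrame approaches.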
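package com.examples

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Dataset, KeyValueGroupedDataset, SparkSession}

object DataSetGroupTest {
  // Silence Spark's INFO/WARN logging so that only the results are printed
  Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local")
      .appName("DataSetGroupTest")
      .getOrCreate()
    import spark.implicits._

    // If you have a file: read each line of the CSV as one String
    val csvData: Dataset[String] = spark.read.text("mycsv.csv").as[String]
    csvData.show(false)

    // Split every line into its comma-separated fields
    val words: Dataset[Array[String]] = csvData.map(value => value.split(","))
    println("convert to array")

    // Assumes every line has exactly four fields; any other line throws a MatchError
    val finalwords: Dataset[(String, String, String, String)] =
      words.map { case Array(f1, f2, f3, f4) => (f1, f2, f3, f4) }
    // collect() brings the (small) data set to the driver so the rows are printed there
    finalwords.collect().foreach(println)

    // Group case-insensitively on the first field (the team name)
    val groupedWords: KeyValueGroupedDataset[String, (String, String, String, String)] =
      finalwords.groupByKey(_._1.toLowerCase)

    // count() returns (key, count) pairs. The column names ("value" and "count(1)" in the
    // output below) can differ between Spark versions, so sort on the second column by position
    val counts: Dataset[(String, Long)] = groupedWords.count()
    counts.sort(col(counts.columns(1)).desc).show(false)

    spark.stop()
  }
}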
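The `case Array(f1, f2, f3, f4)` pattern in the code above throws a `scala.MatchError` on any line that does not split into exactly four fields (a blank line, for example). Below is a minimal sketch of a more forgiving parser, assuming malformed lines should simply be skipped; `ParseSketch`, `Row4` and `parseLine` are hypothetical names, and the sample lines in `main` are made up for illustration. Passing `-1` to `split` keeps trailing empty fields, so `"a,b,c,"` still counts as four fields.

```scala
object ParseSketch {
  type Row4 = (String, String, String, String)

  // Hypothetical helper: None instead of a MatchError when a line
  // does not have exactly four comma-separated fields
  def parseLine(line: String): Option[Row4] =
    line.split(",", -1) match {
      case Array(f1, f2, f3, f4) => Some((f1, f2, f3, f4))
      case _                     => None
    }

  def main(args: Array[String]): Unit = {
    // Made-up sample: one valid line, one with five fields, one blank
    val lines = Seq("Lazio,2014,Germany,7759", "not,a,valid,csv,line", "")
    lines.map(parseLine).flatten.foreach(println) // prints only the valid row
  }
}
```

In the Spark job it could replace the two `map` steps as `csvData.flatMap(line => parseLine(line).toList)`; this is an untested suggestion, relying on the tuple encoder from `spark.implicits._`.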
Result:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
+------------------------------------------+
|value                                     |
+------------------------------------------+
|Freiburg,2014,Germany,7747                |
|Arsenal,2014,Germany,7745                 |
|Arsenal,2014,Germany,7750                 |
|Arsenal,2014,Germany,7758                 |
|Bayern Munich,2014,Germany,7737           |
|Bayern Munich,2014,Germany,7744           |
|Bayern Munich,2014,Germany,7746           |
|Bayern Munich,2014,Germany,7749           |
|Bayern Munich,2014,Germany,7752           |
|Bayern Munich,2014,Germany,7754           |
|Bayern Munich,2014,Germany,7755           |
|Borussia Dortmund,2014,Germany,7739       |
|Borussia Dortmund,2014,Germany,7740       |
|Borussia Dortmund,2014,Germany,7742       |
|Borussia Dortmund,2014,Germany,7743       |
|Borussia Dortmund,2014,Germany,7756       |
|Borussia Mönchengladbach,2014,Germany,7757|
|Schalke 04,2014,Germany,7741              |
|Schalke 04,2014,Germany,7753              |
|Chelsea,2014,Germany,7751                 |
+------------------------------------------+
only showing top 20 rows
convert to array
(Freiburg,2014,Germany,7747)
(Arsenal,2014,Germany,7745)
(Arsenal,2014,Germany,7750)
(Arsenal,2014,Germany,7758)
(Bayern Munich,2014,Germany,7737)
(Bayern Munich,2014,Germany,7744)
(Bayern Munich,2014,Germany,7746)
(Bayern Munich,2014,Germany,7749)
(Bayern Munich,2014,Germany,7752)
(Bayern Munich,2014,Germany,7754)
(Bayern Munich,2014,Germany,7755)
(Borussia Dortmund,2014,Germany,7739)
(Borussia Dortmund,2014,Germany,7740)
(Borussia Dortmund,2014,Germany,7742)
(Borussia Dortmund,2014,Germany,7743)
(Borussia Dortmund,2014,Germany,7756)
(Borussia Mönchengladbach,2014,Germany,7757)
(Schalke 04,2014,Germany,7741)
(Schalke 04,2014,Germany,7753)
(Chelsea,2014,Germany,7751)
(Hannover 96,2014,Germany,7738)
(Real Madrid,2014,Germany,7748)
(Lazio,2014,Germany,7759)
+------------------------+--------+
|value                   |count(1)|
+------------------------+--------+
|bayern munich           |7       |
|borussia dortmund       |5       |
|arsenal                 |3       |
|schalke 04              |2       |
|lazio                   |1       |
|hannover 96             |1       |
|chelsea                 |1       |
|real madrid             |1       |
|freiburg                |1       |
|borussia mönchengladbach|1       |
+------------------------+--------+