以下方法可能对您有所帮助
Case class to generate schema
/**
 * One row of the video-game sales data set.
 *
 * The field names and types are used to derive the schema applied when the
 * semicolon-separated data is read, so they must match the column order of the
 * input. The primitive `Int`/`Double` fields come out as non-nullable columns
 * in the derived schema.
 *
 * @param Name        title of the game
 * @param Platform    platform code, e.g. "PS2" or "X360"
 * @param Year        year value from the third input column
 * @param Genre       genre label, e.g. "Racing" or "Shooter"
 * @param Publisher   publisher name
 * @param NA_Sales    sales figure from the `NA_Sales` column
 * @param EU_Sales    sales figure from the `EU_Sales` column
 * @param JP_Sales    sales figure from the `JP_Sales` column
 * @param Other_Sales sales figure from the `Other_Sales` column
 */
case class Sales(
  Name: String,
  Platform: String,
  Year: Int,
  Genre: String,
  Publisher: String,
  NA_Sales: Double,
  EU_Sales: Double,
  JP_Sales: Double,
  Other_Sales: Double
)
Read the data
// Reuse the SparkSession behind the existing sqlContext and bring its implicit
// encoders into scope; `.toDS()` on the Seq[String] below depends on them.
val spark = sqlContext.sparkSession
val implicits = spark.implicits
import implicits._
import org.apache.spark.sql.Encoders
// No longer needed by this snippet; kept in case later code still references it.
import org.apache.spark.sql.catalyst.ScalaReflection
// Inline sample input: one game per line, fields separated by ';' in the same
// order as the fields of the Sales case class.
val data =
"""
|Gran Turismo 3: A-Spec;PS2;2001;Racing;Sony Computer Entertainment;6.85;5.09;1.87;1.16
|Call of Duty: Modern Warfare 3;X360;2011;Shooter;Activision;9.03;4.28;0.13;1.32
|Pokemon Yellow: Special Pikachu Edition;GB;1998;Role-Playing;Nintendo;5.89;5.04;3.12;0.59
|Call of Duty: Black Ops;X360;2010;Shooter;Activision;9.67;3.73;0.11;1.13
|Pokemon HeartGold/Pokemon SoulSilver;DS;2009;Action;Nintendo;4.4;2.77;3.96;0.77
|High Heat Major League Baseball 2003;PS2;2002;Sports;3DO;0.18;0.14;0;0.05
|Panzer Dragoon;SAT;1995;Shooter;Sega;0;0;0.37;0
|Corvette;GBA;2003;Racing;TDK Mediactive;0.2;0.07;0;0.01
""".stripMargin
// Derive the schema from the Sales case class through the public Encoders API.
// It already returns a StructType, so no cast and no use of the internal
// catalyst ScalaReflection helper is required.
val salesSchema = Encoders.product[Sales].schema
// Parse the in-memory lines as CSV using the derived schema and ';' as separator.
val ds = spark.read
  .schema(salesSchema)
  .option("sep", ";")
  .csv(data.split("\n").toSeq.toDS())
// Print the rows without truncating long values, then the resulting schema.
ds.show(false)
ds.printSchema()
结果
+---------------------------------------+--------+----+------------+---------------------------+--------+--------+--------+-----------+
|Name |Platform|Year|Genre |Publisher |NA_Sales|EU_Sales|JP_Sales|Other_Sales|
+---------------------------------------+--------+----+------------+---------------------------+--------+--------+--------+-----------+
|Gran Turismo 3: A-Spec |PS2 |2001|Racing |Sony Computer Entertainment|6.85 |5.09 |1.87 |1.16 |
|Call of Duty: Modern Warfare 3 |X360 |2011|Shooter |Activision |9.03 |4.28 |0.13 |1.32 |
|Pokemon Yellow: Special Pikachu Edition|GB |1998|Role-Playing|Nintendo |5.89 |5.04 |3.12 |0.59 |
|Call of Duty: Black Ops |X360 |2010|Shooter |Activision |9.67 |3.73 |0.11 |1.13 |
|Pokemon HeartGold/Pokemon SoulSilver |DS |2009|Action |Nintendo |4.4 |2.77 |3.96 |0.77 |
|High Heat Major League Baseball 2003 |PS2 |2002|Sports |3DO |0.18 |0.14 |0.0 |0.05 |
|Panzer Dragoon |SAT |1995|Shooter |Sega |0.0 |0.0 |0.37 |0.0 |
|Corvette |GBA |2003|Racing |TDK Mediactive |0.2 |0.07 |0.0 |0.01 |
+---------------------------------------+--------+----+------------+---------------------------+--------+--------+--------+-----------+
root
|-- Name: string (nullable = true)
|-- Platform: string (nullable = true)
|-- Year: integer (nullable = false)
|-- Genre: string (nullable = true)
|-- Publisher: string (nullable = true)
|-- NA_Sales: double (nullable = false)
|-- EU_Sales: double (nullable = false)
|-- JP_Sales: double (nullable = false)
|-- Other_Sales: double (nullable = false)
Get the lowest- and highest-selling genre
import org.apache.spark.sql.functions.{col, sum}

// Per-genre sales total. "global_sale" adds the NA, EU and JP columns only.
// NOTE(review): Other_Sales is not included in this sum; confirm that is intended.
val processedDF = ds.withColumn("global_sale", col("NA_Sales") + col("EU_Sales") + col("JP_Sales"))
  .groupBy("Genre")
  .agg(sum("global_sale").as("global_sale_by_genre"))

// Renders the first row under the requested sort order as
// "Genre -> <genre>, global_sale_by_genre -> <total>".
// take(1).headOption replaces head() so that an empty result produces a
// placeholder instead of throwing NoSuchElementException.
def describeExtremeGenre(ascending: Boolean): String = {
  val total = col("global_sale_by_genre")
  processedDF
    .orderBy(if (ascending) total.asc else total.desc)
    .take(1)
    .headOption
    .fold("<no data>")(_.getValuesMap[Any](Seq("Genre", "global_sale_by_genre")).mkString(", "))
}

println(s"Lowest selling :: ${describeExtremeGenre(ascending = true)}")
println(s"Highest selling :: ${describeExtremeGenre(ascending = false)}")
结果
Lowest selling :: Genre -> Sports, global_sale_by_genre -> 0.32
Highest selling :: Genre -> Shooter, global_sale_by_genre -> 27.32