【Question title】: How to GROUP BY a Dataset in Apache Spark's Scala?
【Posted】: 2019-11-01 09:38:28
【Question】:

I want to group my dataset by the first part of each string, i.e. by "SC Freiburg", "Arsenal", and so on. In addition to the grouping, I also need the count of each group.

scala> res61.foreach(println)
SC Freiburg,2014,Germany,7747
Arsenal,2014,Germany,7745
Arsenal,2014,Germany,7750
Arsenal,2014,Germany,7758
Bayern Munich,2014,Germany,7737
Bayern Munich,2014,Germany,7744
Bayern Munich,2014,Germany,7746
Bayern Munich,2014,Germany,7749
Bayern Munich,2014,Germany,7752
Bayern Munich,2014,Germany,7754
Bayern Munich,2014,Germany,7755
Borussia Dortmund,2014,Germany,7739
Borussia Dortmund,2014,Germany,7740
Borussia Dortmund,2014,Germany,7742
Borussia Dortmund,2014,Germany,7743
Borussia Dortmund,2014,Germany,7756
Borussia Mönchengladbach,2014,Germany,7757
Schalke 04,2014,Germany,7741
Schalke 04,2014,Germany,7753
Chelsea,2014,Germany,7751
Hannover 96,2014,Germany,7738
Real Madrid,2014,Germany,7748
Lazio,2014,Germany,7759

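Note: I have to use RDD operations, so please don't suggest DataFrames. I have looked at this post: spark dataset group by and sum, but I don't know how to reproduce it in my case.

Here is the resulting output from my PostgreSQL database: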

【Discussion】:

    Tags: sql scala apache-spark group-by dataset


    【Solution 1】:

    RDDs have groupBy() and groupByKey() methods. For example, to count the rows in each group you can do:

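    val str = """SC Freiburg,2014,Germany,7747
       Arsenal,2014,Germany,7745
       ...
    """
    // trim the indentation from each line and drop blank lines
    val rdd = sc.parallelize(str.split("\n").map(_.trim).filter(_.nonEmpty))
    rdd.map(_.split(","))
      .keyBy(_(0))                         // key each row by its first field (the club)
      .groupByKey()                        // (club, all rows for that club)
      .map { case (k, v) => (k, v.size) }  // (club, number of rows)
      .collect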
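For counting alone, a common alternative to the groupByKey version above is reduceByKey: each row becomes (club, 1) and the 1s are summed, with partial sums combined inside each partition before the shuffle, so Spark never has to hold every row of one club together. This is a sketch that assumes Spark is on the classpath (for example, pasted into spark-shell) and uses only a few of the question's rows as sample input.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: Spark is available; getOrCreate() reuses a running session (e.g. in spark-shell).
val spark = SparkSession.builder.master("local[*]").appName("ClubCounts").getOrCreate()
val sc = spark.sparkContext

// A few sample rows taken from the question (not the full data set).
val lines = sc.parallelize(Seq(
  "SC Freiburg,2014,Germany,7747",
  "Arsenal,2014,Germany,7745",
  "Arsenal,2014,Germany,7750",
  "Bayern Munich,2014,Germany,7737",
  "Bayern Munich,2014,Germany,7744",
  "Bayern Munich,2014,Germany,7746"
))

// Emit (club, 1) per row, then add up the 1s per club.
// reduceByKey combines partial counts per partition before shuffling.
val counts: Array[(String, Int)] = lines
  .map(line => (line.split(",")(0), 1))
  .reduceByKey(_ + _)
  .collect()

counts.foreach(println)
```

The order of the collected pairs is not guaranteed, so sort explicitly if you need a particular order.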
    

    【Comments】:

    • OK, this is helpful. Do you know how I can order it by the count? Current result of res63.foreach(println): (SC Freiburg,1) (Hannover 96,1) (Bayern Munich,7) (Borussia Dortmund,5) (Lazio,1) (Chelsea,1) (Arsenal,3) (Borussia Mönchengladbach,1) (Schalke 04,2) (Real Madrid,1)
    • sortBy() gives me strange output; I need the result in descending order.
    • Swap each pair to (count, club) and use sortByKey(false): rdd.map(_.split(",")).keyBy(_(0)).groupByKey().map { case (k, v) => (v.size, k) }.sortByKey(false).collect
    • I figured it out. I used .sortWith(_._2 > _._2), which gives me descending order.
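The two descending sorts discussed in these comments can be compared side by side: sortBy(_._2, ascending = false) sorts on the cluster, while sortWith(_._2 > _._2) sorts the local Array after collect on the driver. This is a sketch that assumes Spark is on the classpath; the (club, count) pairs are a subset of the result quoted in the first comment.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("SortedClubCounts").getOrCreate()
val sc = spark.sparkContext

// Subset of the (club, count) pairs from the comment above.
val clubCounts = sc.parallelize(Seq(
  ("SC Freiburg", 1), ("Bayern Munich", 7), ("Borussia Dortmund", 5),
  ("Arsenal", 3), ("Schalke 04", 2)
))

// Option 1: sort on the cluster by the count (second tuple element), descending.
val sortedOnCluster: Array[(String, Int)] =
  clubCounts.sortBy(_._2, ascending = false).collect()

// Option 2: collect first, then sort the local Array on the driver.
// Fine when the number of groups is small enough to fit in driver memory.
val sortedOnDriver: Array[(String, Int)] =
  clubCounts.collect().sortWith(_._2 > _._2)

// prints (Bayern Munich,7), (Borussia Dortmund,5), (Arsenal,3), (Schalke 04,2), (SC Freiburg,1)
sortedOnCluster.foreach(println)
```

Both give the same order here; the cluster-side sort is the safer choice when there are many groups.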
    【Solution 2】:

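    Assuming yourrdd holds the lines you showed earlier, you can use something like the following to get the result. Split each line first: on a plain RDD[String], _(0) would be the first character of the line rather than the club name.

    yourrdd.map(_.split(","))              // split each line into fields
      .groupBy(_(0))                       // group by the first field (the club)
      .map(x => (x._1, x._2.size))         // (club, count)
      .sortBy(x => x._2, ascending = false)
      .collect
      .foreach(println)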
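When the number of distinct clubs is small, countByValue() is a shorter route: it counts each distinct element and returns a Map to the driver, which can then be sorted locally. A sketch assuming Spark is on the classpath; yourrdd here is filled with a few of the question's rows as sample data.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("CountByValue").getOrCreate()
val sc = spark.sparkContext

// Sample lines from the question standing in for yourrdd.
val yourrdd = sc.parallelize(Seq(
  "Schalke 04,2014,Germany,7741",
  "Schalke 04,2014,Germany,7753",
  "Chelsea,2014,Germany,7751",
  "Lazio,2014,Germany,7759"
))

// Keep only the club name, then let Spark count each distinct value.
// countByValue() returns a scala.collection.Map on the driver, so it only
// suits a modest number of distinct keys.
val byClub: scala.collection.Map[String, Long] =
  yourrdd.map(_.split(",")(0)).countByValue()

// Sort the local result by count, descending.
val sortedByClub: Seq[(String, Long)] = byClub.toSeq.sortBy { case (_, n) => -n }
sortedByClub.foreach(println)
```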
    

    【Comments】:

      【Solution 3】:

      Here mycsv.csv is your CSV data saved as a file.

      groupByKey(_._1.toLowerCase)
      

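      is what you need. Note that it lowercases the key, which is why the club names in the output below are lowercase.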


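      Note: with big data, the RDD approach can become a performance bottleneck. RDDs hold plain JVM objects and serialize them with Java serialization by default, whereas DataFrames and Datasets use Tungsten's compact binary format as their internal memory format. So prefer the Dataset and DataFrame approaches whenever possible.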

      package com.examples

      import org.apache.log4j.Level
      import org.apache.spark.sql.{Dataset, KeyValueGroupedDataset, SparkSession}

      object DataSetGroupTest {
        org.apache.log4j.Logger.getLogger("org").setLevel(Level.ERROR)

        def main(args: Array[String]): Unit = {

          val spark = SparkSession.builder
            .master("local")
            .appName("DataSetGroupTest")
            .getOrCreate()

          import spark.implicits._
          // if you have a file
          val csvData: Dataset[String] = spark.read.text("mycsv.csv").as[String]

          csvData.show(false)
          //csvData.foreach(println(_))
          val words: Dataset[Array[String]] = csvData.map(value => value.split(","))
          println("convert to array")
          val finalwords: Dataset[(String, String, String, String)] =
            words.map { case Array(f1, f2, f3, f4) => (f1, f2, f3, f4) }
          finalwords.foreach(println(_))
          val groupedWords: KeyValueGroupedDataset[String, (String, String, String, String)] =
            finalwords.groupByKey(_._1.toLowerCase)
          val counts: Dataset[(String, Long)] = groupedWords.count().sort($"count(1)".desc)
          counts.show(false)
        }
      }
      

      Result:

      Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
      +------------------------------------------+
      |value                                     |
      +------------------------------------------+
      |Freiburg,2014,Germany,7747                |
      |Arsenal,2014,Germany,7745                 |
      |Arsenal,2014,Germany,7750                 |
      |Arsenal,2014,Germany,7758                 |
      |Bayern Munich,2014,Germany,7737           |
      |Bayern Munich,2014,Germany,7744           |
      |Bayern Munich,2014,Germany,7746           |
      |Bayern Munich,2014,Germany,7749           |
      |Bayern Munich,2014,Germany,7752           |
      |Bayern Munich,2014,Germany,7754           |
      |Bayern Munich,2014,Germany,7755           |
      |Borussia Dortmund,2014,Germany,7739       |
      |Borussia Dortmund,2014,Germany,7740       |
      |Borussia Dortmund,2014,Germany,7742       |
      |Borussia Dortmund,2014,Germany,7743       |
      |Borussia Dortmund,2014,Germany,7756       |
      |Borussia Mönchengladbach,2014,Germany,7757|
      |Schalke 04,2014,Germany,7741              |
      |Schalke 04,2014,Germany,7753              |
      |Chelsea,2014,Germany,7751                 |
      +------------------------------------------+
      only showing top 20 rows
      
      convert to array
      (Freiburg,2014,Germany,7747)
      (Arsenal,2014,Germany,7745)
      (Arsenal,2014,Germany,7750)
      (Arsenal,2014,Germany,7758)
      (Bayern Munich,2014,Germany,7737)
      (Bayern Munich,2014,Germany,7744)
      (Bayern Munich,2014,Germany,7746)
      (Bayern Munich,2014,Germany,7749)
      (Bayern Munich,2014,Germany,7752)
      (Bayern Munich,2014,Germany,7754)
      (Bayern Munich,2014,Germany,7755)
      (Borussia Dortmund,2014,Germany,7739)
      (Borussia Dortmund,2014,Germany,7740)
      (Borussia Dortmund,2014,Germany,7742)
      (Borussia Dortmund,2014,Germany,7743)
      (Borussia Dortmund,2014,Germany,7756)
      (Borussia Mönchengladbach,2014,Germany,7757)
      (Schalke 04,2014,Germany,7741)
      (Schalke 04,2014,Germany,7753)
      (Chelsea,2014,Germany,7751)
      (Hannover 96,2014,Germany,7738)
      (Real Madrid,2014,Germany,7748)
      (Lazio,2014,Germany,7759)
      +------------------------+--------+
      |value                   |count(1)|
      +------------------------+--------+
      |bayern munich           |7       |
      |borussia dortmund       |5       |
      |arsenal                 |3       |
      |schalke 04              |2       |
      |lazio                   |1       |
      |hannover 96             |1       |
      |chelsea                 |1       |
      |real madrid             |1       |
      |freiburg                |1       |
      |borussia mönchengladbach|1       |
      +------------------------+--------+
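The sort above refers to the auto-generated column name count(1). A sketch of a variant that gives the columns explicit names with toDF before sorting, so the code does not depend on that generated name; it assumes Spark is on the classpath and uses a few hypothetical rows in the same (club, year, country, id) shape as finalwords.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.desc

val spark = SparkSession.builder.master("local[*]").appName("DatasetGroupRenamed").getOrCreate()
import spark.implicits._

// Sample rows in the same shape as finalwords in the answer.
val rows = Seq(
  ("Arsenal", "2014", "Germany", "7745"),
  ("Arsenal", "2014", "Germany", "7750"),
  ("Chelsea", "2014", "Germany", "7751")
).toDS()

// groupByKey(...).count() yields Dataset[(String, Long)]; toDF names the two
// columns explicitly so the sort does not rely on the generated "count(1)" name.
val counts = rows
  .groupByKey(_._1.toLowerCase)
  .count()
  .toDF("club", "count")
  .orderBy(desc("count"))

counts.show(false)
```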
      
      

      【Comments】:

      • Nice, clear explanation, with the logs and data types!