【问题标题】:Spark SQL: apply aggregate functions to a list of columnsSpark SQL:将聚合函数应用于列列表
【发布时间】:2016-02-26 06:12:47
【问题描述】:

在执行groupBy 时,有没有办法将聚合函数应用于数据框的所有(或一列)列?换句话说,有没有办法避免对每一列都这样做:

df.groupBy("col1")
  .agg(sum("col2").alias("col2"), sum("col3").alias("col3"), ...)

【问题讨论】:

    标签: apache-spark dataframe apache-spark-sql aggregate-functions


    【解决方案1】:

    有多种方法可以将聚合函数应用于多个列。

    GroupedData类为最常用的函数提供了多个方法,包括countmaxminmeansum,可以直接使用如下:

    • Python:

      df = sqlContext.createDataFrame(
          [(1.0, 0.3, 1.0), (1.0, 0.5, 0.0), (-1.0, 0.6, 0.5), (-1.0, 5.6, 0.2)],
          ("col1", "col2", "col3"))
      
      df.groupBy("col1").sum()
      
      ## +----+---------+-----------------+---------+
      ## |col1|sum(col1)|        sum(col2)|sum(col3)|
      ## +----+---------+-----------------+---------+
      ## | 1.0|      2.0|              0.8|      1.0|
      ## |-1.0|     -2.0|6.199999999999999|      0.7|
      ## +----+---------+-----------------+---------+
      
    • 斯卡拉

      val df = sc.parallelize(Seq(
        (1.0, 0.3, 1.0), (1.0, 0.5, 0.0),
        (-1.0, 0.6, 0.5), (-1.0, 5.6, 0.2))
      ).toDF("col1", "col2", "col3")
      
      df.groupBy($"col1").min().show
      
      // +----+---------+---------+---------+
      // |col1|min(col1)|min(col2)|min(col3)|
      // +----+---------+---------+---------+
      // | 1.0|      1.0|      0.3|      0.0|
      // |-1.0|     -1.0|      0.6|      0.2|
      // +----+---------+---------+---------+
      

    您可以选择传递应聚合的列列表

    df.groupBy("col1").sum("col2", "col3")
    

    您还可以传递字典/映射,其中包含键和函数作为值的列:

    • Python

      exprs = {x: "sum" for x in df.columns}
      df.groupBy("col1").agg(exprs).show()
      
      ## +----+---------+
      ## |col1|avg(col3)|
      ## +----+---------+
      ## | 1.0|      0.5|
      ## |-1.0|     0.35|
      ## +----+---------+
      
    • 斯卡拉

      val exprs = df.columns.map((_ -> "mean")).toMap
      df.groupBy($"col1").agg(exprs).show()
      
      // +----+---------+------------------+---------+
      // |col1|avg(col1)|         avg(col2)|avg(col3)|
      // +----+---------+------------------+---------+
      // | 1.0|      1.0|               0.4|      0.5|
      // |-1.0|     -1.0|3.0999999999999996|     0.35|
      // +----+---------+------------------+---------+
      

    终于可以使用可变参数了:

    • Python

      from pyspark.sql.functions import min
      
      exprs = [min(x) for x in df.columns]
      df.groupBy("col1").agg(*exprs).show()
      
    • 斯卡拉

      import org.apache.spark.sql.functions.sum
      
      val exprs = df.columns.map(sum(_))
      df.groupBy($"col1").agg(exprs.head, exprs.tail: _*)
      

    还有其他一些方法可以达到类似的效果,但这些方法在大多数情况下应该绰绰有余。

    另见:

    【讨论】:

    • 看来aggregateBy 在这里适用。它比groupBy 快(快得多)。哦等等 - DataFrame 不会暴露 aggregateBy -- agg 指向 groupBy。那么这意味着DataFrames ..
    • @javadba 不,这仅意味着 Dataset.groupBy / Dataset.groupByKeyRDD.groupBy / RDD.groupByKey 在一般情况下具有不同的语义。如果是简单的DataFrame 聚合check this。还有更多,但在这里并不重要。
    • @javadba 谢谢。这是另一个有用的(主观上是自我推销提醒)资源:git.io/vM1Ch
    • 如何给列添加别名?
    • @GeekFactory exprs = [min(x).alias("{0}".format(x)) for x in df.columns]
    【解决方案2】:

    另一个相同概念的例子 - 但是说 - 你有 2 个不同的列 - 你想对它们中的每一个应用不同的 agg 函数,即

    f.groupBy("col1").agg(sum("col2").alias("col2"), avg("col3").alias("col3"), ...)
    

    这是实现它的方法 - 虽然我还不知道在这种情况下如何添加别名

    请参阅下面的示例 - 使用地图

    val Claim1 = StructType(Seq(StructField("pid", StringType, true),StructField("diag1", StringType, true),StructField("diag2", StringType, true), StructField("allowed", IntegerType, true), StructField("allowed1", IntegerType, true)))
    val claimsData1 = Seq(("PID1", "diag1", "diag2", 100, 200), ("PID1", "diag2", "diag3", 300, 600), ("PID1", "diag1", "diag5", 340, 680), ("PID2", "diag3", "diag4", 245, 490), ("PID2", "diag2", "diag1", 124, 248))
    
    val claimRDD1 = sc.parallelize(claimsData1)
    val claimRDDRow1 = claimRDD1.map(p => Row(p._1, p._2, p._3, p._4, p._5))
    val claimRDD2DF1 = sqlContext.createDataFrame(claimRDDRow1, Claim1)
    
    val l = List("allowed", "allowed1")
    val exprs = l.map((_ -> "sum")).toMap
    claimRDD2DF1.groupBy("pid").agg(exprs) show false
    val exprs = Map("allowed" -> "sum", "allowed1" -> "avg")
    
    claimRDD2DF1.groupBy("pid").agg(exprs) show false
    

    【讨论】:

      【解决方案3】:

      关于如何创建聚合的当前答案完全正确,但没有一个真正解决问题中也要求的列别名/重命名。

      通常,我是这样处理这种情况的:

      val dimensionFields = List("col1")
      val metrics = List("col2", "col3", "col4")
      val columnOfInterests = dimensions ++ metrics
      
      val df = spark.read.table("some_table") 
          .select(columnOfInterests.map(c => col(c)):_*)
          .groupBy(dimensions.map(d => col(d)): _*)
          .agg(metrics.map( m => m -> "sum").toMap)
          .toDF(columnOfInterests:_*)    // that's the interesting part
      
      

      最后一行实质上将聚合数据帧的每一列重命名为原始字段,实质上是将sum(col2)sum(col3) 更改为简单的col2col3

      【讨论】:

        猜你喜欢
        • 2012-03-10
        • 2020-08-23
        • 2022-12-18
        • 2016-05-15
        • 1970-01-01
        • 1970-01-01
        • 2011-09-05
        • 1970-01-01
        • 2022-01-13
        相关资源
        最近更新 更多