【问题标题】:Spark-scala aggregate on multiple columns from a list [duplicate]Spark-scala在列表中的多个列上聚合[重复]
【发布时间】:2019-02-09 14:57:12
【问题描述】:

我有一个数据框,其中包含几个不固定的数字列(它们可以在每次执行期间更改)。假设我有一个带有数字列名称的 Seq 对象。 我想为这些列中的每一个应用一个聚合函数。我尝试了以下方法:

println(numeric_cols)
// -> Seq[String] = List(avgTkts_P1, avgTkts_P2, avgTkts_P3, avgTkts_P4)

var sum_ops = for (c <- numeric_cols) yield org.apache.spark.sql.functions.sum(c).as(c)

var result = df.groupBy($"ID").agg( sum_ops:_* )

但它给了我以下错误:

scala> var avgTktsPerPeriodo = df.groupBy("ID").agg(sum_ops:_*)
<console>:79: error: overloaded method value agg with alternatives:
  (expr: org.apache.spark.sql.Column,exprs: org.apache.spark.sql.Column*)org.apache.spark.sql.DataFrame <and>
  (exprs: java.util.Map[String,String])org.apache.spark.sql.DataFrame <and>
  (exprs: scala.collection.immutable.Map[String,String])org.apache.spark.sql.DataFrame <and>
  (aggExpr: (String, String),aggExprs: (String, String)*)org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.sql.Column)

知道这在 spark-scala 中是否可行吗?

【问题讨论】:

    标签: scala apache-spark aggregate


    【解决方案1】:

    如果您查看其中一个签名:

    (expr: org.apache.spark.sql.Column,exprs: org.apache.spark.sql.Column*)org.apache.spark.sql.DataFrame
    

    第一个参数是Column 表达式,第二个参数是可变参数。

    您需要执行以下操作:

    val result = df.groupBy($"ID").agg( sum_ops.head, sum_ops.tail:_* )
    

    【讨论】:

      【解决方案2】:

      Ok 找到了解决方案(Spark 中的 agg 函数接受 Map[colname -> operation]):

      var agg_ops =  numeric_cols map (c => c -> "sum") toMap
      
      var result = df.groupBy($"ID").agg( agg_ops )
      

      【讨论】:

        猜你喜欢
        • 2018-03-02
        • 2018-02-21
        • 1970-01-01
        • 2017-03-10
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-08-23
        • 2016-04-04
        相关资源
        最近更新 更多