【问题标题】:Spark - Sum of row valuesSpark - 行值的总和
【发布时间】:2017-03-31 09:23:58
【问题描述】:

我有以下数据框:

January | February | March
-----------------------------
  10    |    10    |  10
  20    |    20    |  20
  50    |    50    |  50

我正在尝试向其中添加一列,它是每行值的总和。

January | February | March  | TOTAL
----------------------------------
  10    |    10    |   10   |  30
  20    |    20    |   20   |  60
  50    |    50    |   50   |  150

据我所知,所有内置的聚合函数似乎都是用于计算单列中的值。如何在每行的基础上跨列使用值(使用 Scala)?

我已经做到了

val newDf: DataFrame = df.select(colsToSum.map(col):_*).foreach ...

【问题讨论】:

  • 什么是 colsToSum?也许 List[Column] ?

标签: scala apache-spark


【解决方案1】:

你非常接近这个:

val newDf: DataFrame = df.select(colsToSum.map(col):_*).foreach ...

试试这个:

val newDf = df.select(colsToSum.map(col).reduce((c1, c2) => c1 + c2) as "sum")

我认为这是最好的答案,因为它与使用硬编码 SQL 查询的答案一样快,并且与使用 UDF 的答案一样方便。这是两全其美的方法——我什至没有添加一整行代码!

【讨论】:

  • 嗨大卫,有没有类似的方法可以找到最小值而不是总和?
  • 这不是我的想法,我已经有大约 6 个月没有碰过 spark 了,但是尝试改变:(c1, c2) => c1 + c2(c1, c2) => if (c1 < c2) c1; else c2
  • 感谢 Daivd,我已经尝试过这个解决方案,但没有成功
  • 我有类似的情况,但我有超过 300 列要求和,我尝试了解决方案,但由于超出最大迭代次数而失败,有什么建议吗?
  • 你好大卫,col 变量代表什么? colsToSum 的类型是什么,一个字符串列表?
【解决方案2】:

或者,使用 Hugo 的方法和示例,您可以创建一个 UDF 接收任意数量的列和 sum 它们全部。

from functools import reduce

def superSum(*cols):
   return reduce(lambda a, b: a + b, cols)

add = udf(superSum)

df.withColumn('total', add(*[df[x] for x in df.columns])).show()


+-------+--------+-----+-----+
|January|February|March|total|
+-------+--------+-----+-----+
|     10|      10|   10|   30|
|     20|      20|   20|   60|
+-------+--------+-----+-----+

【讨论】:

    【解决方案3】:

    这段代码是用 Python 编写的,但很容易翻译:

    # First we create a RDD in order to create a dataFrame:
    rdd = sc.parallelize([(10, 10,10), (20, 20,20)])
    df = rdd.toDF(['January', 'February', 'March'])
    df.show()
    
    # Here, we create a new column called 'TOTAL' which has results
    # from add operation of columns df.January, df.February and df.March
    
    df.withColumn('TOTAL', df.January + df.February + df.March).show()
    

    输出:

    +-------+--------+-----+
    |January|February|March|
    +-------+--------+-----+
    |     10|      10|   10|
    |     20|      20|   20|
    +-------+--------+-----+
    
    +-------+--------+-----+-----+
    |January|February|March|TOTAL|
    +-------+--------+-----+-----+
    |     10|      10|   10|   30|
    |     20|      20|   20|   60|
    +-------+--------+-----+-----+
    

    您还可以根据需要创建用户定义函数,这里是 Spark 文档的链接: UserDefinedFunction (udf)

    【讨论】:

      【解决方案4】:

      使用动态列选择的 Scala 示例:

      import sqlContext.implicits._
      val rdd = sc.parallelize(Seq((10, 10, 10), (20, 20, 20)))
      val df = rdd.toDF("January", "February", "March")
      df.show()
      
      +-------+--------+-----+
      |January|February|March|
      +-------+--------+-----+
      |     10|      10|   10|
      |     20|      20|   20|
      +-------+--------+-----+
      
      val sumDF = df.withColumn("TOTAL", df.columns.map(c => col(c)).reduce((c1, c2) => c1 + c2))
      sumDF.show()
      
      +-------+--------+-----+-----+
      |January|February|March|TOTAL|
      +-------+--------+-----+-----+
      |     10|      10|   10|   30|
      |     20|      20|   20|   60|
      +-------+--------+-----+-----+
      

      【讨论】:

      • 有没有办法动态地做到这一点,而不是跨列而是沿行?意思是,在不输入spark.sql("select sum(col1), sum(col1) from df") 的情况下,我们可以用另一种方式(在列下,而不是交叉)求和吗?
      • df.select(df.columns.map(c => sum(col(c))) :_*) ?
      【解决方案5】:

      您可以为此使用 expr()。在 scala 中使用

      df.withColumn("TOTAL", expr("January+February+March"))
      

      【讨论】:

      • expr() of sums 如何处理元素之间的空值?
      • 如果总和的任何元素为 null 将返回 null。例如:1+0+null 返回 NULL
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-09-09
      • 1970-01-01
      • 2017-09-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多