【问题标题】:Cumulative sum in SparkSpark中的累积和
【发布时间】:2018-06-01 03:02:30
【问题描述】:

我想在 Spark 中做累计和。这是寄存器表(输入):

+---------------+-------------------+----+----+----+
|     product_id|          date_time| ack|val1|val2|
+---------------+-------------------+----+----+----+
|4008607333T.upf|2017-12-13:02:27:01|3-46|  53|  52|
|4008607333T.upf|2017-12-13:02:27:03|3-47|  53|  52|
|4008607333T.upf|2017-12-13:02:27:08|3-46|  53|  52|
|4008607333T.upf|2017-12-13:02:28:01|3-47|  53|  52|
|4008607333T.upf|2017-12-13:02:28:07|3-46|  15|   1|
+---------------+-------------------+----+----+----+

Hive 查询:

select *, SUM(val1) over ( Partition by product_id, ack order by date_time rows between unbounded preceding and current row ) val1_sum, SUM(val2) over ( Partition by product_id, ack order by date_time rows between unbounded preceding and current row ) val2_sum from test

输出:

+---------------+-------------------+----+----+----+-------+--------+
|     product_id|          date_time| ack|val1|val2|val_sum|val2_sum|
+---------------+-------------------+----+----+----+-------+--------+
|4008607333T.upf|2017-12-13:02:27:01|3-46|  53|  52|     53|      52|
|4008607333T.upf|2017-12-13:02:27:08|3-46|  53|  52|    106|     104|
|4008607333T.upf|2017-12-13:02:28:07|3-46|  15|   1|    121|     105|
|4008607333T.upf|2017-12-13:02:27:03|3-47|  53|  52|     53|      52|
|4008607333T.upf|2017-12-13:02:28:01|3-47|  53|  52|    106|     104|
+---------------+-------------------+----+----+----+-------+--------+

使用 Spark 逻辑,我得到相同的输出:

import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy('product_id, 'ack).orderBy('date_time)
import org.apache.spark.sql.functions._

val newDf = inputDF.withColumn("val_sum", sum('val1) over w).withColumn("val2_sum", sum('val2) over w)
newDf.show

但是,当我在 spark 集群 val_sum 上尝试此逻辑时,值将是累积总和的一半,并且有时会有所不同。我不知道为什么它会发生在火花集群上。是分区的原因吗?

如何计算火花簇上的列的累积总和?

【问题讨论】:

    标签: sql scala apache-spark hive cumulative-sum


    【解决方案1】:

    要使用 DataFrame API 获取累积总和,您应该使用 rowsBetween 窗口方法。在 Spark 2.1 和更新版本 中,按如下方式创建窗口:

    val w = Window.partitionBy($"product_id", $"ack")
      .orderBy($"date_time")
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    

    这将告诉 Spark 使用从分区开始到当前行的值。使用旧版本的 Spark,使用 rowsBetween(Long.MinValue, 0) 获得相同的效果。

    要使用窗口,请使用与之前相同的方法:

    val newDf = inputDF.withColumn("val_sum", sum($"val1").over(w))
      .withColumn("val2_sum", sum($"val2").over(w))
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-03-30
      • 2020-01-26
      • 2018-02-07
      • 2016-05-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多