【问题标题】:Cumulative function in spark scalaspark scala中的累积函数
【发布时间】:2020-01-26 16:07:45
【问题描述】:

我试过这个来计算累积值,但如果日期字段相同,这些值会添加到累积字段中,有人可以建议解决方案类似于this question

val windowval = (Window.partitionBy($"userID").orderBy($"lastModified")
             .rangeBetween(Window.unboundedPreceding, 0))
val df_w_cumsum = ms1_userlogRewards.withColumn("totalRewards", sum($"noOfJumps").over(windowval)).orderBy($"lastModified".asc)
df_w_cumsum.filter($"batchType".isNull).filter($"userID"==="355163").select($"userID", $"noOfJumps", $"totalRewards",$"lastModified").show()

【问题讨论】:

  • 这张图片是预期输出的吗?
  • 不,这不是他预期的输出,只要时间戳相同,就不会求和
  • 好的,您可以添加数据框而不是图像吗?这样会更容易。
  • 好的,我不太明白,但我建议一个解决方案
  • 已经更新了数据框,我猜你可以理解了

标签: scala apache-spark cumulative-sum


【解决方案1】:

请注意,您的第一个 totalRewards=147 是前一个值 49 + 所有时间戳为“2019-08-07 18:25:06”的值的总和:49 + (36 + 0 + 60 + 2) = 147

第一个选项是聚合所有具有相同时间戳的值,例如groupBy($"userId", $"lastModified").agg(sum($"noOfJumps").as("noOfJumps"))(或类似的东西)然后运行你的总和。这将完全删除重复的时间戳。

第二个选项是使用row_number 首先在具有相同lastModified 字段的行之间定义一个顺序,然后使用.orderBy($"lastModified, $"row_number")(或类似的东西)运行您的总和。这应该保留所有记录并在此过程中为您提供部分总结:totalRewards = 49 -> 85 -> 85 -> 145 -> 147(或类似内容,具体取决于 row_number 定义的顺序)

【讨论】:

  • 感谢您的建议将尝试所有两种解决方案
【解决方案2】:

我认为您想按用户 ID 和时间戳求和。 因此,您需要按用户 ID 和日期进行分区,并使用窗口函数进行符号化,如下所示:

import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.expressions.Window

val window = Window.partitionBy("userID", "lastModified")
df.withColumn("cumulativeSum", sum(col("noOfJumps").over(window))

【讨论】:

  • 我也试过了,结果与预期不同
  • 如果时间戳相同,您希望用户对 noOfJumps 求和。正确的 ?例如,您想在 TotalRewards 列中有 62 个吗?之后您需要删除重复项。
  • 我不清楚。因为我可能没有看到所有数据。
猜你喜欢
  • 2019-07-15
  • 2018-06-01
  • 2018-07-14
  • 1970-01-01
  • 2018-05-13
  • 2019-03-30
  • 2018-10-27
  • 2022-01-23
  • 1970-01-01
相关资源
最近更新 更多