【问题标题】:Spark : Get max consecutive decrease in valueSpark:获得最大连续减少值
【发布时间】:2021-06-05 23:24:52
【问题描述】:

我的要求是获得值减少的最大计数

以下是我的输入数据集:

+---+-------+
| id| amount|
+---+-------+
|  1|   10.0|
|  1|    9.0|
|  1|    7.0|
|  1|    6.0|
|  2|   50.0|
|  2|   60.0|
|  2|   70.0|
|  3|   90.0|
|  3|   80.0|
|  3|   90.0|
+---+-------+

我要求的结果如下:

+---+--------+
| id| outcome|
+---+--------+
|  1|       3|
|  2|       0|
|  3|       2|
+---+--------+

我的结果(新列)基于 id 分组以及该值连续下降 3 次的次数。对于 id 1,即使它减少了 4 次,我只想要最多 3 次。

在 spark sql 或 spark dataframe(scala) 中的任何建议或帮助将不胜感激。

【问题讨论】:

  • Spark 数据帧是无序的,并且您的数据帧中没有排序。由于缺少排序,未定义上一行的“减少”。

标签: scala dataframe apache-spark pyspark apache-spark-sql


【解决方案1】:

您首先需要一个排序列来计算减少量。在您的示例中没有,因此我们可以使用monotonically_increasing_id 构建一个index 列。然后,我们可以使用窗口和laglead 函数来得到你想要的:

import org.apache.spark.sql.expressions.Window
val win = Window.partitionBy("id").orderBy("index")

df
    .withColumn("index", monotonically_increasing_id)
    // there is a decrease if the amount is less than the next one
    // or greater than the previous one
    .withColumn("decrease", (lag('amount, 1).over(win) > 'amount) ||
                            (lead('amount, 1).over(win) < 'amount) 
    )
    .groupBy("id")
    // we need to cast the boolean to an int to sum them
    .agg(sum('decrease cast "int") as "outcome")
    // capping the outcome to 3
    .withColumn("outcome", when('outcome > 3, lit(3)).otherwise('outcome))
    .orderBy("id").show
+---+-------+                                                                   
| id|outcome|
+---+-------+
|  1|      3|
|  2|      0|
|  3|      2|
+---+-------+

【讨论】:

    【解决方案2】:

    这是一个使用 pyspark 的建议,您可以尝试在 scala 或 sql 中进行复制:

    w = Window.partitionBy("id").orderBy(F.monotonically_increasing_id())
    
    (df.withColumn("Diff",F.col("amount") - F.lag("amount").over(w))
       .withColumn('k', F.lead("Diff").over(w))
       .fillna(0, subset='k').groupby("id").agg(
      F.sum(F.when((F.isnull("Diff") & (F.col("k")<0))|(F.col("Diff")<0),1).otherwise(0))
      .alias("outcome")
    ).withColumn("outcome",F.when(F.col("outcome")>=3,3).otherwise(F.col("outcome"))) ).show()
    

    +---+-------+
    | id|outcome|
    +---+-------+
    |  1|      3|
    |  2|      0|
    |  3|      2|
    +---+-------+
    

    【讨论】:

    • 有点难以一下子掌握,但绝对精彩
    • 上面建议的代码对数据非常敏感,但是:它甚至会考虑分离减少,而不仅仅是连续减少。如果您有 2 次连续减少、1 次增加和 3 次减少,则计数将为 5,这不是操作要求的
    猜你喜欢
    • 2021-06-08
    • 2015-06-19
    • 1970-01-01
    • 2022-10-24
    • 2021-12-11
    • 2012-01-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多