【问题标题】:Spark Windowspec lag function calculating cumulative scoresSpark Windowspec 滞后函数计算累积分数
【发布时间】:2018-10-27 06:11:01
【问题描述】:

我有一个包含每天得分的数据框,我想计算每个用户的累积运行得分。我需要在一个新列上将前一天的累积分数与今天的分数相加,我尝试了 lag 函数进行计算,但由于某些原因它给出了错误。

这是我试过的代码:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val genre = sc.parallelize(List(("Alice", "2016-05-01", "action",0),
                                    ("Alice", "2016-05-02", "0",1),
                                    ("Alice", "2016-05-03", "comedy",0),
                                    ("Alice", "2016-05-04", "action",1),
                                    ("Alice", "2016-05-05", "action",0),
                                    ("Alice", "2016-05-06", "horror",1),
                                    ("Bob", "2016-05-01", "art",0),
                                    ("Bob", "2016-05-02", "0",1),
                                    ("Bob", "2016-05-03", "0",0),
                                    ("Bob", "2016-05-04", "art",0),
                                    ("Bob", "2016-05-05", "comedy",1),
                                    ("Bob", "2016-05-06", "action",0))).
                               toDF("name", "date", "genre","score")

val wSpec2 = Window.partitionBy("name","genre").orderBy("date").rowsBetween(Long.MinValue, 0)
genre.withColumn( "CumScore",genre("score")*2+ lag(("CumScore"),1).over(wSpec2)*2  ).show()

数据框:

-----+----------+------+-----+
| name|      date| genre|score|
+-----+----------+------+-----+
|Alice|2016-05-01|action|    0|
|Alice|2016-05-02|     0|    1|
|Alice|2016-05-03|comedy|    0|
|Alice|2016-05-04|action|    1|
|Alice|2016-05-05|action|    0|
|Alice|2016-05-06|horror|    1|
|  Bob|2016-05-01|   art|    0|
|  Bob|2016-05-02|     0|    1|
|  Bob|2016-05-03|     0|    0|
|  Bob|2016-05-04|   art|    0|
|  Bob|2016-05-05|comedy|    1|
|  Bob|2016-05-06|action|    0|
+-----+----------+------+-----+

我遇到的错误

org.apache.spark.sql.AnalysisException: Window Frame specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$()) must match the required frame specifiedwindowframe(RowFrame, -1, -1);
    at org.apa

【问题讨论】:

    标签: scala apache-spark apache-spark-sql cumulative-sum


    【解决方案1】:

    我尝试了以下方法:

    val wSpec2 = Window.partitionBy("name","genre").orderBy("date").rowsBetween(Long.MinValue, 0)
    val test = genre.withColumn( "CumScore",genre("score")*2)
    test.show()
    val wSpec3 = Window.partitionBy("name").orderBy("date")
    test.withColumn("CumScore_1",test("CumScore")+lag(test("CumScore"),1).over(wSpec3)).show()
    

    我们需要定义另一个窗口函数,因为在将前一天的累积分数与今天的分数相加时,我们不需要指定行框。

    您可以参考:http://xinhstechblog.blogspot.in/2016/04/spark-window-functions-for-dataframes.html

    【讨论】:

    • 你声明两个 Windowsspecs 有什么原因吗,我看到只有一个被使用。
    • Yes.. 它抛出错误'org.apache.spark.sql.AnalysisException: Window function lag does not take a frame specification.;'因为第一个窗口函数包含 .rowsBetween 子句
    【解决方案2】:

    没有必要使用lag,只需在用户上使用一个窗口分区然后使用sum

    val window = Window.partitionBy("name").orderBy("date").rowsBetween(Long.MinValue, 0)
    genre.withColumn("CumScore", sum($"score").over(window))
    

    使用问题中的输入数据,这将给出:

    +-----+----------+------+-----+--------+
    | name|      date| genre|score|CumScore|
    +-----+----------+------+-----+--------+
    |  Bob|2016-05-01|   art|    0|       0|
    |  Bob|2016-05-02|     0|    1|       1|
    |  Bob|2016-05-03|     0|    0|       1|
    |  Bob|2016-05-04|   art|    0|       1|
    |  Bob|2016-05-05|comedy|    1|       2|
    |  Bob|2016-05-06|action|    0|       2|
    |Alice|2016-05-01|action|    0|       0|
    |Alice|2016-05-02|     0|    1|       1|
    |Alice|2016-05-03|comedy|    0|       1|
    |Alice|2016-05-04|action|    1|       2|
    |Alice|2016-05-05|action|    0|       2|
    |Alice|2016-05-06|horror|    1|       3|
    +-----+----------+------+-----+--------+
    

    在此处使用lag 的问题在于,该列在创建它的同一表达式中使用(该列在withColumn 表达式中使用。即使引用的是先前的值,这也是不允许的。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-09-07
      • 2020-01-26
      • 1970-01-01
      • 2016-05-11
      • 2020-10-20
      • 2012-08-15
      • 1970-01-01
      • 2018-07-14
      相关资源
      最近更新 更多