【问题标题】:Spark Dataframe Window lag function based on multiple columns基于多列的 Spark Dataframe Window 滞后函数
【发布时间】:2017-06-08 06:27:32
【问题描述】:
val df = sc.parallelize(Seq((201601, 100.5),
  (201602, 120.6),
  (201603, 450.2),
  (201604, 200.7),
  (201605, 121.4))).toDF("date", "volume")

val w = org.apache.spark.sql.expressions.Window.orderBy("date")    
val leadDf = df.withColumn("new_col", lag("volume", 1, 0).over(w))
leadDf.show()

+------+------+-------+
|  date|volume|new_col|
+------+------+-------+
|201601| 100.5|    0.0|
|201602| 120.6|  100.5|
|201603| 450.2|  120.6|
|201604| 200.7|  450.2|
|201605| 121.4|  200.7|
+------+------+-------+

这工作正常。

但是,如果我还有一列作为领土,如下所示。

val df = sc.parallelize(Seq((201601, ter1, 10.1),
  (201601, ter2, 10.6),
  (201602, ter1, 10.7),
  (201603, ter3, 10.8),
  (201603, ter4, 10.8),
  (201603, ter3, 10.8),
  (201604, ter4, 10.9))).toDF("date", "territory", "volume")

我的要求是同一地区,如果不存在,我想找到上个月的交易量(如果存在),只需分配一个值 0.0

【问题讨论】:

  • 我该怎么做?
  • 我试过这样做..
  • val w = org.apache.spark.sql.expressions.Window.orderBy("date", "territory") val leadDf = df.withColumn("new_col", lag("volume", 1, 0).over(w)) 但不起作用
  • 我只是在 orderBy 子句中包含了领土......没有给出正确的结果
  • 我使用的是 Spark 1.6.2、Scala 2.10

标签: apache-spark apache-spark-sql spark-dataframe window-functions


【解决方案1】:

如果我理解正确,您想要的是同一地区上一个日期的值。

如果是这样,那么只需添加 partitionBy 即重新定义您的窗口规格如下:

val w = org.apache.spark.sql.expressions.Window.partitionBy("territory").orderBy("date")

【讨论】:

    猜你喜欢
    • 2018-02-08
    • 2019-07-31
    • 1970-01-01
    • 1970-01-01
    • 2016-12-04
    • 2020-02-06
    • 2016-05-15
    • 2017-04-30
    • 1970-01-01
    相关资源
    最近更新 更多