【问题标题】:Spark Window Functions - rangeBetween datesSpark 窗口函数 - rangeBetween 日期
【发布时间】:2016-01-17 08:55:44
【问题描述】:

我有一个带有数据的 Spark SQL DataFrame,我想要获取的是给定日期范围内当前行之前的所有行。因此,例如,我希望在给定行之前的 7 天内获得所有行。我发现我需要使用 Window Function 之类的:

Window \
    .partitionBy('id') \
    .orderBy('start')

问题来了。我想要一个rangeBetween 7 天,但我在 Spark 文档中找不到任何内容。 Spark 甚至提供这样的选项吗?现在我只是得到所有前面的行:

.rowsBetween(-sys.maxsize, 0)

但想达到以下目标:

.rangeBetween("7 days", 0)

如果有人能在这方面帮助我,我将不胜感激。提前致谢!

【问题讨论】:

    标签: sql apache-spark pyspark apache-spark-sql window-functions


    【解决方案1】:

    绝妙的解决方案@zero323,如果你想用几分钟而不是我必须的几天来操作,并且你不需要用 id 进行分区,所以你只需要修改一个简单的部分我展示的代码:

    df.createOrReplaceTempView("df")
    spark.sql(
        """SELECT *, sum(total) OVER (
            ORDER BY CAST(reading_date AS timestamp) 
            RANGE BETWEEN INTERVAL 45 minutes PRECEDING AND CURRENT ROW
         ) AS sum_total FROM df""").show()
    

    【讨论】:

      【解决方案2】:

      火花 >= 2.3

      从 Spark 2.3 开始,可以使用 SQL API 使用区间对象,但 DataFrame API 支持是 still work in progress

      df.createOrReplaceTempView("df")
      
      spark.sql(
          """SELECT *, mean(some_value) OVER (
              PARTITION BY id 
              ORDER BY CAST(start AS timestamp) 
              RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
           ) AS mean FROM df""").show()
      
      ## +---+----------+----------+------------------+       
      ## | id|     start|some_value|              mean|
      ## +---+----------+----------+------------------+
      ## |  1|2015-01-01|      20.0|              20.0|
      ## |  1|2015-01-06|      10.0|              15.0|
      ## |  1|2015-01-07|      25.0|18.333333333333332|
      ## |  1|2015-01-12|      30.0|21.666666666666668|
      ## |  2|2015-01-01|       5.0|               5.0|
      ## |  2|2015-01-03|      30.0|              17.5|
      ## |  2|2015-02-01|      20.0|              20.0|
      ## +---+----------+----------+------------------+
      

      火花

      据我所知,在 Spark 和 Hive 中都不能直接使用。两者都要求与RANGE 一起使用的ORDER BY 子句为数字。我发现最接近的是转换为时间戳并以秒为单位运行。假设start 列包含date 类型:

      from pyspark.sql import Row
      
      row = Row("id", "start", "some_value")
      df = sc.parallelize([
          row(1, "2015-01-01", 20.0),
          row(1, "2015-01-06", 10.0),
          row(1, "2015-01-07", 25.0),
          row(1, "2015-01-12", 30.0),
          row(2, "2015-01-01", 5.0),
          row(2, "2015-01-03", 30.0),
          row(2, "2015-02-01", 20.0)
      ]).toDF().withColumn("start", col("start").cast("date"))
      

      一个小助手和窗口定义:

      from pyspark.sql.window import Window
      from pyspark.sql.functions import mean, col
      
      
      # Hive timestamp is interpreted as UNIX timestamp in seconds*
      days = lambda i: i * 86400 
      

      最后查询:

      w = (Window()
         .partitionBy(col("id"))
         .orderBy(col("start").cast("timestamp").cast("long"))
         .rangeBetween(-days(7), 0))
      
      df.select(col("*"), mean("some_value").over(w).alias("mean")).show()
      
      ## +---+----------+----------+------------------+
      ## | id|     start|some_value|              mean|
      ## +---+----------+----------+------------------+
      ## |  1|2015-01-01|      20.0|              20.0|
      ## |  1|2015-01-06|      10.0|              15.0|
      ## |  1|2015-01-07|      25.0|18.333333333333332|
      ## |  1|2015-01-12|      30.0|21.666666666666668|
      ## |  2|2015-01-01|       5.0|               5.0|
      ## |  2|2015-01-03|      30.0|              17.5|
      ## |  2|2015-02-01|      20.0|              20.0|
      ## +---+----------+----------+------------------+
      

      远非漂亮,但有效。


      * Hive Language Manual, Types

      【讨论】:

      • 我使用 Spark 2.3,但第一个选项对我不起作用并抛出异常 scala.MatchError: CalendarIntervalType (of class org.apache.spark.sql.types.CalendarIntervalType$) 有一个 JIRA 问题将在 2.4.0 中修复:issues.apache.org/jira/browse/SPARK-25845
      • 您好,对于最后一个查询,请问您如何包含“天数”?我得到“名称'天'未定义”。
      • @Spacez "days" 辅助函数在上面声明为一个 lambda 函数,它将参数乘以 86400(一天以秒为单位)。
      • Window.partitionBy(col("id"), pyspark.sql.functions.window("start", "1 day"))
      • @zero323,你想解释一下窗口函数中的乳清,你添加 cast('timestamp').cast('long'),cast('long') 是必须的吗?谢谢。
      猜你喜欢
      • 1970-01-01
      • 2018-07-19
      • 1970-01-01
      • 2022-11-25
      • 1970-01-01
      • 1970-01-01
      • 2019-10-15
      相关资源
      最近更新 更多