【问题标题】:How to group by time interval in Spark SQL如何在 Spark SQL 中按时间间隔分组
【发布时间】:2016-10-04 13:47:02
【问题描述】:

我的数据集如下所示:

KEY |Event_Type | metric | Time 
001 |event1     | 10     | 2016-05-01 10:50:51
002 |event2     | 100    | 2016-05-01 10:50:53
001 |event3     | 20     | 2016-05-01 10:50:55
001 |event1     | 15     | 2016-05-01 10:51:50
003 |event1     | 13     | 2016-05-01 10:55:30
001 |event2     | 12     | 2016-05-01 10:57:00
001 |event3     | 11     | 2016-05-01 11:00:01

我想得到所有验证这个的密钥:

“特定事件的指标总和” > 阈值 5 分钟

在我看来,这是使用 滑动窗口功能 的完美选择。

如何使用 Spark SQL 做到这一点?

谢谢。

【问题讨论】:

    标签: sql apache-spark apache-spark-sql


    【解决方案1】:

    对于静态边界,您可以执行以下操作:

    1) 变换(地图、mapPartitions 等)形成 YYYY-MM-DD-hh-mm 的时间值,其中 mm 以 5 分钟级别汇总。例如01、02、03、05变成05; 16,17,18,19,20 变成 20

    2) 使用 event_type 和 time 执行 groupBy 或 reduceBy 并对指标执行聚合(Sum)

    3) 执行过滤器转换以过滤指标 > 5

    您可以用几乎相同的方式在 spark rdd 或 dataframe(sql) 中编写上述内容。

    对于 00-05、01-06、02-07 等其他类型的边界,您应该尝试研究 滑动窗口 的概念。如果您的数据摄取用例适合流模式,那么 Spark Streaming API 将是完美的,否则您可以找到像这样的自定义解决方案: Apache Spark - Dealing with Sliding Windows on Temporal RDDs

    【讨论】:

      【解决方案2】:

      火花 >= 2.0

      您可以使用window(不要误认为是窗口函数)。根据变体,它将时间戳分配给另一个可能重叠的存储桶:

      df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric")
      
      // +---+---------------------------------------------+-----------+
      // |KEY|window                                       |sum(metric)|
      // +---+---------------------------------------------+-----------+
      // |001|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|45         |
      // |001|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|12         |
      // |003|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|13         |
      // |001|[2016-05-01 11:00:00.0,2016-05-01 11:05:00.0]|11         |
      // |002|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|100        |
      // +---+---------------------------------------------+-----------+
      

      火花

      让我们从示例数据开始:

      import spark.implicits._  // import sqlContext.implicits._ in Spark < 2.0
      
      val df = Seq(
        ("001", "event1", 10, "2016-05-01 10:50:51"),
        ("002", "event2", 100, "2016-05-01 10:50:53"),
        ("001", "event3", 20, "2016-05-01 10:50:55"),
        ("001", "event1", 15, "2016-05-01 10:51:50"),
        ("003", "event1", 13, "2016-05-01 10:55:30"),
        ("001", "event2", 12, "2016-05-01 10:57:00"),
        ("001", "event3", 11, "2016-05-01 11:00:01")
      ).toDF("KEY", "Event_Type", "metric", "Time")
      

      我假设该事件由KEY 标识。如果不是这种情况,您可以根据您的要求调整 GROUP BY / PARTITION BY 子句。

      如果您对具有独立于数据的静态窗口的聚合感兴趣,请将时间戳转换为数字数据类型并进行舍入

      import org.apache.spark.sql.functions.{round, sum}
      
      // cast string to timestamp_seconds
      val ts = $"Time".cast("timestamp").cast("long")
      
      // Round to 300 seconds interval
      // In Spark >= 3.1 replace cast("timestamp") with 
      val interval = (round(ts / 300L) * 300.0).cast("timestamp").alias("interval")
      
      df.groupBy($"KEY", interval).sum("metric")
      
      // +---+---------------------+-----------+
      // |KEY|interval             |sum(metric)|
      // +---+---------------------+-----------+
      // |001|2016-05-01 11:00:00.0|11         |
      // |001|2016-05-01 10:55:00.0|12         |
      // |001|2016-05-01 10:50:00.0|45         |
      // |003|2016-05-01 10:55:00.0|13         |
      // |002|2016-05-01 10:50:00.0|100        |
      // +---+---------------------+-----------+
      

      如果您对与当前行相关的窗口感兴趣,请使用窗口函数:

      import org.apache.spark.sql.expressions.Window
      
      // Partition by KEY
      // Order by timestamp 
      // Consider window of -150 seconds to + 150 seconds relative to the current row
      val w = Window.partitionBy($"KEY").orderBy("ts").rangeBetween(-150, 150)
      df.withColumn("ts", ts).withColumn("window_sum", sum($"metric").over(w))
      
      // +---+----------+------+-------------------+----------+----------+
      // |KEY|Event_Type|metric|Time               |ts        |window_sum|
      // +---+----------+------+-------------------+----------+----------+
      // |003|event1    |13    |2016-05-01 10:55:30|1462092930|13        |
      // |001|event1    |10    |2016-05-01 10:50:51|1462092651|45        |
      // |001|event3    |20    |2016-05-01 10:50:55|1462092655|45        |
      // |001|event1    |15    |2016-05-01 10:51:50|1462092710|45        |
      // |001|event2    |12    |2016-05-01 10:57:00|1462093020|12        |
      // |001|event3    |11    |2016-05-01 11:00:01|1462093201|11        |
      // |002|event2    |100   |2016-05-01 10:50:53|1462092653|100       |
      // +---+----------+------+-------------------+----------+----------+
      

      出于性能原因,仅当数据可以划分为多个单独的组时,此方法才有用。在 Spark HiveContext 才能使其工作。

      【讨论】:

      • 您好,我正在使用 Java 如何在 java 和 spark 2.1.0 中执行相同的操作
      • @sathiyarajan 应该几乎相同,除了细微的语法差异。
      • 您好,那么组双月、季度、6 月的方法是什么?
      猜你喜欢
      • 2016-09-21
      • 1970-01-01
      • 2021-01-03
      • 2022-01-14
      • 2011-12-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-02-08
      相关资源
      最近更新 更多