【问题标题】:Spark SQL- How to Convert RelationalGroupedDataSet to DataFrameSpark SQL-如何将 RelationalGroupedDataSet 转换为 DataFrame
【发布时间】:2021-02-07 23:07:06
【问题描述】:

嗨,

在我的问题中,我需要对 DataFrame 进行分组,为每个组应用业务逻辑,最后在此基础上发出一个新的 DataFrame。详细描述,有一个device_dataframe,其中包含设备开启(on)和关闭(off)的时间戳。

+---------+----- +--------------------+
|device_id|state |   d_ts             |
+---------+----- +--------------------+
|1        |on    |2020-09-01 16:14:58 |
|1        |off   |2020-09-10 16:14:58 |
|1        |on    |2020-09-19 16:14:58 |
|2        |on    |2020-09-20 16:14:58 |
|2        |off   |2020-10-03 16:14:58 |
|4        |on    |2020-09-20 16:14:58 |
|5        |off   |2020-09-20 16:14:58 |
+---------+-----+-------+-------------+

另一方面,有一个 DataFrame 包含事件信息,包括其时间戳和相应的设备。

+-----+---------+--------------------+
|e_id |device_id|       e_ts         |
+-----+---------+--------------------+
|1    |1        |2020-09-20 16:14:58 |
|2    |2        |2020-10-08 09:19:55 |
|3    |4        |2020-11-01 12:15:37 |
|4    |5        |2020-10-08 01:35:08 |
+-----+---------+-------+------------+

以下是两个DataFrame的join示例:

+---------+-----+--------------------+------+--------------------+
|device_id|e_id |       e_ts         |state |   d_ts             |
+---------+-----+--------------------+------+--------------------+
|1        |1    |2020-09-20 16:14:58 |on    |2020-09-01 16:14:58 |
|1        |1    |2020-09-20 16:14:58 |off   |2020-09-10 16:14:58 |
|1        |1    |2020-09-20 16:14:58 |on    |2020-09-19 16:14:58 |
|2        |2    |2020-10-08 09:19:55 |on    |2020-09-20 16:14:58 |
|2        |2    |2020-10-08 09:19:55 |off   |2020-10-03 16:14:58 |
|4        |3    |2020-11-01 12:15:37 |on    |2020-09-20 16:14:58 |
|5        |4    |2020-10-08 01:35:08 |off   |2020-09-20 16:14:58 |
+---------+-----+-------+--------------------+------+------------+

我最终需要找到的是其对应设备为on时发生的事件信息。例如在上表的情况下,event_id 1 是有效的,因为它发生在 2020-09-20 16:14:58 并且它的设备自 2020-09-19 16:14:58 以来一直是 on,并且 event_id 2 是无效的,因为它的设备是关闭了2020-10-03 16:14:58 并且再也没有打开,依此类推。

Update1:我需要的其他信息是在事件发生之前设备被设置为on 的次数,结果如下表:

+---------+-----+----------+-------------------+
|device_id|e_id | on_count |       e_ts        |
+---------+-----+----------+-------------------+
|1        |1    |    2     |2020-09-20 16:14:58|
|4        |3    |    1     |2020-11-01 12:15:37|
+---------+-----+----------+-------------------+

在上表中,事件 id 1on_count 值为 2,因为当它发生在 2020-09-20 16:14:58 上时,device_id 1 已被打开两次。

我执行以下操作以根据设备对连接表进行分组:

val grouped = eventDF
      .join(deviceDF, "device_id")
      .groupBy("device_id")

导致RelationalGroupedDataSet。现在我需要将逻辑应用于每个组并发出结果 DataFrame 但我没有找到解决方案。我检查了UDAFs,但发现它不适用于我的情况。

我知道如何使用 RDD API 解决这个问题,但我想找到它的 Column API 方法。 任何帮助或建议将不胜感激。

谢谢

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    您可以使用以下逻辑获取每个device_id 的最后一个状态,并过滤最后一个状态为on 的行:

    import org.apache.spark.sql.expressions.Window
    
    val result = eventDF
        .join(deviceDF, "device_id")
        .withColumn(
            "last_state",
            max(when($"d_ts" < $"e_ts", array($"d_ts", $"state"))).over(Window.partitionBy("device_id", "e_id"))(1)
        )
        .withColumn(
            "on_count",
            count(when($"state" === "on" && $"d_ts" < $"e_ts", 1)).over(Window.partitionBy("device_id", "e_id"))
        )
        .filter("last_state = 'on'")
        .select("device_id", "e_id", "on_count", "e_ts")
        .distinct
    
    result.show
    +---------+----+--------+-------------------+
    |device_id|e_id|on_count|               e_ts|
    +---------+----+--------+-------------------+
    |        1|   1|       2|2020-09-20 16:14:58|
    |        4|   3|       1|2020-11-01 12:15:37|
    +---------+----+--------+-------------------+
    

    【讨论】:

    • 不错的方法,谢谢。当每个有效事件发生时,我还需要知道设备打开的次数 (on)。我用所需的输出更新了我的问题,对于事件 1 和 3,on_count 的值分别为 2 和 1(这意味着例如,当事件 1 发生时,设备已设置为 on 两次)
    • 您似乎考虑了device_id 的每个分区中的所有ons,但我们只需要考虑每个分区中event_ts 之前发生的ons。
    • @SoheilPourbafrani 在计数中添加了另一个条件。可以吗?
    • 谢谢,尝试使用其他一些数据似乎when($"d_ts" &lt; $"e_ts" 表达式在这两种情况下都不起作用。例如,event_ts 是 2020-09-28 12:00:00,但它的 last_state 列的聚合数组值是 [2020-09-30 12:00:00, on],这意味着条件 $"d_ts" &lt; $"e_ts" 没有按预期工作,因为 2020-09-30 12:00:00 大于 2020-09-28 12:00:00
    • @SoheilPourbafrani 尝试编辑答案?您也可以按e_id 进行分区。
    猜你喜欢
    • 2017-12-31
    • 2017-03-17
    • 2021-12-23
    • 1970-01-01
    • 2016-05-16
    • 2023-03-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多