【发布时间】:2021-02-07 23:07:06
【问题描述】:
嗨,
在我的问题中,我需要对 DataFrame 进行分组,为每个组应用业务逻辑,最后在此基础上发出一个新的 DataFrame。详细描述,有一个device_dataframe,其中包含设备开启(on)和关闭(off)的时间戳。
+---------+----- +--------------------+
|device_id|state | d_ts |
+---------+----- +--------------------+
|1 |on |2020-09-01 16:14:58 |
|1 |off |2020-09-10 16:14:58 |
|1 |on |2020-09-19 16:14:58 |
|2 |on |2020-09-20 16:14:58 |
|2 |off |2020-10-03 16:14:58 |
|4 |on |2020-09-20 16:14:58 |
|5 |off |2020-09-20 16:14:58 |
+---------+-----+-------+-------------+
另一方面,有一个 DataFrame 包含事件信息,包括其时间戳和相应的设备。
+-----+---------+--------------------+
|e_id |device_id| e_ts |
+-----+---------+--------------------+
|1 |1 |2020-09-20 16:14:58 |
|2 |2 |2020-10-08 09:19:55 |
|3 |4 |2020-11-01 12:15:37 |
|4 |5 |2020-10-08 01:35:08 |
+-----+---------+-------+------------+
以下是两个DataFrame的join示例:
+---------+-----+--------------------+------+--------------------+
|device_id|e_id | e_ts |state | d_ts |
+---------+-----+--------------------+------+--------------------+
|1 |1 |2020-09-20 16:14:58 |on |2020-09-01 16:14:58 |
|1 |1 |2020-09-20 16:14:58 |off |2020-09-10 16:14:58 |
|1 |1 |2020-09-20 16:14:58 |on |2020-09-19 16:14:58 |
|2 |2 |2020-10-08 09:19:55 |on |2020-09-20 16:14:58 |
|2 |2 |2020-10-08 09:19:55 |off |2020-10-03 16:14:58 |
|4 |3 |2020-11-01 12:15:37 |on |2020-09-20 16:14:58 |
|5 |4 |2020-10-08 01:35:08 |off |2020-09-20 16:14:58 |
+---------+-----+-------+--------------------+------+------------+
我最终需要找到的是其对应设备为on时发生的事件信息。例如在上表的情况下,event_id 1 是有效的,因为它发生在 2020-09-20 16:14:58 并且它的设备自 2020-09-19 16:14:58 以来一直是 on,并且 event_id 2 是无效的,因为它的设备是关闭了2020-10-03 16:14:58 并且再也没有打开,依此类推。
Update1:我需要的其他信息是在事件发生之前设备被设置为on 的次数,结果如下表:
+---------+-----+----------+-------------------+
|device_id|e_id | on_count | e_ts |
+---------+-----+----------+-------------------+
|1 |1 | 2 |2020-09-20 16:14:58|
|4 |3 | 1 |2020-11-01 12:15:37|
+---------+-----+----------+-------------------+
在上表中,事件 id 1 的 on_count 值为 2,因为当它发生在 2020-09-20 16:14:58 上时,device_id 1 已被打开两次。
我执行以下操作以根据设备对连接表进行分组:
val grouped = eventDF
.join(deviceDF, "device_id")
.groupBy("device_id")
导致RelationalGroupedDataSet。现在我需要将逻辑应用于每个组并发出结果 DataFrame 但我没有找到解决方案。我检查了UDAFs,但发现它不适用于我的情况。
我知道如何使用 RDD API 解决这个问题,但我想找到它的 Column API 方法。 任何帮助或建议将不胜感激。
谢谢
【问题讨论】:
标签: scala apache-spark apache-spark-sql