While the groupBy/pivot approach handles grouping of the timestamps nicely, it would require non-trivial steps (most likely a UDF) to do the necessary filtering before re-expanding the result. Here's a different approach, with the following steps:

- Filter the dataset to keep only rows with statusCode "UV" or "OA"
- For each row, use Window functions to assemble a string of statusCodes from the previous, current, and next 2 rows
- Use Regex pattern matching against that string to identify the wanted rows

Sample code below:
import java.sql.Timestamp
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._
// Sample data:
// key `A`: requirement #3
// key `B`: requirement #2
// key `C`: requirement #4
val df = Seq(
("A", "OA", Timestamp.valueOf("2019-05-20 00:00:00")),
("A", "E", Timestamp.valueOf("2019-05-30 00:00:00")),
("A", "UV", Timestamp.valueOf("2019-06-22 00:00:00")),
("A", "OA", Timestamp.valueOf("2019-07-01 00:00:00")),
("A", "OA", Timestamp.valueOf("2019-07-03 00:00:00")),
("B", "C", Timestamp.valueOf("2019-06-15 00:00:00")),
("B", "OA", Timestamp.valueOf("2019-06-25 00:00:00")),
("C", "D", Timestamp.valueOf("2019-06-01 00:00:00")),
("C", "OA", Timestamp.valueOf("2019-06-30 00:00:00")),
("C", "UV", Timestamp.valueOf("2019-07-02 00:00:00"))
).toDF("key", "statusCode", "statusTimestamp")
val win = Window.partitionBy("key").orderBy("statusTimestamp")
val df2 = df.
where($"statusCode" === "UV" || $"statusCode" === "OA").
withColumn("statusPrevCurrNext2", concat(
coalesce(lag($"statusCode", 1).over(win), lit("")),
lit("#"),
$"statusCode",
lit("#"),
coalesce(lead($"statusCode", 1).over(win), lit("")),
lit("#"),
coalesce(lead($"statusCode", 2).over(win), lit(""))
))
Let's take a look at df2 (the result of steps 1 and 2):
df2.show(false)
// +---+----------+-------------------+-------------------+
// |key|statusCode|statusTimestamp |statusPrevCurrNext2|
// +---+----------+-------------------+-------------------+
// |B |OA |2019-06-25 00:00:00|#OA## |
// |C |OA |2019-06-30 00:00:00|#OA#UV# | <-- Req #4: Ends with `#UV#`
// |C |UV |2019-07-02 00:00:00|OA#UV## | <-- Req #4: Ends with `#UV##`
// |A |OA |2019-05-20 00:00:00|#OA#UV#OA |
// |A |UV |2019-06-22 00:00:00|OA#UV#OA#OA | <-- Req #3: Starts with `[^#]*#UV#`
// |A |OA |2019-07-01 00:00:00|UV#OA#OA# | <-- Req #3: Starts with `UV#`
// |A |OA |2019-07-03 00:00:00|OA#OA## |
// +---+----------+-------------------+-------------------+
Applying step 3:
df2.
where($"statusPrevCurrNext2".rlike("^[^#]*#UV#.*|^UV#.*|.*#UV#+$")).
orderBy("key", "statusTimestamp").
show(false)
// +---+----------+-------------------+-------------------+
// |key|statusCode|statusTimestamp |statusPrevCurrNext2|
// +---+----------+-------------------+-------------------+
// |A |UV |2019-06-22 00:00:00|OA#UV#OA#OA |
// |A |OA |2019-07-01 00:00:00|UV#OA#OA# |
// |C |OA |2019-06-30 00:00:00|#OA#UV# |
// |C |UV |2019-07-02 00:00:00|OA#UV## |
// +---+----------+-------------------+-------------------+
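Since Spark's `rlike` follows java.util.regex semantics (a match found anywhere in the string), the three alternatives in the pattern can be checked in isolation against the `statusPrevCurrNext2` values shown above. A minimal sketch in plain Java (outside Spark; the class name is arbitrary):

```java
import java.util.List;
import java.util.regex.Pattern;

public class RlikeCheck {
    public static void main(String[] args) {
        // Same pattern as in the rlike() filter above:
        //   ^[^#]*#UV#.*  -- current row is UV with a non-empty previous row
        //   ^UV#.*        -- previous row is UV
        //   .*#UV#+$      -- next row is UV (or UV is the last row)
        Pattern p = Pattern.compile("^[^#]*#UV#.*|^UV#.*|.*#UV#+$");

        // statusPrevCurrNext2 values from the df2 output above
        List<String> samples = List.of(
            "#OA##",        // B: no match
            "#OA#UV#",      // C: ends with #UV#   (req #4)
            "OA#UV##",      // C: ends with #UV##  (req #4)
            "#OA#UV#OA",    // A: no match
            "OA#UV#OA#OA",  // A: starts with [^#]*#UV# (req #3)
            "UV#OA#OA#",    // A: starts with UV#       (req #3)
            "OA#OA##"       // A: no match
        );
        for (String s : samples) {
            System.out.println(s + " -> " + p.matcher(s).find());
        }
    }
}
```

Running this flags exactly the four rows kept by the filter (`#OA#UV#`, `OA#UV##`, `OA#UV#OA#OA`, `UV#OA#OA#`) and rejects the other three.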