如果 Spark 版本 >= 2.4,则不需要 UDF,请参见下面的示例:
加载输入数据
// Sample input: two users, each with a one-element array of loan structs
// (user_loans_arr) and a single loan struct to be appended (new_loan).
val df = {
  val sampleQuery =
"""
|select user_id, user_loans_arr, new_loan
|from values
| ('u1', array(named_struct('loan_date', '2019-01-01', 'loan_amount', 100)), named_struct('loan_date',
| '2020-01-01', 'loan_amount', 100)),
| ('u2', array(named_struct('loan_date', '2020-01-01', 'loan_amount', 200)), named_struct('loan_date',
| '2020-01-01', 'loan_amount', 100))
| T(user_id, user_loans_arr, new_loan)
""".stripMargin
  spark.sql(sampleQuery)
}
// Print the rows untruncated, then the inferred schema.
df.show(truncate = false)
df.printSchema()
/**
* +-------+-------------------+-----------------+
* |user_id|user_loans_arr |new_loan |
* +-------+-------------------+-----------------+
* |u1 |[[2019-01-01, 100]]|[2020-01-01, 100]|
* |u2 |[[2020-01-01, 200]]|[2020-01-01, 100]|
* +-------+-------------------+-----------------+
*
* root
* |-- user_id: string (nullable = false)
* |-- user_loans_arr: array (nullable = false)
* | |-- element: struct (containsNull = false)
* | | |-- loan_date: string (nullable = false)
* | | |-- loan_amount: integer (nullable = false)
* |-- new_loan: struct (nullable = false)
* | |-- loan_date: string (nullable = false)
* | |-- loan_amount: integer (nullable = false)
*/
按以下要求处理
以 user_loans_arr 和 new_loan 作为输入,将 new_loan 结构体追加到现有的 user_loans_arr 中;然后从 user_loans_arr 中删除所有 loan_date 距今已达到或超过 12 个月的元素。
spark >= 2.4
// Append new_loan to user_loans_arr, then keep only the loans whose loan_date
// lies less than 12 months before the current date.
//
// concat (not array_union) performs the append: array_union returns the
// de-duplicated union, so a new loan identical to an existing element would be
// silently dropped and pre-existing duplicates collapsed, which disagrees with
// the plain `:+` append used by the UDF variant for Spark < 2.4.
//
// NOTE: the predicate uses current_date(), so the result depends on the day
// the query is executed.
df.withColumn("user_loans_arr",
expr(
"""
|FILTER(concat(user_loans_arr, array(new_loan)),
| x -> months_between(current_date(), to_date(x.loan_date)) < 12)
""".stripMargin))
.show(false)
/**
* +-------+--------------------------------------+-----------------+
* |user_id|user_loans_arr |new_loan |
* +-------+--------------------------------------+-----------------+
* |u1 |[[2020-01-01, 100]] |[2020-01-01, 100]|
* |u2 |[[2020-01-01, 200], [2020-01-01, 100]]|[2020-01-01, 100]|
* +-------+--------------------------------------+-----------------+
*/
spark < 2.4
// spark < 2.4
// UDF-based variant: append the new loan to the array, then keep only the
// entries dated fewer than 12 whole months before today.
import java.time._
// The UDF returns the same array-of-struct type as the existing column.
val outputSchema = df.schema("user_loans_arr").dataType
val add_and_filter = udf((userLoansArr: mutable.WrappedArray[Row], loan: Row) => {
  // True when fewer than 12 whole months separate the entry's loan_date from today.
  def isRecent(entry: Row): Boolean = {
    val issuedOn = LocalDate.parse(entry.getAs[String]("loan_date"))
    val elapsed = Period.between(issuedOn, LocalDate.now())
    elapsed.toTotalMonths < 12
  }
  val withNewLoan = userLoansArr :+ loan
  withNewLoan.filter(isRecent)
}, outputSchema)
df.withColumn("user_loans_arr", add_and_filter(df("user_loans_arr"), df("new_loan")))
  .show(false)
/**
* +-------+--------------------------------------+-----------------+
* |user_id|user_loans_arr |new_loan |
* +-------+--------------------------------------+-----------------+
* |u1 |[[2020-01-01, 100]] |[2020-01-01, 100]|
* |u2 |[[2020-01-01, 200], [2020-01-01, 100]]|[2020-01-01, 100]|
* +-------+--------------------------------------+-----------------+
*/