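【Posted】: 2021-10-08 20:58:11
【Question】:
Treating a week as starting on Sunday and ending on Saturday, I need to get the week start date and week end date for a given date.
I referred to this post, but it takes Monday as the first day of the week. Is there a built-in function in Spark that handles this?
【Question discussion】: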
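Compute the day of the week, then use selectExpr to derive the new columns, treating Sunday as the week start. Spark's dayofweek returns 1 for Sunday through 7 for Saturday.
from pyspark.sql import functions as F
df_b = spark.createDataFrame([('1','2020-07-13')],[ "ID","date"])
# 1 = Sunday, 2 = Monday, ..., 7 = Saturday
df_b = df_b.withColumn('day_of_week', F.dayofweek(F.col('date')))
# Step back to the Sunday on or before the date
df_b = df_b.selectExpr('*', 'date_sub(date, day_of_week-1) as week_start')
# Step forward to the Saturday on or after the date
df_b = df_b.selectExpr('*', 'date_add(date, 7-day_of_week) as week_end')
df_b.show()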
+---+----------+-----------+----------+----------+
| ID|      date|day_of_week|week_start|  week_end|
+---+----------+-----------+----------+----------+
|  1|2020-07-13|          2|2020-07-12|2020-07-18|
+---+----------+-----------+----------+----------+
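The arithmetic behind week_start and week_end can be checked without a cluster. The following is a minimal plain-Python sketch, not Spark code: `spark_dayofweek` and `sunday_week_bounds` are hypothetical helper names introduced here only to mirror the Sunday = 1 through Saturday = 7 numbering of dayofweek.

```python
from datetime import date, timedelta


def spark_dayofweek(d: date) -> int:
    """Hypothetical helper: same numbering as Spark's dayofweek (1 = Sunday ... 7 = Saturday)."""
    # isoweekday() is 1 = Monday ... 7 = Sunday, so rotate it by one position.
    return d.isoweekday() % 7 + 1


def sunday_week_bounds(d: date) -> tuple:
    """Hypothetical helper: (week_start, week_end) for a Sunday-to-Saturday week."""
    dow = spark_dayofweek(d)
    week_start = d - timedelta(days=dow - 1)  # same as date_sub(date, day_of_week - 1)
    week_end = d + timedelta(days=7 - dow)    # same as date_add(date, 7 - day_of_week)
    return week_start, week_end


start, end = sunday_week_bounds(date(2020, 7, 13))
print(start, end)  # 2020-07-12 2020-07-18
```

A Sunday maps to itself as week_start and a Saturday maps to itself as week_end, since the offsets are zero in those cases.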
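Update: Spark SQL
First create a temporary view from the DataFrame (here df_a, assumed to hold the ID and date columns)
df_a.createOrReplaceTempView("df_a_sql")
The query:
%sql
-- day_of_week is 1 for Sunday through 7 for Saturday
select *, date_sub(date, day_of_week-1) as week_start,
date_add(date, 7-day_of_week) as week_end
from
(select *, dayofweek(date) as day_of_week
from df_a_sql) T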
Output
+---+----------+-----------+----------+----------+
| ID|      date|day_of_week|week_start|  week_end|
+---+----------+-----------+----------+----------+
|  1|2020-07-13|          2|2020-07-12|2020-07-18|
+---+----------+-----------+----------+----------+
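【Discussion】:
Perhaps this helps:
import org.apache.spark.sql.functions.{date_sub, next_day}
import spark.implicits._ // enables the $"col" syntax
val df = spark.sql("select cast('2020-07-12' as date) as date")
df.show(false)
df.printSchema()
/**
* +----------+
* |date      |
* +----------+
* |2020-07-12|
* +----------+
*
* root
* |-- date: date (nullable = true)
*/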
// next_day returns the first matching day strictly after its argument, so
// step back one day first; otherwise a date that is itself the last day of
// the week would be assigned to the following week.
// week starting from SUNDAY and ending SATURDAY
df.withColumn("week_end", next_day(date_sub($"date", 1), "SAT"))
.withColumn("week_start", date_sub($"week_end", 6))
.show(false)
/**
* +----------+----------+----------+
* |date      |week_end  |week_start|
* +----------+----------+----------+
* |2020-07-12|2020-07-18|2020-07-12|
* +----------+----------+----------+
*/
// week starting from MONDAY and ending SUNDAY
df.withColumn("week_end", next_day(date_sub($"date", 1), "SUN"))
.withColumn("week_start", date_sub($"week_end", 6))
.show(false)
/**
* +----------+----------+----------+
* |date      |week_end  |week_start|
* +----------+----------+----------+
* |2020-07-12|2020-07-12|2020-07-06|
* +----------+----------+----------+
*/
// week starting from TUESDAY and ending MONDAY
df.withColumn("week_end", next_day(date_sub($"date", 1), "MON"))
.withColumn("week_start", date_sub($"week_end", 6))
.show(false)
/**
* +----------+----------+----------+
* |date      |week_end  |week_start|
* +----------+----------+----------+
* |2020-07-12|2020-07-13|2020-07-07|
* +----------+----------+----------+
*/
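One detail of next_day is easy to miss: it returns the first matching day strictly later than its input, so a date that already falls on the last day of the week is pushed a full week ahead unless it is shifted back by one day first. The sketch below emulates that behaviour in plain Python to make the edge case visible; `next_day` here is a stand-in written for illustration, not Spark's function, and `week_bounds_ending_on` is a hypothetical helper name.

```python
from datetime import date, timedelta

# Python's weekday() numbering: Monday = 0 ... Sunday = 6.
DAY_INDEX = {"MON": 0, "TUE": 1, "WED": 2, "THU": 3, "FRI": 4, "SAT": 5, "SUN": 6}


def next_day(d: date, day_name: str) -> date:
    """Stand-in for Spark's next_day: first date strictly after d that falls on day_name."""
    delta = (DAY_INDEX[day_name] - d.weekday() - 1) % 7 + 1  # always between 1 and 7
    return d + timedelta(days=delta)


def week_bounds_ending_on(d: date, end_day: str) -> tuple:
    """Hypothetical helper: (week_start, week_end) for a week whose last day is end_day."""
    # Shift back one day so that a date already on end_day maps to itself.
    week_end = next_day(d - timedelta(days=1), end_day)
    return week_end - timedelta(days=6), week_end


saturday = date(2020, 7, 18)
print(next_day(saturday, "SAT"))               # 2020-07-25, one week too late
print(week_bounds_ending_on(saturday, "SAT"))  # bounds are 2020-07-12 and 2020-07-18
```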
【Discussion】:
For weeks starting on another DAY, use
Find the start date and end date of the week in a PySpark DataFrame, with Monday as the first day of the week.
from pyspark.sql.functions import col, dayofweek, expr, when

def add_start_end_week(dataframe, timestamp_col, StartDate, EndDate):
    """
    Add the start date and end date of the week (Monday to Sunday) for each row.

    Args:
        dataframe: Spark DataFrame
        timestamp_col: name of the date/timestamp column used to compute the week bounds
        StartDate: name of the output column for the week start date
        EndDate: name of the output column for the week end date
    """
    # dayofweek returns 1 for Sunday through 7 for Saturday
    dataframe = dataframe.withColumn(
        'day_of_week', dayofweek(col(timestamp_col)))
    # Start of the week (Monday as first day); a Sunday belongs to the week that began 6 days earlier
    dataframe = dataframe.withColumn(
        StartDate,
        when(col("day_of_week") > 1,
             expr("date_add(date_sub({}, day_of_week-1), 1)".format(timestamp_col)))
        .otherwise(expr("date_sub({}, 6)".format(timestamp_col))))
    # End of the week (Sunday); a Sunday is its own week end
    dataframe = dataframe.withColumn(
        EndDate,
        when(col("day_of_week") > 1,
             expr("date_add(date_add({}, 7-day_of_week), 1)".format(timestamp_col)))
        .otherwise(col(timestamp_col)))
    return dataframe
Verifying the function above:
df = spark.createDataFrame([('2021-09-26',),('2021-09-25',),('2021-09-24',),('2021-09-23',),('2021-09-22',),('2021-09-21',),('2021-09-20',)], ['dt'])
dataframe = df.withColumn('day_of_week', dayofweek(col('dt')))
# start of the week (Monday as first day)
dataframe = dataframe.withColumn('StartDate',when(col("day_of_week")>1,expr("date_add(date_sub(dt,day_of_week-1),1)")).otherwise(expr("date_sub(dt,6)")))
#End of the Week
dataframe = dataframe.withColumn('EndDate',when(col("day_of_week")>1,expr("date_add(date_add(dt,7-day_of_week),1)")).otherwise(col("dt")))
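The verification snippet does not show its output. As a cross-check, the expected bounds for those seven dates can be worked out in plain Python. This is only a sketch of the arithmetic, not Spark code; `week_bounds_starting_on` is a hypothetical helper whose `first_weekday` parameter follows Python's numbering (Monday = 0 ... Sunday = 6), so the same function also covers the Sunday-start week from the question.

```python
from datetime import date, timedelta


def week_bounds_starting_on(d: date, first_weekday: int = 0) -> tuple:
    """Hypothetical helper: (week_start, week_end) for a week beginning on first_weekday.

    first_weekday uses Python's weekday() numbering: Monday = 0 ... Sunday = 6.
    """
    # Days elapsed since the most recent first_weekday (0 if d falls on it).
    offset = (d.weekday() - first_weekday) % 7
    week_start = d - timedelta(days=offset)
    return week_start, week_start + timedelta(days=6)


# The seven dates used in the verification above all fall in one Monday-to-Sunday week.
for day in range(20, 27):
    start, end = week_bounds_starting_on(date(2021, 9, day))
    print(date(2021, 9, day), start, end)  # start is 2021-09-20, end is 2021-09-26

# Sunday-to-Saturday week, as asked in the question.
print(week_bounds_starting_on(date(2020, 7, 13), first_weekday=6))
```

Under these assumptions every row of the verification DataFrame should show StartDate 2021-09-20 and EndDate 2021-09-26.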
【Discussion】: