【问题标题】:Count number of days between dates, ignoring weekends using pyspark计算日期之间的天数,使用 pyspark 忽略周末
【发布时间】:2023-04-09 09:20:02
【问题描述】:

如何使用pyspark 计算两个日期之间的天数(忽略周末)?

这与here 完全相同,只是我需要用pyspark 来做这个。

我尝试使用udf

import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@udf(returnType=IntegerType())
def dateDiffWeekdays(end, start):
    return int(np.busday_count(start, end)) # numpy returns an `numpy.int64` type.

使用这个 udf 时,我收到一条错误消息:

ModuleNotFoundError: 没有名为“numpy”的模块

有谁知道如何解决这个问题?或者更好的是,在本机 pyspark 中没有 udf 的情况下解决这个问题?

编辑:我安装了numpy。在udf 之外它工作得很好。

【问题讨论】:

  • numpy 是一个python依赖,spark默认不可用。在您的本地设置中,您可以执行pip install numpy。在集群设置中,它有点复杂,您还需要在集群上提供 numpy。
  • 我在 Python 中安装了 numpy。我已经用过无数次了。但是如果我理解正确的话,你是说它也需要安装在 spark 集群上?
  • 是的,所有集群机器都需要numpy
  • 您是在本地还是在集群中遇到此错误?

标签: python pyspark


【解决方案1】:

对于 Spark 2.4+,可以在不使用 numpy 或 udf 的情况下获取天数。使用内置的SQL functions 就足够了。

大致按照this answer我们可以

  1. 使用sequence 创建一个包含开始和结束之间所有日期的日期数组
  2. transform 将单日放入一个结构中,保存日期及其星期几值
  3. filter 周六和周日除外
  4. 获取剩余数组的size
#create an array containing all days between begin and end
(df.withColumn('days', F.expr('sequence(begin, end, interval 1 day)'))
#keep only days where day of week (dow) <= 5 (Friday)
.withColumn('weekdays', F.expr('filter(transform(days, day->(day, extract(dow_iso from day))), day -> day.col2 <=5).day')) 
#count how many days are left
.withColumn('no_of_weekdays', F.expr('size(weekdays)')) 
#drop the intermediate columns
.select('begin', 'end', 'no_of_weekdays') 
.show(truncate=False))

输出:

+----------+----------+--------------+
|begin     |end       |no_of_weekdays|
+----------+----------+--------------+
|2020-09-19|2020-09-20|0             |
|2020-09-21|2020-09-24|4             |
|2020-09-21|2020-09-25|5             |
|2020-09-21|2020-09-26|5             |
|2020-09-21|2020-10-02|10            |
|2020-09-19|2020-10-03|10            |
+----------+----------+--------------+

对于 Spark answer 启发的解决方案。

from datetime import timedelta
@F.udf
def dateDiffWeekdays(end, start):
    daygenerator = (start + timedelta(x) for x in range((end - start).days + 1))
    return sum(1 for day in daygenerator if day.isoweekday() <= 5)

df.withColumn("no_of_weekdays", dateDiffWeekdays(df.end, df.begin)).show()

【讨论】:

  • 值得将此答案的第一部分添加到get all the dates between two dates in Spark DataFrame。另请注意,这只适用于 spark 2.4+
  • 这实际上是一个重要的评论,因为我目前正在使用 2.3 版......有没有办法在 spark 2.3 中获得类似的东西?
  • @Willem 恐怕不行。大多数 SQL 函数仅从 2.4 版开始可用。我在答案中添加了一个无 numpy 的 udf 版本,该版本应该适用于 2.3
  • @Hansanho 这是filter 函数的第二个参数。函数的第一个参数是transform(...) 的结果。 transform 返回一个结构数组。每个结构由两列组成:col1(原始日期)和col2(星期几)。过滤函数day -&gt; day.col2 &lt;=5 现在只保留周一到周五的日子。我承认变量day 的命名在这里有点误导,因为这个变量与transform(...) 内部的变量day 无关
  • @nishcs 是的,您跳过第一天的方法对我有用。另一种选择是在第一步之后插入slice.withColumn('days', F.expr('slice(days, 2, size(days))'))。这将删除在 step1 中创建的数组的第一个元素
【解决方案2】:

按照@werner 的方法,我得到了结果,但是在使用内置 DOW_ISO 函数时存在一些差异。

"DAYOFWEEK_ISO",("DOW_ISO") - 基于 ISO 8601 的星期几,日期时间为星期一 (1) 到星期日 (7)(ps:ref

使用 weekday(date) - 返回日期/时间戳的星期几(0 = 星期一,1 = 星期二,...,6 = 星期日)。这符合要求。

F.expr('filter(transform(days, day->(day, weekday(day))), day -> day.col2 <= 4).day')

【讨论】:

  • 请在您的回答中提供更多详细信息。正如目前所写的那样,很难理解您的解决方案。
猜你喜欢
  • 1970-01-01
  • 2020-08-21
  • 2019-04-15
  • 1970-01-01
  • 2021-05-21
  • 1970-01-01
  • 2011-04-17
  • 1970-01-01
相关资源
最近更新 更多