【Question】:How do I calculate the overlap days in a dataframe in PySpark?
【Posted】:2020-05-29 16:45:27
【Description】:

I need to calculate the overlap days row by row in a dataframe. The data looks like this:

+-------+-------------------+-------------------+------------------+
|     id|              begin|                end|              days|
+-------+-------------------+-------------------+------------------+
|      1|2019-01-01 00:00:00|2019-01-08 02:10:00| 7.090277777777778|
|      1|2019-02-04 05:28:00|2019-03-05 19:29:00|29.584027777777777|
|      1|2019-06-05 22:18:00|2020-01-01 00:00:00|209.07083333333333|
|      1|2019-05-17 16:25:00|2019-06-05 22:18:00| 19.24513888888889|
|      1|2019-05-03 05:05:00|2019-05-17 16:25:00|14.472222222222221|
|      1|2019-01-08 02:10:00|2019-02-04 05:28:00|           27.1375|
|      1|2019-01-01 00:00:00|2020-01-01 00:00:00|             365.0|
|      1|2019-04-22 18:45:00|2019-05-03 05:05:00|10.430555555555555|
|      1|2019-03-05 19:29:00|2019-04-22 18:45:00| 47.96944444444444|
+-------+-------------------+-------------------+------------------+

Here, the first entry spans the whole of 2019 (365 days). All the other entries overlap with that first entry. I want a function that calculates the total number of days, i.e. the 365 days left in the dataset after removing the overlapping days.

I actually solved this in R, but I cannot run a for loop in PySpark.

I am looking for output like this:

+-------+-------------------+-------------------+------------------+------------------+
|     id|              begin|                end|              days|           overlap|
+-------+-------------------+-------------------+------------------+------------------+
|1      |2019-01-01 00:00:00|2020-01-01 00:00:00|             365.0|                 0|
|1      |2019-01-01 00:00:00|2019-01-08 02:10:00| 7.090277777777778| 7.090277777777778|
|1      |2019-01-08 02:10:00|2019-02-04 05:28:00|           27.1375|           27.1375|
|1      |2019-02-04 05:28:00|2019-03-05 19:29:00|29.584027777777777|29.584027777777777|
|1      |2019-03-05 19:29:00|2019-04-22 18:45:00| 47.96944444444444| 47.96944444444444|
|1      |2019-04-22 18:45:00|2019-05-03 05:05:00|10.430555555555555|10.430555555555555|
|1      |2019-05-03 05:05:00|2019-05-17 16:25:00|14.472222222222221|14.472222222222221|
|1      |2019-05-17 16:25:00|2019-06-05 22:18:00| 19.24513888888889| 19.24513888888889|
|1      |2019-06-05 22:18:00|2020-01-01 00:00:00|209.07083333333333|209.07083333333333|
+-------+-------------------+-------------------+------------------+------------------+

The dates are never in order, and there are cases with no overlap at all.

Scenario 2: no overlap

+-------+-------------------+-------------------+-----+-----+
|  id   |              begin|                end| days| over|
+-------+-------------------+-------------------+-----+-----+
|2      |2019-01-01 00:00:00|2019-12-25 00:00:00|358.0|    0|
|2      |2019-12-25 00:00:00|2020-01-01 00:00:00|  7.0|    0|
+-------+-------------------+-------------------+-----+-----+

Scenario 3: partial overlap

+-------+-------------------+-------------------+-----+-----+
|     id|              begin|                end| days| over|
+-------+-------------------+-------------------+-----+-----+
|3      |2019-01-01 00:00:00|2019-12-25 00:00:00|358.0|    0|
|3      |2019-12-20 00:00:00|2020-01-01 00:00:00| 12.0|    5|
+-------+-------------------+-------------------+-----+-----+

Scenario 4: more complex. Here the first entry spans the first 358 days of 2019. The second entry overlaps the first completely, so all of its days count as overlap. The third entry does not overlap the second, but overlaps the first by 5 days, hence the 5 under the "over" column.

+-------+-------------------+-------------------+-----+-----+
|     id|              begin|                end| days| over|
+-------+-------------------+-------------------+-----+-----+
|4      |2019-01-01 00:00:00|2019-12-25 00:00:00|358.0|    0|
|4      |2019-01-01 00:00:00|2019-11-25 00:00:00|328.0|328.0|
|4      |2019-12-20 00:00:00|2020-01-01 00:00:00| 12.0|    5|
+-------+-------------------+-------------------+-----+-----+

Basically, I want to know the effective coverage period for a particular id. I cannot just take the maximum and minimum dates and subtract them, because there can be gaps between the periods.
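That "effective period" is just the total length of the union of the intervals. As a minimal plain-Python sketch of the logic (my own hypothetical helper, with timestamps modelled as day offsets rather than Spark timestamps) before porting it to PySpark:

```python
def effective_days(intervals):
    """Sum the lengths of the union of (begin, end) intervals,
    so overlapping stretches are only counted once."""
    total = 0.0
    cur_begin = cur_end = None
    for begin, end in sorted(intervals):
        if cur_end is None or begin > cur_end:
            # disjoint from the running interval: close it out and restart
            if cur_end is not None:
                total += cur_end - cur_begin
            cur_begin, cur_end = begin, end
        else:
            # overlapping or adjacent: extend the running interval
            cur_end = max(cur_end, end)
    if cur_end is not None:
        total += cur_end - cur_begin
    return total

# Two intervals with a 5-day overlap: the union covers days 0..366
print(effective_days([(0.0, 358.0), (353.0, 366.0)]))  # 366.0
```

This merge-sorted-intervals pass is what the per-row "overlap" column ultimately has to reproduce: total days minus total overlap should equal the effective days.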

In R, I created another column called "overlap" and used the Overlap function in a for loop to check every interval against all the others.

The R function that produces the desired output:

library(dplyr)
library(DescTools)   # Overlap() comes from DescTools

abc <- data.frame()
for (i in unique(dataset$id)) {
  xyz <- dataset %>% filter(id == i) %>% arrange(begin)
  xyz$overlap <- 0   # initialise the accumulator column

  for (j in 1:(nrow(xyz) - 1)) {
    k <- j
    while (k < nrow(xyz)) {
      # add the overlap between interval j and each later interval k+1
      xyz$overlap[j] <- xyz$overlap[j] +
        Overlap(c(xyz$begin[j], xyz$end[j]), c(xyz$begin[k + 1], xyz$end[k + 1]))
      k <- k + 1
    }
  }
  abc <- bind_rows(abc, xyz)
}
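For reference, the same double loop can be rendered in plain Python (my own sketch, not the poster's code; `overlap` here mimics what DescTools' `Overlap` returns, the length of the intersection of two ranges):

```python
def overlap(a, b):
    """Length of the intersection of two (begin, end) ranges, 0 if disjoint."""
    return max(0.0, min(a[1], b[1]) - max(a[0], b[0]))

def overlap_column(rows):
    """rows: list of (begin, end) sorted by begin. Returns, for each row,
    its total overlap with every *later* row, mirroring the R double loop."""
    out = [0.0] * len(rows)
    for j in range(len(rows) - 1):
        for k in range(j + 1, len(rows)):
            out[j] += overlap(rows[j], rows[k])
    return out

print(overlap_column([(0.0, 358.0), (353.0, 366.0)]))  # [5.0, 0.0]
```

This pairwise loop is O(n²) per id, which is exactly the shape that does not translate directly into PySpark's column expressions and motivates the window/array approach in the answer below.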

I am still learning PySpark and need help with this.

Response to @murtihash's code snippet

Hi, this looks closer to the answer, but it is still not the result I am looking for. Output of the code:

+-------+-------------------+-------------------+-----------------+-------+
|     id|              begin|                end|             days|overlap|
+-------+-------------------+-------------------+-----------------+-------+
|7777777|2019-01-05 01:00:00|2019-04-04 00:00:00|88.95833333333333|      0|
|7777777|2019-04-04 00:00:00|2019-07-11 00:00:00|             98.0|      0|
|7777777|2019-07-11 00:00:00|2019-09-17 00:00:00|             68.0|      1|
|7777777|2019-09-17 00:00:00|2019-09-19 22:01:00|2.917361111111111|      0|
|7777777|2019-09-19 22:01:00|2020-01-01 00:00:00|103.0826388888889|     -1|
|7777777|2019-09-19 22:01:00|2020-01-01 00:00:00|103.0826388888889|     -1|
+-------+-------------------+-------------------+-----------------+-------+

The desired output should be:

+-------+-------------------+-------------------+-----------------+-------+
|     id|              begin|                end|             days|overlap|
+-------+-------------------+-------------------+-----------------+-------+
|7777777|2019-01-05 01:00:00|2019-04-04 00:00:00|88.95833333333333|      0|
|7777777|2019-04-04 00:00:00|2019-07-11 00:00:00|             98.0|      0|
|7777777|2019-07-11 00:00:00|2019-09-17 00:00:00|             68.0|      0|
|7777777|2019-09-17 00:00:00|2019-09-19 22:01:00|2.917361111111111|      0|
|7777777|2019-09-19 22:01:00|2020-01-01 00:00:00|103.0826388888889|103.082|
|7777777|2019-09-19 22:01:00|2020-01-01 00:00:00|103.0826388888889|      0|
+-------+-------------------+-------------------+-----------------+-------+

Explanation: the first four rows have no overlap. Rows 5 and 6 cover exactly the same period (and do not overlap any other rows), so one of rows 5 or 6 should have an overlap of 103.08 days.

Update: this does not work in the following case. Output of @murtihash's code snippet:

+-------+-------------------+-------------------+------------------+-------+
|  imono|              begin|                end|              days|overlap|
+-------+-------------------+-------------------+------------------+-------+
|9347774|2019-01-01 00:00:00|2019-01-08 02:10:00| 7.090277777777778|    0.0|
|9347774|2019-01-08 02:10:00|2019-02-04 05:28:00|           27.1375|    0.0|
|9347774|2019-02-04 05:28:00|2019-03-05 19:29:00|29.584027777777777|    0.0|
|9347774|2019-03-05 19:29:00|2019-04-22 18:45:00| 47.96944444444444|    0.0|
|9347774|2019-04-22 18:45:00|2019-05-03 05:05:00|10.430555555555555|    0.0|
|9347774|2019-05-03 05:05:00|2019-05-17 16:25:00|14.472222222222221|    0.0|
|9347774|2019-05-17 16:25:00|2019-06-05 22:18:00| 19.24513888888889|    0.0|
|9347774|2019-01-01 00:00:00|2020-01-01 00:00:00|             365.0|    7.0|
|9347774|2019-06-05 22:18:00|2020-01-01 00:00:00|209.07083333333333|    0.0|
+-------+-------------------+-------------------+------------------+-------+

Desired output: either this

+-------+-------------------+-------------------+------------------+-------+
|  imono|              begin|                end|              days|overlap|
+-------+-------------------+-------------------+------------------+-------+
|9347774|2019-01-01 00:00:00|2019-01-08 02:10:00| 7.090277777777778|    0.0|
|9347774|2019-01-08 02:10:00|2019-02-04 05:28:00|           27.1375|    0.0|
|9347774|2019-02-04 05:28:00|2019-03-05 19:29:00|29.584027777777777|    0.0|
|9347774|2019-03-05 19:29:00|2019-04-22 18:45:00| 47.96944444444444|    0.0|
|9347774|2019-04-22 18:45:00|2019-05-03 05:05:00|10.430555555555555|    0.0|
|9347774|2019-05-03 05:05:00|2019-05-17 16:25:00|14.472222222222221|    0.0|
|9347774|2019-05-17 16:25:00|2019-06-05 22:18:00| 19.24513888888889|    0.0|
|9347774|2019-01-01 00:00:00|2020-01-01 00:00:00|             365.0|    365|
|9347774|2019-06-05 22:18:00|2020-01-01 00:00:00|209.07083333333333|    0.0|
+-------+-------------------+-------------------+------------------+-------+

or

+-------+-------------------+-------------------+------------------+-------+
|  imono|              begin|                end|              days|overlap|
+-------+-------------------+-------------------+------------------+-------+
|9347774|2019-01-01 00:00:00|2019-01-08 02:10:00| 7.090277777777778|    7.1|
|9347774|2019-01-08 02:10:00|2019-02-04 05:28:00|           27.1375|   27.1|
|9347774|2019-02-04 05:28:00|2019-03-05 19:29:00|29.584027777777777|   29.5|
|9347774|2019-03-05 19:29:00|2019-04-22 18:45:00| 47.96944444444444|   48.0|
|9347774|2019-04-22 18:45:00|2019-05-03 05:05:00|10.430555555555555|   10.4|
|9347774|2019-05-03 05:05:00|2019-05-17 16:25:00|14.472222222222221|   14.5|
|9347774|2019-05-17 16:25:00|2019-06-05 22:18:00| 19.24513888888889|   19.2|
|9347774|2019-01-01 00:00:00|2020-01-01 00:00:00|             365.0|    0.0|
|9347774|2019-06-05 22:18:00|2020-01-01 00:00:00|209.07083333333333|  209.1|
+-------+-------------------+-------------------+------------------+-------+

Explanation: the second-to-last entry spans the whole year, and all the other entries overlap with it. So the output should either give the second-to-last entry an overlap of 365, or give every other entry its own number of days as its overlap and the second-to-last entry an overlap of 0.

Update 2: this does not work in the following case. Output of @murtihash's code snippet (Update 2):

+-------+-------------------+-------------------+------------------+-------+
|  imono|              begin|                end|              days|overlap|
+-------+-------------------+-------------------+------------------+-------+
|9395123|2019-01-19 05:01:00|2019-02-06 00:00:00|17.790972222222223|   17.0|
|9395123|2019-02-06 00:00:00|2019-06-17 00:00:00|             131.0|    0.0|
|9395123|2019-01-19 05:01:00|2020-01-01 00:00:00| 346.7909722222222|    0.0|
|9395123|2019-06-17 00:00:00|2020-01-01 00:00:00|             198.0|    0.0|
+-------+-------------------+-------------------+------------------+-------+

Desired output:

+-------+-------------------+-------------------+------------------+-------+
|  id   |              begin|                end|              days|overlap|
+-------+-------------------+-------------------+------------------+-------+
|8888888|2019-01-19 05:01:00|2019-02-06 00:00:00|17.790972222222223|   17.8|
|8888888|2019-02-06 00:00:00|2019-06-17 00:00:00|             131.0|    0.0|
|8888888|2019-01-19 05:01:00|2020-01-01 00:00:00| 346.7909722222222|    329|
|8888888|2019-06-17 00:00:00|2020-01-01 00:00:00|             198.0|    0.0|
+-------+-------------------+-------------------+------------------+-------+

I do not really understand what your code snippet does, so I cannot adapt it for my purpose. Thanks for your help!

【Discussion】:

  • Scenario 4 is unclear. Is the third row compared against the first row to get an overlap of 5 because the first and second rows have the same dates?
  • Hi @murtihash, please check scenario 4 again. I have added comments to explain it.
  • Can one row of the same id overlap multiple rows? What is your Spark version?
  • Yes, in some cases a row does overlap multiple rows, and I need to capture that too. This is what I am using on Databricks: attached to cluster: xxx, DBR 6.5 | Spark 2.4.5 | Scala 2.11
  • That is what I thought; check my solution and let me know if it covers all the cases..

Tags: apache-spark pyspark


【Solution 1】:

For Spark 2.4+, you can use sequence (to generate the date range), collect_list, and a combination of array functions and higher-order functions to get the desired overlap.

df.show() #sample dataframe
#+---+-------------------+-------------------+-----+
#| id|              begin|                end| days|
#+---+-------------------+-------------------+-----+
#|  2|2019-01-01 00:00:00|2019-12-25 00:00:00|358.0|
#|  2|2019-12-25 00:00:00|2020-01-01 00:00:00|  7.0|
#|  3|2019-01-01 00:00:00|2019-12-25 00:00:00|358.0|
#|  3|2019-12-20 00:00:00|2020-01-01 00:00:00| 12.0|
#|  4|2019-01-01 00:00:00|2019-12-25 00:00:00|358.0|
#|  4|2019-01-01 00:00:00|2019-11-25 00:00:00|328.0|
#|  4|2019-12-20 00:00:00|2020-01-01 00:00:00| 12.0|
#+---+-------------------+-------------------+-----+

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w1=Window().partitionBy("id").orderBy("begin")
df.withColumn("seq", F.expr("""sequence(to_timestamp(begin), to_timestamp(end),interval 1 day)"""))\
  .withColumn("seq1", F.expr("""flatten(filter(collect_list(seq) over\
                                (partition by id),x-> arrays_overlap(x,seq)==True and seq!=x))"""))\
  .withColumn("overlap", F.when(F.row_number().over(w1)==1, F.lit(0))\
              .otherwise(F.size(F.array_intersect("seq","seq1"))-1)).orderBy("id","end").drop("seq","seq1").show()

#+---+-------------------+-------------------+-----+-------+
#| id|              begin|                end| days|overlap|
#+---+-------------------+-------------------+-----+-------+
#|  2|2019-01-01 00:00:00|2019-12-25 00:00:00|358.0|      0|
#|  2|2019-12-25 00:00:00|2020-01-01 00:00:00|  7.0|      0|
#|  3|2019-01-01 00:00:00|2019-12-25 00:00:00|358.0|      0|
#|  3|2019-12-20 00:00:00|2020-01-01 00:00:00| 12.0|      5|
#|  4|2019-01-01 00:00:00|2019-11-25 00:00:00|328.0|    328|
#|  4|2019-01-01 00:00:00|2019-12-25 00:00:00|358.0|      0|
#|  4|2019-12-20 00:00:00|2020-01-01 00:00:00| 12.0|      5|
#+---+-------------------+-------------------+-----+-------+
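If it helps to see what the snippet is doing, here is a plain-Python analogue (an illustration of the mechanism, not the author's code, and an approximation that uses calendar dates where Spark steps from the actual begin timestamp): `sequence(begin, end, interval 1 day)` turns each interval into the day marks it touches, and `array_intersect` against the other rows' flattened sequences counts the shared day marks; the `-1` compensates for the boundary mark that adjacent sequences both contain.

```python
from datetime import date, timedelta

def day_seq(begin, end):
    """Day marks from begin to end inclusive, like Spark's
    sequence(begin, end, interval 1 day)."""
    out, d = set(), begin
    while d <= end:
        out.add(d)
        d += timedelta(days=1)
    return out

# id 3 from the sample: a 5-day real overlap (Dec 20-24)
a = day_seq(date(2019, 1, 1), date(2019, 12, 25))
b = day_seq(date(2019, 12, 20), date(2020, 1, 1))
shared = a & b           # Dec 20..25 inclusive: six shared day marks
print(len(shared) - 1)   # 5, matching the overlap column above
```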

UPDATE

This should cover all the cases:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w1=Window().partitionBy("id").orderBy("begin")
w2=Window().partitionBy("id","begin","end").orderBy("begin")
w3=Window().partitionBy("id","begin","end")
w4=Window().partitionBy("id","begin","end","maxrownum").orderBy("begin")
df.withColumn("seq", F.expr("""sequence(to_timestamp(begin), to_timestamp(end),interval 1 day)"""))\
  .withColumn('maxrownum', F.max(F.row_number().over(w2)).over(w3))\
  .withColumn('rowNum', F.row_number().over(w4))\
  .withColumn("seq1", F.expr("""flatten(filter(collect_list(seq) over\
                                (partition by id order by begin),x-> arrays_overlap(x,seq)==True and seq!=x))"""))\
  .withColumn("overlap", F.when(F.row_number().over(w1)==1, F.lit(0))\
              .when(F.size(F.array_intersect("seq","seq1"))!=0,F.size(F.array_intersect("seq","seq1"))-1)
              .when((F.col("maxrownum")!=1)&(F.col("rowNum")<F.col("maxrownum")),F.col("days"))\
              .otherwise(F.lit(0)))\
         .orderBy("id","end").drop("seq","seq1","maxrownum","rowNum").show()

#+-------+-------------------+-------------------+-----------------+-----------------+
#|     id|              begin|                end|             days|          overlap|
#+-------+-------------------+-------------------+-----------------+-----------------+
#|7777777|2019-01-05 01:00:00|2019-04-04 00:00:00|88.95833333333333|              0.0|
#|7777777|2019-04-04 00:00:00|2019-07-11 00:00:00|             98.0|              0.0|
#|7777777|2019-07-11 00:00:00|2019-09-17 00:00:00|             68.0|              0.0|
#|7777777|2019-09-17 00:00:00|2019-09-19 22:01:00|2.917361111111111|              0.0|
#|7777777|2019-09-19 22:01:00|2020-01-01 00:00:00|103.0826388888889|103.0826388888889|
#|7777777|2019-09-19 22:01:00|2020-01-01 00:00:00|103.0826388888889|              0.0|
#+-------+-------------------+-------------------+-----------------+-----------------+

UPDATE2:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w1=Window().partitionBy("id").orderBy("begin")
w2=Window().partitionBy("id","begin","end").orderBy("begin")
w3=Window().partitionBy("id","begin","end")
w4=Window().partitionBy("id","begin","end","maxrownum").orderBy("begin")
df.withColumn("seq", F.expr("""sequence(to_timestamp(begin), to_timestamp(end),interval 1 day)"""))\
  .withColumn('maxrownum', F.max(F.row_number().over(w2)).over(w3))\
  .withColumn('rowNum', F.row_number().over(w4))\
  .withColumn("seq1", F.expr("""flatten(filter(collect_list(seq) over\
                                (partition by id),x-> arrays_overlap(x,seq)==True and seq!=x))"""))\
  .withColumn("overlap", F.when(F.row_number().over(w1)==1, F.lit(0))\
              .when(F.size(F.array_intersect("seq","seq1"))!=0,F.size(F.array_intersect("seq","seq1"))-1)
              .when((F.col("maxrownum")!=1)&(F.col("rowNum")<F.col("maxrownum")),F.col("days"))\
              .when(F.col("maxrownum")==1,F.col("days"))\
              .otherwise(F.lit(0)))\
              .replace(1,0)\
         .orderBy("id","end").drop("seq","seq1","rowNum","maxrownum").show()

#+-------+-------------------+-------------------+------------------+------------------+
#|     id|              begin|                end|              days|           overlap|
#+-------+-------------------+-------------------+------------------+------------------+
#|9347774|2019-01-01 00:00:00|2019-01-08 02:10:00| 7.090277777777778|               7.0|
#|9347774|2019-01-08 02:10:00|2019-02-04 05:28:00|           27.1375|           27.1375|
#|9347774|2019-02-04 05:28:00|2019-03-05 19:29:00|29.584027777777777|29.584027777777777|
#|9347774|2019-03-05 19:29:00|2019-04-22 18:45:00| 47.96944444444444| 47.96944444444444|
#|9347774|2019-04-22 18:45:00|2019-05-03 05:05:00|10.430555555555555|10.430555555555555|
#|9347774|2019-05-03 05:05:00|2019-05-17 16:25:00|14.472222222222221|14.472222222222221|
#|9347774|2019-05-17 16:25:00|2019-06-05 22:18:00| 19.24513888888889| 19.24513888888889|
#|9347774|2019-01-01 00:00:00|2020-01-01 00:00:00|             365.0|               0.0|
#|9347774|2019-06-05 22:18:00|2020-01-01 00:00:00|209.07083333333333|209.07083333333333|
#+-------+-------------------+-------------------+------------------+------------------+

【Discussion】:

  • Hi murtihash, I added my response above. Please take a look. Thanks
  • @HarkaranSaini please add the response to my snippet by editing your question so the community can see it on the question rather than in my answer, and yes, I will take a look..
  • @HarkaranSaini check my update, it should cover all the cases.. if anything is missing, lmk and I will check it tomorrow
  • I think you are very close now. I have updated the question with a case that does not seem to work. Thanks a lot for helping brainstorm.
  • Now it has stopped working in another scenario (see my question)
【Solution 2】:

Check the following solution and see whether it works for you.

import pyspark.sql.functions as F

df = sc.parallelize([["1","2019-01-01 00:00:00","2019-01-08 02:10:00","7.090277777777778"],
["1","2019-02-04 05:28:00","2019-03-05 19:29:00","29.584027777777777"],
["1","2019-06-05 22:18:00","2020-01-01 00:00:00","209.07083333333333"],
["1","2019-05-17 16:25:00","2019-06-05 22:18:00","19.24513888888889"],
["1","2019-05-03 05:05:00","2019-05-17 16:25:00","14.472222222222221"],
["1","2019-01-08 02:10:00","2019-02-04 05:28:00","27.1375"],
["1","2019-01-01 00:00:00","2020-01-01 00:00:00","365.0"],
["1","2019-04-22 18:45:00","2019-05-03 05:05:00","10.430555555555555"],
["1","2019-03-05 19:29:00","2019-04-22 18:45:00","47.96944444444444"]]).toDF(("id","begin","end","days"))

df.withColumn("overlap", ((F.unix_timestamp(F.col("end")).cast("long") - F.unix_timestamp(F.col("begin")).cast("long"))/(24*3600))).show()

+---+-------------------+-------------------+------------------+------------------+
| id|              begin|                end|              days|           overlap|
+---+-------------------+-------------------+------------------+------------------+
|  1|2019-01-01 00:00:00|2019-01-08 02:10:00| 7.090277777777778| 7.090277777777778|
|  1|2019-02-04 05:28:00|2019-03-05 19:29:00|29.584027777777777|29.584027777777777|
|  1|2019-06-05 22:18:00|2020-01-01 00:00:00|209.07083333333333|          209.1125|
|  1|2019-05-17 16:25:00|2019-06-05 22:18:00| 19.24513888888889| 19.24513888888889|
|  1|2019-05-03 05:05:00|2019-05-17 16:25:00|14.472222222222221|14.472222222222221|
|  1|2019-01-08 02:10:00|2019-02-04 05:28:00|           27.1375|           27.1375|
|  1|2019-01-01 00:00:00|2020-01-01 00:00:00|             365.0|             365.0|
|  1|2019-04-22 18:45:00|2019-05-03 05:05:00|10.430555555555555|10.430555555555555|
|  1|2019-03-05 19:29:00|2019-04-22 18:45:00| 47.96944444444444| 47.92777777777778|
+---+-------------------+-------------------+------------------+------------------+

【Discussion】:

  • No, it is not a simple column subtraction.
  • It matches your expected output. If not, could you post the expected output?
  • I have posted several different scenarios to clear up the confusion.