【问题标题】:Find min and max range with a combination of column values in PySpark使用 PySpark 中的列值组合查找最小和最大范围
【发布时间】:2019-09-15 06:55:19
【问题描述】:

我有一个这样的 pyspark 数据框,

+----------+--------+----------+----------+
|id_       | p      |d1        |  d2      |
+----------+--------+----------+----------+
|  1       | A      |2018-09-26|2018-10-26|
|  2       | B      |2018-06-21|2018-07-19|
|  2       | C      |2018-07-13|2018-10-07|
|  2       | B      |2018-12-31|2019-02-27|
|  2       | A      |2019-01-28|2019-06-25|
-------------------------------------------

我必须从这个数据帧中制作一个这样的数据帧,

+----------+--------+----------+----------+
|id_       | q      |d1        |  d2      |
+----------+--------+----------+----------+
|  1       | A      |2018-09-26|2018-10-26|
|  2       | B      |2018-06-21|2018-07-12|
|  2       | B C    |2018-07-13|2018-07-19|
|  2       | C      |2018-07-20|2019-10-07|
|  2       | B      |2018-12-31|2019-01-27|
|  2       | B A    |2019-01-28|2019-02-27|
|  2       | A      |2019-02-28|2019-06-25|
-------------------------------------------

类似于查找特定id_ 的数据中从何时到何时出现的p 值。如果同一天有多个p,那么两者都应该出现在数据中,并用空格分隔。

我尝试通过在min(d1)max(d2) 范围内创建每个日期并相应地填充它们来做到这一点。从该数据框中,经过一些融合和分组后,我可以获得所需的结果。

但是这个过程需要很长时间并且效率很低。

我正在寻找一种有效的方法来执行这项任务。

我还可以有更复杂的重叠情况,即两个以上 p 值之间的重叠。

请参阅下面的示例数据,

+----------+--------+----------+----------+
|id_       | p      |d1        |  d2      |
+----------+--------+----------+----------+
|  1       | A      |2018-09-26|2018-10-26|
|  2       | B      |2018-06-21|2018-07-19|
|  2       | C      |2018-06-27|2018-07-07|
|  2       | A      |2018-07-02|2019-02-27|
|  2       | A      |2019-03-28|2019-06-25|
-------------------------------------------

这必须转换成,

+----------+--------+----------+----------+
|id_       | q      |d1        |  d2      |
+----------+--------+----------+----------+
|  1       | A      |2018-09-26|2018-10-26|
|  2       | B      |2018-06-21|2018-06-26|
|  2       | B C    |2018-06-27|2018-07-01|
|  2       | B C A  |2018-07-02|2018-07-07|
|  2       | A B    |2018-07-08|2018-07-19|
|  2       | A      |2018-07-20|2019-02-27|
|  2       | A      |2019-03-28|2019-06-25|
-------------------------------------------

q 中各个项目的顺序无关紧要。即,如果 A、B 和 C 重叠。它可以显示为 A B C 或 B C A 或 A C B 等等。

我还添加了一个难以实现的边缘案例,即d2 == lead(d1).over(window)。在这种情况下,可以安全地假设p 的值是不同的。即p != lead(p).over(window)

+---+---+----------+----------+
|id_| p |    d1    | d2       |
+---+---+----------+----------+
|100| 12|2013-10-16|2014-01-17|
|100| 12|2014-01-20|2014-04-15|
|100| 12|2014-04-22|2014-05-19|
|100| 12|2014-05-22|2014-06-19|
|100| 12|2014-07-23|2014-09-18|
|100| 12|2014-09-23|2014-12-18|
|100| 12|2014-12-20|2015-01-16|
|100| 12|2015-01-23|2015-02-19|
|100| 12|2015-02-21|2015-04-20|
|100| 7 |2015-04-20|2015-05-17|
|100| 7 |2015-05-19|2015-06-15|
|100| 7 |2015-06-18|2015-09-01|
|100| 7 |2015-09-09|2015-11-26|
+---+---+----------+----------+

在上述数据中,倒数第 4 行和第 5 行显示了这种情况。在这种情况下,预期的结果是,

+---+-----+----------+----------+
|id_| p   | d1       | d2       |
+---+-----+----------+----------+
|100| 12  |2013-10-16|2014-01-17|
|100| 12  |2014-01-20|2014-04-15|
|100| 12  |2014-04-22|2014-05-19|
|100| 12  |2014-05-22|2014-06-19|
|100| 12  |2014-07-23|2014-09-18|
|100| 12  |2014-09-23|2014-12-18|
|100| 12  |2014-12-20|2015-01-16|
|100| 12  |2015-01-23|2015-02-19|
|100| 12  |2015-02-21|2015-04-19|
|100| 12 7|2015-04-20|2015-04-20|
|100| 7   |2015-04-21|2015-05-17|
|100| 7   |2015-05-19|2015-06-15|
|100| 7   |2015-06-18|2015-09-01|
|100| 7   |2015-09-09|2015-11-26|
+---+-----+----------+----------+

下面给出了相同情况的另一个例子,

+---+---+----------+----------+
|id_| p | d1       | d2       |
+---+---+----------+----------+
|101| 12|2015-02-24|2015-03-23|
|101| 12|2015-04-01|2015-05-19|
|101| 12|2015-05-29|2015-06-25|
|101| 12|2015-07-03|2015-07-30|
|101| 12|2015-09-02|2015-09-29|
|101| 12|2015-10-02|2015-10-29|
|101| 9 |2015-10-29|2015-11-11|
|101| 9 |2015-11-25|2015-12-22|
+---+---+----------+----------+

同样的预期结果是,

+---+-----+----------+----------+
|id_| q   | d1       | d2       |
+---+-----+----------+----------+
|101| 12  |2015-02-24|2015-03-23|
|101| 12  |2015-04-01|2015-05-19|
|101| 12  |2015-05-29|2015-06-25|
|101| 12  |2015-07-03|2015-07-30|
|101| 12  |2015-09-02|2015-09-29|
|101| 12  |2015-10-02|2015-10-28|
|101| 12 9|2015-10-29|2015-10-29|
|101| 9   |2015-10-30|2015-11-11|
|101| 9   |2015-11-25|2015-12-22|
+---+---+------------+----------+

【问题讨论】:

  • 嗨 - 数据集大小是多少?
  • 几百万行。不到十亿
  • 请注意组合可以在两个以上的 p 值之间。像 A B C 或 A B C D 等等
  • @SreeramTP,调整代码以反映设置聊天/cmets 中讨论的边界的逻辑。请查看并让我知道任何问题。此外,由于添加了字符串操作,这将比之前的代码慢。
  • @SreeramTP 它看起来可以使用我的帖子的第一个版本进行预处理,并在使用我的帖子中的代码逻辑之前对测试条件进行一些修改。

标签: python dataframe pyspark


【解决方案1】:

更新:基于 OP 的 cmets 和更新,由于可能发生任何数量的重叠,我认为 dataframe-JOIN 可能是最直接的方法。下面是我在 Spark 2.4.0 上测试的全新解决方案(array_join、transform、sequence 等需要 Spark 2.4+):

Update-2: 根据 cmets/chat 中的讨论,我添加了代码逻辑来为每个 drange(d1, d2) 设置边界,以了解如何/何时调整 d1/d2, df_drange 中需要一个新的 flag 字段来完成此逻辑。详情见下文Set up boundaries部分

Update-3: 调整代码以在 df_drange 中处理 (d1 == d2) 时进行处理。最初删除了此类案例。

设置数据:

注意:我添加了 df2,d1 和 d2 转换为 DateType(),而原始 df 保留两个字段作为 StringType(),因为我们需要一些连接操作。

from pyspark.sql import Window
from pyspark.sql.functions import lead, expr, to_date, collect_set, array_sort, array_join, broadcast

df = spark.createDataFrame([
      (1, 'A', '2018-09-26', '2018-10-26')
    , (2, 'B', '2018-06-21', '2018-07-19')
    , (2, 'C', '2018-06-27', '2018-07-07')
    , (2, 'A', '2018-07-02', '2019-02-27')
    , (2, 'A', '2019-03-28', '2019-06-25')
  ], ['id_', 'p', 'd1', 'd2'])

# convert d1, d2 to DateType() if they are StringType()
df2 = df.withColumn('d1', to_date('d1')).withColumn('d2', to_date('d2'))

df2.printSchema()
root
 |-- id_: long (nullable = true)
 |-- p: string (nullable = true)
 |-- d1: date (nullable = true)
 |-- d2: date (nullable = true)

创建引用数据框:df_drange

df_drange 包含来自 d1 和 d2 的所有不同日期, 加上一个标志,当df_drange.d1 来自df.d2(在原始df 中)时设置为1,否则设置为0。对日期进行排序并将它们划分为间隔日期范围。检索字段d1d2flag(仅限d1)并将它们转换为正确的DataType()

df_drange = df.select('id_', 'd1', lit(0).alias('flag')).union(df.select('id_', 'd2', lit(1))) \
    .groupby('id_') \
    .agg(array_sort(collect_set(concat('d1', lit('-'), 'flag'))).alias('dates')) \
    .withColumn('dates', expr("""
         explode(transform(sequence(0, size(dates)-2), i -> named_struct('d1', dates[i], 'd2', dates[i+1])))
       """)) \
    .selectExpr(
         'id_'
       , "to_date(substring_index(dates.d1, '-', 3)) as d1"
       , "to_date(substring_index(dates.d2, '-', 3)) as d2"
       , "boolean(substring_index(dates.d1, '-', -1)) as flag"
     )

df_drange.orderBy('id_','d1').show()
+---+----------+----------+-----+
|id_|        d1|        d2| flag|
+---+----------+----------+-----+
|  1|2018-09-26|2018-10-26|false|
|  2|2018-06-21|2018-06-27|false|
|  2|2018-06-27|2018-07-02|false|
|  2|2018-07-02|2018-07-07|false|
|  2|2018-07-07|2018-07-19| true|
|  2|2018-07-19|2019-02-27| true|
|  2|2019-02-27|2019-03-28| true|
|  2|2019-03-28|2019-06-25|false|
+---+----------+----------+-----+

df_drange.printSchema()
root
 |-- id_: long (nullable = true)
 |-- d1: date (nullable = true)
 |-- d2: date (nullable = true)
 |-- flag: boolean (nullable = true)

使用 Join 设置 df1

与原始 df 和每个 id_ 有任何重叠的左连接 在 df_dranges 的 (d1, d2) 和 (d1, d2) 之间原始df。后 groupby(id_, d1, d2, flag) from df_drange,得到array_join(collect_set(p), ' '):

df1 = broadcast(df_drange).join(
      df2
    , (df2.id_ == df_drange.id_) & (
            ((df2.d1 < df_drange.d2) & (df2.d2 > df_drange.d1)) 
          | ((df_drange.d1 == df_drange.d2) & df_drange.d1.between(df2.d1, df2.d2)) 
      )
    , how = 'left'
).groupby(df_drange.id_, df_drange.d1, df_drange.d2, df_drange.flag) \
 .agg(array_join(collect_set('p'), ' ').alias('q'))

df1.show()
+---+----------+----------+-----+-----+
|id_|        d1|        d2| flag|    q|
+---+----------+----------+-----+-----+
|  1|2018-09-26|2018-10-26|false|    A|
|  2|2018-06-21|2018-06-27|false|    B|
|  2|2018-06-27|2018-07-02|false|  C B|
|  2|2018-07-02|2018-07-07|false|C B A|
|  2|2018-07-07|2018-07-19| true|  B A|
|  2|2018-07-19|2019-02-27| true|    A|
|  2|2019-02-27|2019-03-28| true|     |
|  2|2019-03-28|2019-06-25|false|    A|
+---+----------+----------+-----+-----+

设置边界

对于 df1,如果 q == '' 存在间隙,则应删除此类行。 每个 drange 的边界是根据 flag、next_flag、next_d1 定义的 如 cmets/chat 中所述。下面是显示当前逻辑如何/何时调整 d1/d2 的伪代码:

flag = (if d1 is from original_d2) ? true : false
both next_d1 and next_flag defined on WindowSpec-w1

# for df1.d1: if flag is true, add 1 day, otherwise keep as-is
d1 = IF(flag, date_add(d1,1), d1)

# for df1.d2: keep as-is when there is gap with the next row or 
# the next_flag is true, else minus 1 day
d2 = IF((next_d1 != d2) or next_flag, d2, date_sub(d2,1))

实际代码:

# WindowSpec to calculate next_d1
w1 = Window.partitionBy('id_').orderBy('d1')

# filter out gaps and calculate next_d1 and the adjusted d1 and d2
df_new = df1.where('q!= ""') \
            .withColumn('next_d1', lead('d1').over(w1)) \
            .withColumn('next_flag', coalesce(lead('flag').over(w1), lit(True))) \
            .selectExpr(
                    'id_'
                  , 'q'
                  , 'IF(flag, date_add(d1,1), d1) AS d1'
                  , 'IF((next_d1 != d2) or next_flag, d2, date_sub(d2,1)) AS d2'
             )

df_new.show()
+---+-----+----------+----------+
|id_|    q|        d1|        d2|
+---+-----+----------+----------+
|  1|    A|2018-09-26|2018-10-26|
|  2|    B|2018-06-21|2018-06-26|
|  2|  C B|2018-06-27|2018-07-01|
|  2|C B A|2018-07-02|2018-07-07|
|  2|  B A|2018-07-08|2018-07-19|
|  2|    A|2018-07-20|2019-02-27|
|  2|    A|2019-03-28|2019-06-25|
+---+-----+----------+----------+

【讨论】:

  • 两排以上有重叠怎么办?
  • 你想如何处理超过两行的重叠,你能添加一些样本和预期结果吗?我以为您在 cmets 中组合添加的 p 值不超过两个???
  • 两行以上可能有重叠,p值多于两个
  • 嗯,也许您真的应该从您在主帖的评论中删除 notAnd please not the combination can be among more than two p values。我明天会检查这个。现在在我的时间太晚了。但是,如果您可以添加一个包含此类案例和预期结果的示例,那就更好了。这可以节省我们猜测的时间。
  • 我很抱歉。我的意思是。笔记。我将进行更改以显示两个以上的 p 值。
猜你喜欢
  • 2016-08-31
  • 2019-02-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-10-25
  • 2014-03-22
  • 1970-01-01
  • 2015-09-29
相关资源
最近更新 更多