【问题标题】:How to drop rows and columns with nulls in one column pyspark如何在一列pyspark中删除带有空值的行和列
【发布时间】:2021-11-19 10:11:06
【问题描述】:

如何在 pyspark 的一列中删除带有空值的行

【问题讨论】:

  • 请不要使用图片。并输入一个可复制的最小示例,包括您的预期输出。

标签: python apache-spark pyspark group-by datediff


【解决方案1】:

这是您的示例数据:

from pyspark.sql import functions as F, Window

data = [
        ('5972015', '2021-06-29'),
        ('5972015', '2021-08-28'),
        ('5972015', '2021-08-29'),
        ('5972015', '2020-12-08'),
        ('5752541', '2019-01-09'),
        ('5752541', '2020-04-24'),
        ('5972015', '2020-04-09'),
        ('5972015', '2020-04-10'),
        ('5972015', '2020-01-16'),
        ('11311226', '2020-06-17')
]
df = spark.createDataFrame(data, ['uid', 'date'])
df = df.withColumn('date', F.col('date').cast('date'))

如果要计算每个 uid 的最大连续天数:

w1 = Window.partitionBy('uid').orderBy(F.col('date').asc_nulls_last())
df = df.withColumn('g', F.when(F.datediff('date', F.lag('date', 1).over(w1)) == 1, 0).otherwise(1))
df = df.withColumn('grp', F.sum('g').over(w1))
w2 = Window.partitionBy('uid', 'grp')
df = df.withColumn('consecutive', F.count('*').over(w2))
df = df.groupBy('uid').agg(F.max('consecutive').alias('max_consecutive'))

df.show()
#+--------+---------------+
#|     uid|max_consecutive|
#+--------+---------------+
#| 5972015|              2|
#| 5752541|              1|
#|11311226|              1|
#+--------+---------------+

如果要查找从最小日期到最大日期的天数:

w1 = Window.partitionBy('uid').orderBy(F.col('date').desc_nulls_last())
w2 = Window.partitionBy('uid').orderBy(F.col('date').asc_nulls_last())
df = df.select(
    'uid',
    F.datediff(F.max('date').over(w1), F.min('date').over(w2)).alias('datediff')
).distinct()

df.show()
#+--------+--------+
#|     uid|datediff|
#+--------+--------+
#| 5972015|     591|
#| 5752541|     471|
#|11311226|       0|
#+--------+--------+

【讨论】:

    猜你喜欢
    • 2017-10-25
    • 2021-06-27
    • 2018-12-21
    • 1970-01-01
    • 1970-01-01
    • 2022-08-13
    • 1970-01-01
    • 1970-01-01
    • 2017-11-26
    相关资源
    最近更新 更多