【问题标题】:Pyspark: fill missing dates with latest row valuePyspark:用最新的行值填充缺失的日期
【发布时间】:2020-12-04 04:02:41
【问题描述】:

我发现了各种类似的问题,但没有一个能回答我的具体问题。

我需要使用基于日期列的最新行值填充 pyspark 数据框中缺失的日期行。

我目前的解决方案是计算到今天为止的缺失日期列表,加入原始 df 并用最新的有效值一一填充所有列:

# get the maximum date from the df
max_date = df.select(F.max('date')).first()['max(date)')

today = datetime.date.today()
delta = today - max_date
dates_list = [(today - datetime.timedelta(days=x),) for x in range(delta.days)]

# if there are missing rows
if dates_list:
    # create df with one column 'date'
    dates_df = spark.createDataFrame(dates_list, schema=date_schema)

    # join with original df
    df = df.join(F.broadcast(dates_df), ['date'], 'outer')

    w = Window.orderBy('date').rangeBetween(Window.unboundedPreceding, 0)
    
    # fill all columns with latest non null col value
    for c in df.columns:
        if c != 'date':
            df = df.withColumn(c, F.last(c, ignorenulls=True).over(w))

此代码的问题在于原始 df 包含大量列,并且 spark 为每个列计算一个窗口以获取最后一个非空值,这种方法效率非常低,导致逻辑计划庞大。

我想以一种简单的方式实现它,以简单地获取具有最大日期的行内容(因为它不包含空值)并使用计算日期列表更改日期直到今天。

对如何实施这种方法有什么建议吗?

示例输入:

date       | col_one | col_two | col_three | .. | col_n
-------------------------------------------------------
2020-08-15 | 0.1     | 6.5     | 9.8       | .. | 0.7
2020-08-14 | 0.2     | 5.5     | 1.8       | .. | 3.7
2020-08-13 | 0.4     | 7.5     | 1.3       | .. | 0.5
2020-08-12 | 3.1     | 8.5     | 9.8       | .. | 1.7
2020-08-11 | 0.15    | 6.9     | 9.7       | .. | 0.2

示例输出:

date       | col_one | col_two | col_three | .. | col_n
-------------------------------------------------------
2020-08-18 | 0.1     | 6.5     | 9.8       | .. | 0.7
2020-08-17 | 0.1     | 6.5     | 9.8       | .. | 0.7
2020-08-16 | 0.1     | 6.5     | 9.8       | .. | 0.7
2020-08-15 | 0.1     | 6.5     | 9.8       | .. | 0.7
2020-08-14 | 0.2     | 5.5     | 1.8       | .. | 3.7
2020-08-13 | 0.4     | 7.5     | 1.3       | .. | 0.5
2020-08-12 | 3.1     | 8.5     | 9.8       | .. | 1.7
2020-08-11 | 0.15    | 6.9     | 9.7       | .. | 0.2

【问题讨论】:

  • 能否请您发布一些示例 i/p 和 o/p?
  • 原始 df 只是一个带有日期列的数据框,就像 100 个浮点列一样,示例输出只包含截至今天的所有日期以及最新的行值。
  • 请在您的问题中添加示例输入和输出,这很有帮助,您的评论也没有帮助。
  • 我举了两个例子,希望现在更清楚

标签: python dataframe apache-spark pyspark


【解决方案1】:

一种可能的解决方案是使用最大日期过滤原始数据框,用第一个缺失日期填充日期列,并将相同的行内容与每个剩余缺失日期的缺失日期合并。然后最后加入原始df:

# compute the list of all dates from maximum date available till today
max_date = df.select(F.max('date')).first()['max(date)']
today = datetime.date.today()
dates_list = [today - datetime.timedelta(days=x) for x in range((today - max_date).days)]

# take the row with latest date values
max_date_values = df.filter(F.col('date') == max_date)
missing_dates_values = None

# duplicate latest values for the dates we are missing till today
for missing_date in dates_list:
    if missing_dates_values is None:
        missing_dates_values = max_date_values.withColumn('date', F.lit(missing_date))
    else:
       missing_dates_values = missing_dates_values.unionByName(max_date_values.withColumn('date',
                                                                                        F.lit(missing_date)))

# union with the original df
if dates_list:
    df = df.unionByName(missing_dates_values)

在我的用例中,火花查询计划相对于之前要简单得多,从而带来更好的性能。

【讨论】:

    猜你喜欢
    • 2020-12-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-10-05
    • 2022-01-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多