【Question Title】: Filling missing dates in a Spark DataFrame column
【Posted】: 2018-03-24 08:56:01
【Question Description】:

I have a Spark DataFrame with two columns: "date" of type timestamp and "quantity" of type long. Each date has a quantity value, and the dates are in ascending order, but some dates are missing. For example, the current df:

Date        |    Quantity
10-09-2016  |    1
11-09-2016  |    2
14-09-2016  |    0
16-09-2016  |    1
17-09-2016  |    0
20-09-2016  |    2

As you can see, the df is missing some dates, such as 12-09-2016 and 13-09-2016. I want to put 0 in the quantity field for those missing dates, so that the resulting df looks like:

Date        |    Quantity
10-09-2016  |    1
11-09-2016  |    2
12-09-2016  |    0
13-09-2016  |    0
14-09-2016  |    0
15-09-2016  |    0
16-09-2016  |    1
17-09-2016  |    0
18-09-2016  |    0
19-09-2016  |    0
20-09-2016  |    2

Any help/suggestion on this would be appreciated. Thanks in advance. Please note that I am coding in Scala.
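To make the expected behavior concrete, the fill logic can be sketched in plain Python (a hypothetical helper for illustration only, not Spark code): walk the sorted rows and insert one zero-quantity row per missing day between consecutive dates.

```python
import datetime

def fill_missing_dates(rows):
    """rows: list of (date, quantity) tuples sorted ascending by date.
    Returns the same list with every missing day filled with quantity 0."""
    filled = []
    for i, (day, qty) in enumerate(rows):
        filled.append((day, qty))
        if i + 1 < len(rows):
            gap = (rows[i + 1][0] - day).days
            # insert one zero row per missing day between consecutive dates
            for offset in range(1, gap):
                filled.append((day + datetime.timedelta(days=offset), 0))
    return filled

rows = [(datetime.date(2016, 9, 10), 1),
        (datetime.date(2016, 9, 11), 2),
        (datetime.date(2016, 9, 14), 0)]
print(fill_missing_dates(rows))
```

The answers below implement the same idea in a distributed way, using a window function to find the gaps.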

【Question Comments】:

    Tags: scala datetime apache-spark apache-spark-sql


    【Solution 1】:

    I wrote this answer in a somewhat verbose way to make the code easy to follow; it can be optimized.

    Required imports

    import java.time.format.DateTimeFormatter
    import java.time.{LocalDate, LocalDateTime}
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.{LongType, TimestampType}
    

    UDF to convert a string to a valid date format

     val date_transform = udf((date: String) => {
        val dtFormatter = DateTimeFormatter.ofPattern("d-M-y")
        val dt = LocalDate.parse(date, dtFormatter)
        "%4d-%2d-%2d".format(dt.getYear, dt.getMonthValue, dt.getDayOfMonth)
          .replaceAll(" ", "0")
      })
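The normalization that `date_transform` performs (parse `d-M-y`, emit zero-padded `yyyy-MM-dd`) can be illustrated in plain Python; `strftime` handles the zero padding that the Scala UDF emulates with `"%4d-%2d-%2d".format(...).replaceAll(" ", "0")`. Note that within Spark itself, the built-in `to_date($"date", "dd-MM-yyyy")` (available since Spark 2.2) achieves the same conversion without a UDF.

```python
import datetime

def date_transform(date_str: str) -> str:
    # parse day-month-year and re-emit as zero-padded ISO yyyy-MM-dd
    return datetime.datetime.strptime(date_str, "%d-%m-%Y").strftime("%Y-%m-%d")

print(date_transform("10-09-2016"))  # 2016-09-10
```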
    

    The UDF code below is taken from Iterate over dates range

      def fill_dates = udf((start: String, excludedDiff: Int) => {
        val dtFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
        val fromDt = LocalDateTime.parse(start, dtFormatter)
        (1 to (excludedDiff - 1)).map(day => {
          val dt = fromDt.plusDays(day)
          "%4d-%2d-%2d".format(dt.getYear, dt.getMonthValue, dt.getDayOfMonth)
            .replaceAll(" ", "0")
        })
      })
    

    Set up the sample DataFrame (df)

    val df = Seq(
          ("10-09-2016", 1),
          ("11-09-2016", 2),
          ("14-09-2016", 0),
          ("16-09-2016", 1),
          ("17-09-2016", 0),
          ("20-09-2016", 2)).toDF("date", "quantity")
          .withColumn("date", date_transform($"date").cast(TimestampType))
          .withColumn("quantity", $"quantity".cast(LongType))
    
    df.printSchema()
    root
     |-- date: timestamp (nullable = true)
     |-- quantity: long (nullable = false)
    
    
    df.show()    
    +-------------------+--------+
    |               date|quantity|
    +-------------------+--------+
    |2016-09-10 00:00:00|       1|
    |2016-09-11 00:00:00|       2|
    |2016-09-14 00:00:00|       0|
    |2016-09-16 00:00:00|       1|
    |2016-09-17 00:00:00|       0|
    |2016-09-20 00:00:00|       2|
    +-------------------+--------+
    

    Create a temporary DataFrame (tempDf) to union with df

    val w = Window.orderBy($"date")
    val tempDf = df.withColumn("diff", datediff(lead($"date", 1).over(w), $"date"))
      .filter($"diff" > 1) // Pick date diff more than one day to generate our date
      .withColumn("next_dates", fill_dates($"date", $"diff"))
      .withColumn("quantity", lit(0L)) // use a long literal so the union schema matches df
      .withColumn("date", explode($"next_dates"))
      .withColumn("date", $"date".cast(TimestampType))
    
    tempDf.show(false)
    +-------------------+--------+----+------------------------+
    |date               |quantity|diff|next_dates              |
    +-------------------+--------+----+------------------------+
    |2016-09-12 00:00:00|0       |3   |[2016-09-12, 2016-09-13]|
    |2016-09-13 00:00:00|0       |3   |[2016-09-12, 2016-09-13]|
    |2016-09-15 00:00:00|0       |2   |[2016-09-15]            |
    |2016-09-18 00:00:00|0       |3   |[2016-09-18, 2016-09-19]|
    |2016-09-19 00:00:00|0       |3   |[2016-09-18, 2016-09-19]|
    +-------------------+--------+----+------------------------+
    

    Now union the two DataFrames

    val result = df.union(tempDf.select("date", "quantity"))
      .orderBy("date")
    
    result.show()
    +-------------------+--------+
    |               date|quantity|
    +-------------------+--------+
    |2016-09-10 00:00:00|       1|
    |2016-09-11 00:00:00|       2|
    |2016-09-12 00:00:00|       0|
    |2016-09-13 00:00:00|       0|
    |2016-09-14 00:00:00|       0|
    |2016-09-15 00:00:00|       0|
    |2016-09-16 00:00:00|       1|
    |2016-09-17 00:00:00|       0|
    |2016-09-18 00:00:00|       0|
    |2016-09-19 00:00:00|       0|
    |2016-09-20 00:00:00|       2|
    +-------------------+--------+
    

    【Discussion】:

    • Hi! Could you post this answer in PySpark?
    【Solution 2】:

    Building on @mrsrinivas's excellent answer, here is the PySpark version.

    Required imports

    from typing import List
    import datetime
    from pyspark.sql import DataFrame, Window
    from pyspark.sql.functions import col, lit, udf, datediff, lead, explode
    from pyspark.sql.types import DateType, ArrayType
    

    UDF to create the range of next dates

    def _get_next_dates(start_date: datetime.date, diff: int) -> List[datetime.date]:
        return [start_date + datetime.timedelta(days=days) for days in range(1, diff)]
    

    Function to create the fill-date DataFrame (with support for "grouping" columns):

    def _get_fill_dates_df(df: DataFrame, date_column: str, group_columns: List[str], fill_column: str) -> DataFrame:
        get_next_dates_udf = udf(_get_next_dates, ArrayType(DateType()))
    
        window = Window.orderBy(*group_columns, date_column)
    
        return df.withColumn("_diff", datediff(lead(date_column, 1).over(window), date_column)) \
            .filter(col("_diff") > 1).withColumn("_next_dates", get_next_dates_udf(date_column, "_diff")) \
        .withColumn(fill_column, lit(0)).withColumn(date_column, explode("_next_dates")) \
            .drop("_diff", "_next_dates")
    

    Usage of the function:

    fill_df = _get_fill_dates_df(df, "Date", [], "Quantity")
    df = df.union(fill_df)
    

    This assumes the date column is already of date type.

    【Discussion】:

      【Solution 3】:

      Here is a slight modification to use this function with months, taking measure columns (the columns that should be set to zero) as input instead of group columns:

      from typing import List
      import datetime
      from dateutil import relativedelta
      import math
      import pyspark.sql.functions as f
      from pyspark.sql import DataFrame, Window
      from pyspark.sql.types import DateType, ArrayType, IntegerType
      
      def fill_time_gaps_date_diff_based(df: DataFrame, measure_columns: list, date_column: str):
          
          group_columns = [col for col in df.columns if col not in [date_column]+measure_columns]
          
          # save measure sums for qc
          qc = df.agg({col: 'sum' for col in measure_columns}).collect()
      
          # convert month to date
          convert_int_to_date = f.udf(lambda mth: datetime.date(year=math.floor(mth/100), month=mth%100, day=1), DateType())
          df = df.withColumn(date_column, convert_int_to_date(date_column))
      
          # sort values
          df = df.orderBy(group_columns)
      
          # get_fill_dates_df (instead of months_between also use date_diff for days)
          window = Window.orderBy(*group_columns, date_column)
      
          # calculate diff column
          fill_df = df.withColumn(
              "_diff", 
              f.months_between(f.lead(date_column, 1).over(window), date_column).cast(IntegerType())
          ).filter(
              f.col("_diff") > 1
          )
      
          # generate next dates
          def _get_next_dates(start_date: datetime.date, diff: int) -> List[datetime.date]:
              return [
                  start_date + relativedelta.relativedelta(months=months)
                  for months in range(1, diff)
              ]
      
          get_next_dates_udf = f.udf(_get_next_dates, ArrayType(DateType()))
      
          fill_df = fill_df.withColumn(
              "_next_dates",
              get_next_dates_udf(date_column, "_diff")
          )
      
          # set measure columns to 0
          for col in measure_columns:
              fill_df = fill_df.withColumn(col, f.lit(0))
      
          # explode next_dates column
          fill_df = fill_df.withColumn(date_column, f.explode('_next_dates'))
      
          # drop unnecessary columns
          fill_df = fill_df.drop(
              "_diff",
              "_next_dates"
          )
          
          # union df with fill_df
          df = df.union(fill_df)
          
          # qc: should be removed for productive runs
          if qc != df.agg({col: 'sum' for col in measure_columns}).collect():
              raise ValueError('Sums before and after the run do not match.')
          
          return df
      

      Note that I assume the months are given as integers in YYYYMM form. This can easily be adjusted by modifying the "convert month to date" section.
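The YYYYMM handling from the "convert month to date" step, and the month stepping that `relativedelta` performs in `_get_next_dates`, can be checked in plain Python. This sketch uses `divmod` instead of `math.floor`/`%` (same result for positive YYYYMM integers) and a manual year rollover in place of `relativedelta`:

```python
import datetime

def int_to_date(mth: int) -> datetime.date:
    # split a YYYYMM integer into year and month, e.g. 201609 -> 2016-09-01
    year, month = divmod(mth, 100)
    return datetime.date(year, month, 1)

def next_months(start: datetime.date, diff: int):
    # first-of-month dates strictly between start and start + diff months,
    # rolling over year boundaries
    out = []
    year, month = start.year, start.month
    for _ in range(1, diff):
        month += 1
        if month > 12:
            year, month = year + 1, 1
        out.append(datetime.date(year, month, 1))
    return out

print(int_to_date(201611))                   # 2016-11-01
print(next_months(int_to_date(201611), 3))   # [2016-12-01, 2017-01-01]
```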

      【Discussion】:
