【问题标题】:How to re-index data frame based on each partition如何根据每个分区重新索引数据帧
【发布时间】:2019-02-12 06:28:14
【问题描述】:

假设我有以下由 pyspark 创建的数据框

id  date         deleted
1   2019-02-07     true
1   2019-02-04     false
2   2019-02-01     true
3   2019-02-08     false
3   2019-02-06     true

我想从最早的日期到现在(比如 2019-02-09)每天重新索引这个表,最早的日期是基于每个 id,例如,对于 id 1,最早的日期是 2019- 02-04,对于id 3,最早的日期是2019-02-06。而预期的结果是:

id  date         deleted
1   2019-02-04     false
1   2019-02-05     null
1   2019-02-06     null
1   2019-02-07     true
1   2019-02-08     null
1   2019-02-09     null

2   2019-02-01     true
2   2019-02-02     null
      ...
2   2019-02-09     null

3   2019-02-06     true
3   2019-02-07     null
3   2019-02-08     false
3   2019-02-09     null

我知道如何根据所有 id(即 2019-02-01)来确定最早的日期,然后构建一个数据框,其中包含从 2019-02-01 到 2019-02-09 的所有日期id(交叉连接),然后左连接原始数据框。这种方法的问题是,如果有一个日期,比如 1980-01-01,那么 reindex 会为所有 id 填充从 1980-01-01 到现在的所有数据,这没有意义,并且会影响性​​能此数据帧上的以下 ETL。

根据每个分区求最早的日期,没找到好办法。

【问题讨论】:

    标签: python pyspark apache-spark-sql pyspark-sql


    【解决方案1】:

    假设您的原始 DataFrame 称为 df,并且 date 列的类型实际上是 DateType

    import pyspark.sql.functions as F
    from pyspark.sql.types import DateType, ArrayType
    import datetime
    
    # create a UDF to create a range of dates from a start
    # date until today
    def construct_date_range(start_date):
        ndays = (datetime.datetime.today() - start_date).days
        return reversed([base - datetime.timedelta(days=x) for x in range(0, ndays+1)])
    date_range_udf = F.udf(construct_date_range, ArrayType(DateType()))
    
    # find the first date for each id, and create a record for
    # all dates since the first
    id_dates = (
        df
        .groupBy('id')
        .agg(F.min('date').alias('min_date'))
        .withColumn('date_arr', construct_date_range('min_date'))
        .select('id', F.explode('date_arr').alias('date'))
    )
    
    result = id_dates.join(df, on=['id','date'], how='left')
    

    【讨论】:

    • 感谢解答,我认为解决方法是正确的,只是发现了一些格式问题。 unsupported operand type(s) for -: 'datetime.datetime' and 'str'。基于此,我成功了。
    【解决方案2】:

    基于@abeboparebop 的解决方案,我修复了一些格式问题并使其工作如下:

    import pyspark.sql.functions as F
    from pyspark.sql.types import DateType, ArrayType
    import pandas as pd
    
    from datetime import datetime
    
    import pandas as pd
    
    SYDNEY_TZ = "Australia/Sydney"
    
    def _utc_now():
        return datetime.utcnow()
    
    def _current_datetime_index(timezone=SYDNEY_TZ):
        return pd.DatetimeIndex([_utc_now()]).tz_localize("UTC").tz_convert(timezone).tz_localize(None)
    
    
    def current_datetime(timezone=SYDNEY_TZ):
        return _current_datetime_index(timezone).to_pydatetime()[0]
    
    def generate_date_list(date_from, date_to=None):
        if date_to is None:
            date_to = current_datetime()
        return pd.date_range(date_from.date(), date_to.date(), freq="D").date.tolist()
    
    
    def construct_date_range(start_date):
        return generate_date_list(pd.to_datetime(start_date))
    
    
    date_range_udf = F.udf(construct_date_range, ArrayType(DateType()))
    
    
    id_dates = (
        given_df
        .groupBy('id')
        .agg(F.min('date').alias('min_date'))
        .withColumn('date_arr', date_range_udf(F.col('min_date')))
        .select('id', F.explode('date_arr').alias('date'))
    )
    
    result = id_dates.join(given_df, on=['id', 'date'], how='left')
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-03-09
      • 2020-04-22
      • 2019-04-16
      • 1970-01-01
      • 2022-07-21
      • 1970-01-01
      • 1970-01-01
      • 2019-07-01
      相关资源
      最近更新 更多