【问题标题】:Pandas UDF in pysparkpyspark 中的 Pandas UDF
【发布时间】:2020-10-16 20:15:34
【问题描述】:

我正在尝试对 spark 数据框进行一系列观察。基本上我有一个日期列表,我应该为每个组创建缺少的一个。
在 pandas 中有 reindex 函数,在 pyspark 中没有。
我试图实现一个熊猫 UDF:

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def reindex_by_date(df):
    df = df.set_index('dates')
    dates = pd.date_range(df.index.min(), df.index.max())
    return df.reindex(dates, fill_value=0).ffill()

这看起来应该做我需要的,但是它失败了这个消息 AttributeError: Can only use .dt accessor with datetimelike values .我在这里做错了什么?
完整代码如下:

data = spark.createDataFrame(
        [(1, "2020-01-01", 0), 
        (1, "2020-01-03", 42), 
        (2, "2020-01-01", -1), 
        (2, "2020-01-03", -2)],
        ('id', 'dates', 'value'))

data = data.withColumn('dates', col('dates').cast("date"))

schema = StructType([
     StructField('id', IntegerType()),
     StructField('dates', DateType()),
     StructField('value', DoubleType())])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def reindex_by_date(df):
     df = df.set_index('dates')
     dates = pd.date_range(df.index.min(), df.index.max())
     return df.reindex(dates, fill_value=0).ffill()

data = data.groupby('id').apply(reindex_by_date)

理想情况下,我想要这样的东西:

+---+----------+-----+                                                          
| id|     dates|value|
+---+----------+-----+
|  1|2020-01-01|    0|
|  1|2020-01-02|    0|
|  1|2020-01-03|   42|
|  2|2020-01-01|   -1|
|  2|2020-01-02|    0|
|  2|2020-01-03|   -2|
+---+----------+-----+

【问题讨论】:

    标签: python pandas apache-spark pyspark


    【解决方案1】:

    案例 1:每个 ID 都有一个单独的日期范围。

    我会尽量减少 udf 的内容。在这种情况下,我只会计算 udf 中每个 ID 的日期范围。对于其他部分,我将使用 Spark 原生函数。

    from pyspark.sql import types as T
    from pyspark.sql import functions as F
    
    # Get min and max date per ID
    date_ranges = data.groupby('id').agg(F.min('dates').alias('date_min'), F.max('dates').alias('date_max'))
    
    # Calculate the date range for each ID
    @F.udf(returnType=T.ArrayType(T.DateType()))
    def get_date_range(date_min, date_max):
      return [t.date() for t in list(pd.date_range(date_min, date_max))]
    
    # To get one row per potential date, we need to explode the UDF output
    date_ranges = date_ranges.withColumn(
      'dates',
      F.explode(get_date_range(F.col('date_min'), F.col('date_max')))
    )
    
    date_ranges = date_ranges.drop('date_min', 'date_max')
    
    # Add the value for existing entries and add 0 for others
    result = date_ranges.join(
      data,
      ['id', 'dates'],
      'left'
    )
    
    result = result.fillna({'value': 0})
    

    案例 2:所有 id 的日期范围相同

    我认为这里没有必要使用 UDF。您想要的内容可以以不同的方式存档:首先,您可以获得所有可能的 ID 和所有必要的日期。其次,你交叉加入它们,这将为你提供所有可能的组合。第三,将原始数据左连接到组合上。四、将出现的空值替换为0。

    # Get all unique ids
    ids_df = data.select('id').distinct()
    
    # Get the date series
    date_min, date_max = data.agg(F.min('dates'), F.max('dates')).collect()[0]
    dates = [[t.date()] for t in list(pd.date_range(date_min, date_max))]
    dates_df = spark.createDataFrame(data=dates, schema="dates:date")
    
    # Calculate all combinations
    all_comdinations = ids_df.crossJoin(dates_df)
    
    # Add the value column
    result = all_comdinations.join(
      data,
      ['id', 'dates'],
      'left'
    )
    
    # Replace all null values with 0
    result = result.fillna({'value': 0})
    

    请注意此解决方案的以下限制:

    1. crossJoin 的成本可能很高。可以在this related question 中找到解决此问题的一种潜在解决方案。
    2. collect 语句和 Pandas 的使用会导致 Spark 转换的并行化不完美。

    [编辑] 分为两种情况,因为我首先认为所有 ID 都具有相同的日期范围。

    【讨论】:

    • 这可能有效,但是每个idminmax 日期可能不同。在示例中它们是相同的,但并非总是如此。
    • 感谢您的澄清。对于每个id 都有自己的日期范围的情况,我添加了另一个解决方案。
    猜你喜欢
    • 2021-07-06
    • 1970-01-01
    • 2021-09-24
    • 2021-08-13
    • 2020-03-31
    • 1970-01-01
    • 1970-01-01
    • 2019-02-23
    • 2020-07-04
    相关资源
    最近更新 更多