【发布时间】:2020-10-16 20:15:34
【问题描述】:
我正在尝试对 spark 数据框进行一系列观察。基本上我有一个日期列表,我应该为每个组创建缺少的一个。
在 pandas 中有 reindex 函数,在 pyspark 中没有。
我试图实现一个熊猫 UDF:
@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def reindex_by_date(df):
df = df.set_index('dates')
dates = pd.date_range(df.index.min(), df.index.max())
return df.reindex(dates, fill_value=0).ffill()
这看起来应该做我需要的,但是它失败了这个消息
AttributeError: Can only use .dt accessor with datetimelike values
.我在这里做错了什么?
完整代码如下:
data = spark.createDataFrame(
[(1, "2020-01-01", 0),
(1, "2020-01-03", 42),
(2, "2020-01-01", -1),
(2, "2020-01-03", -2)],
('id', 'dates', 'value'))
data = data.withColumn('dates', col('dates').cast("date"))
schema = StructType([
StructField('id', IntegerType()),
StructField('dates', DateType()),
StructField('value', DoubleType())])
@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def reindex_by_date(df):
df = df.set_index('dates')
dates = pd.date_range(df.index.min(), df.index.max())
return df.reindex(dates, fill_value=0).ffill()
data = data.groupby('id').apply(reindex_by_date)
理想情况下,我想要这样的东西:
+---+----------+-----+
| id| dates|value|
+---+----------+-----+
| 1|2020-01-01| 0|
| 1|2020-01-02| 0|
| 1|2020-01-03| 42|
| 2|2020-01-01| -1|
| 2|2020-01-02| 0|
| 2|2020-01-03| -2|
+---+----------+-----+
【问题讨论】:
标签: python pandas apache-spark pyspark