【发布时间】:2019-04-12 08:37:02
【问题描述】:
我有一张像
这样的大桌子我想将其更改为新表:id、date、last_state。
熊猫很简单:
df['time_create'] = pd.to_datetime(df['time_create'])
df = df.set_index('time_create')
df = df.sort_index()
df = df.groupby('id').resample('D').last().reset_index()
但是pyspark很难实现。
我知道:
-
pysaprk 中的重采样等价物是 groupby + window :
grouped = df.groupBy('store_product_id', window("time_create", "1 day")).agg(sum("Production").alias('Sum Production'))这里 groupby store_product_id ,在一天内重新采样并计算总和
-
分组并查找第一个或最后一个:
参考https://stackoverflow.com/a/35226857/1637673
w = Window().partitionBy("store_product_id").orderBy(col("time_create").desc()) (df .withColumn("rn", row_number().over(w)) .where(col("rn") == 1) .select("store_product_id", "time_create", "state"))这个 groupby id 并通过 time_create 获取最后一行的顺序。
但是我需要的是 groupby id,按天重新采样,然后按 time_create 获取最后一行。
我知道如果我使用 pandas udf 可能会解决这个问题,Applying UDFs on GroupedData in PySpark (with functioning python example)
但是有没有办法仅仅通过 pyspark 来做到这一点?
【问题讨论】:
-
你不能在你的窗口中包含一天吗?类似
Window().partitionBy("store_product_id", dayofmonth(col("time_create"))).orderBy(col("time_create").desc()) -
@gaw 听起来不错。我忘了
partitionBy可以多列。在这种情况下,不应该使用dayofmonth,需要从time_create添加一个新的日期列。但是分区是不是太多了?我从 2016 年到 2019 年的数据集有超过 2 亿行。 -
我认为这仍然是可能的。在这 4 年中,您有大约 1400 天和不同的产品,但我认为 spark 应该能够处理这个问题。只需确保将日期与月份和年份一起使用即可。我在一个大型代理数据集上应用了一个窗口,并通过客户端 IP 和目标 URL 对其进行了分区,它仍然有效。我想我为这个用例有更多的分区:)
标签: pandas apache-spark group-by pyspark grouping