要处理大分区,您可以尝试根据 orderBy 列(很可能是可以转换为数字的数字列或日期/时间戳列)拆分它,以便所有新的子分区保持正确的行顺序。使用新的分区器处理行并使用lag 和lead 函数进行计算,只需要对子分区之间边界周围的行进行后处理。 (下面也讨论了如何在task-2中合并小分区)
使用您的示例 sdf 并假设我们有以下 WinSpec 和一个简单的聚合函数:
w = Window.partitionBy('id').orderBy('timestamp')
df.withColumn('new_amt', F.lag('amt',1).over(w) + F.lead('amt',1).over(w))
Task-1:拆分大分区:
尝试以下方法:
-
选择一个N来拆分timestamp并设置一个额外的partitionBy列pid(使用ceil、int、@987654333 @等):
# N to cover 35-days' intervals
N = 24*3600*35
df1 = sdf.withColumn('pid', F.ceil(F.unix_timestamp('timestamp')/N))
-
将 pid 添加到 partitionBy(参见 w1),然后在 w1 上调用 row_number()、lag() 和 lead()。还可以查找每个新分区中的行数 (cnt),以帮助识别分区的结尾 (rn == cnt)。生成的 new_val 将适用于大多数行,每个分区边界上的行除外。
w1 = Window.partitionBy('id', 'pid').orderBy('timestamp')
w2 = Window.partitionBy('id', 'pid')
df2 = df1.select(
'*',
F.count('*').over(w2).alias('cnt'),
F.row_number().over(w1).alias('rn'),
(F.lag('amt',1).over(w1) + F.lead('amt',1).over(w1)).alias('new_amt')
)
下面是一个示例df2,显示了边界行。
-
处理边界:选择边界上的行rn in (1, cnt)加上那些在计算rn in (2, cnt-1)中使用的值,对w进行同样的new_val计算strong> 并仅保存边界行的结果。
df3 = df2.filter('rn in (1, 2, cnt-1, cnt)') \
.withColumn('new_amt', F.lag('amt',1).over(w) + F.lead('amt',1).over(w)) \
.filter('rn in (1,cnt)')
下面显示了从上面 df2
得到的 df3
-
将 df3 合并回 df2 以更新边界行rn in (1,cnt)
df_new = df2.filter('rn not in (1,cnt)').union(df3)
下面的屏幕截图显示了边界行周围的最终 df_new:
# drop columns which are used to implement logic only
df_new = df_new.drop('cnt', 'rn')
一些注意事项:
-
定义了以下3个WindowSpec:
w = Window.partitionBy('id').orderBy('timestamp') <-- fix boundary rows
w1 = Window.partitionBy('id', 'pid').orderBy('timestamp') <-- calculate internal rows
w2 = Window.partitionBy('id', 'pid') <-- find #rows in a partition
注意:严格来说,我们最好使用以下w 来修复边界行,以避免在边界周围绑定timestamp 出现问题。
w = Window.partitionBy('id').orderBy('pid', 'rn') <-- fix boundary rows
-
如果您知道哪些分区是倾斜的,只需划分它们并跳过其他分区。如果分布稀疏,现有方法可能会将一个小分区分成 2 个甚至更多
df1 = df.withColumn('pid', F.when(F.col('id').isin('a','b'), F.ceil(F.unix_timestamp('timestamp')/N)).otherwise(1))
如果对于每个分区,您可以检索count(行数)和min_ts=min(时间戳),然后为pid(低于M 是阈值行数)尝试更动态的方法分裂):
F.expr(f"IF(count>{M}, ceil((unix_timestamp(timestamp)-unix_timestamp(min_ts))/{N}), 1)")
注意:对于分区内的偏斜,将需要更复杂的函数来生成pid。
-
如果仅使用lag(1) 函数,则只需对左边界进行后处理,按rn in (1, cnt) 过滤并仅更新rn == 1
df3 = df1.filter('rn in (1, cnt)') \
.withColumn('new_amt', F.lag('amt',1).over(w)) \
.filter('rn = 1')
当我们只需要修复右边界并更新rn == cnt时,类似于lead函数
-
如果只使用lag(2),则使用df3过滤和更新更多行:
df3 = df1.filter('rn in (1, 2, cnt-1, cnt)') \
.withColumn('new_amt', F.lag('amt',2).over(w)) \
.filter('rn in (1,2)')
您可以将相同的方法扩展到 lag 和 lead 具有不同偏移量的混合情况。
Task-2:合并小分区:
根据一个分区count的记录数,我们可以设置一个阈值M,这样如果count>M,则id持有自己的分区,否则我们合并分区所以总记录的#of 小于M(下面的方法有一个边缘情况2*M-2)。
M = 20000
# create pandas df with columns `id`, `count` and `f`, sort rows so that rows with count>=M are located on top
d2 = pd.DataFrame([ e.asDict() for e in sdf.groupby('id').count().collect() ]) \
.assign(f=lambda x: x['count'].lt(M)) \
.sort_values('f')
# add pid column to merge smaller partitions but the total row-count in partition should be less than or around M
# potentially there could be at most `2*M-2` records for the same pid, to make sure strictly count<M, use a for-loop to iterate d1 and set pid:
d2['pid'] = (d2.mask(d2['count'].gt(M),M)['count'].shift(fill_value=0).cumsum()/M).astype(int)
# add pid to sdf. In case join is too heavy, try using Map
sdf_1 = sdf.join(spark.createDataFrame(d2).alias('d2'), ["id"]) \
.select(sdf["*"], F.col("d2.pid"))
# check pid: # of records and # of distinct ids
sdf_1.groupby('pid').agg(F.count('*').alias('count'), F.countDistinct('id').alias('cnt_ids')).orderBy('pid').show()
+---+-----+-------+
|pid|count|cnt_ids|
+---+-----+-------+
| 0|74837| 1|
| 1|20036| 133|
| 2|20052| 134|
| 3|20010| 133|
| 4|15065| 100|
+---+-----+-------+
现在,新的 Window 应该被单独的 pid 分区,并将 id 移动到 orderBy,如下所示:
w3 = Window.partitionBy('pid').orderBy('id','timestamp')
根据上述w3 WinSpec自定义lag/lead函数,然后计算new_val:
lag_w3 = lambda col,n=1: F.when(F.lag('id',n).over(w3) == F.col('id'), F.lag(col,n).over(w3))
lead_w3 = lambda col,n=1: F.when(F.lead('id',n).over(w3) == F.col('id'), F.lead(col,n).over(w3))
sdf_new = sdf_1.withColumn('new_val', lag_w3('amt',1) + lead_w3('amt',1))