【发布时间】:2021-04-14 07:54:39
【问题描述】:
我的用户有一个小的“缩放因子”配置,他们希望将其应用于固定大小(50,000 行)的表。
这是它当前的配置方式并变成了一个小数据框:
rank_lbounds = [ 0, 101, 175, 250, 500, 50000]
scale_factors = [0.64, 0.6, 0.8, 0.99, 1.0, 1.0]
from pyspark.sql import Row
ScalingFactor = Row("rank_min", "scale_factor")
df_scaling_factors = spark.createDataFrame(
[ScalingFactor(rank, scale) for (rank, scale) in zip(rank_lbounds, scale_factors)])
这个想法是,一个结果集(50,000 行)将从最大到最小排序,然后前 100 个值将缩小 0.64 倍,接下来的 75 个缩小 0.6 倍,等等...
在 SQL 中,有效地做这种事情的“首选”方法显然是CROSS APPLY。这是他们的解决方案:
SELECT SomeKey, SomeValue, SomeValue_Rank, ScaleFactor,
SomeValue_Scaled = (SomeValue * ScaleFactor)
FROM (
SELECT SomeKey, SomeValue, SomeValue_Rank,
T_FactorLookup.rank_min AS NextLowestRankLookup,
T_FactorLookup.Rank_ScaleFactor AS ScaleFactor
FROM (
SELECT SomeKey, SomeValue,
SomeValue_Rank = row_number() over(order by SomeValue desc, SomeKey)
FROM dbo.TableOfValuesToScale
) AS T_Ranked
CROSS APPLY(
SELECT TOP 1 rank_min, Rank_ScaleFactor
FROM Extortion_VoR_Scalefactors AS Factors
WHERE Factors.rank_min <= SomeValue_Rank
ORDER BY Factors.rank_min DESC
) T_FactorLookup
) T_WithScaleFactors
在尝试将其移植到数据块时,我不确定进行这种查找的最佳方法是什么。我知道查找表总是很小(稀疏的),所以在程序上,我不会对将它实现为双嵌套 for 循环或带过滤器的笛卡尔联接感到不安,但我想使用最佳实践,以免示例可用于更大的数据集。
我考虑过的解决方案:
- 按原样使用源 SQL(但在 sparksql 中不支持
CROSS APPLY) - 带过滤器的交叉连接(慢笛卡尔积,不推荐)
- 手动将小
df_scaling_factors表“分解”成一个 50,000 行的表,并对row_number() over...=rank_min进行简单连接。
【问题讨论】:
标签: apache-spark pyspark apache-spark-sql azure-databricks