【问题标题】:Databricks / Spark equivalent to lookup (done via CROSS APPLY in SQL)Databricks / Spark 相当于查找(通过 SQL 中的 CROSS APPLY 完成)
【发布时间】:2021-04-14 07:54:39
【问题描述】:

我的用户有一个小的“缩放因子”配置,他们希望将其应用于固定大小(50,000 行)的表。

这是它当前的配置方式并变成了一个小数据框:

rank_lbounds  = [   0, 101, 175,  250, 500, 50000]
scale_factors = [0.64, 0.6, 0.8, 0.99, 1.0,   1.0]

from pyspark.sql import Row
ScalingFactor = Row("rank_min", "scale_factor")
df_scaling_factors = spark.createDataFrame(
  [ScalingFactor(rank, scale) for (rank, scale) in zip(rank_lbounds, scale_factors)])

这个想法是,一个结果集(50,000 行)将从最大到最小排序,然后前 100 个值将缩小 0.64 倍,接下来的 75 个缩小 0.6 倍,等等...

在 SQL 中,有效地做这种事情的“首选”方法显然是CROSS APPLY。这是他们的解决方案:

SELECT SomeKey, SomeValue, SomeValue_Rank, ScaleFactor,
       SomeValue_Scaled = (SomeValue * ScaleFactor)
FROM (
    SELECT SomeKey, SomeValue, SomeValue_Rank,
        T_FactorLookup.rank_min AS NextLowestRankLookup,
        T_FactorLookup.Rank_ScaleFactor AS ScaleFactor
    FROM (
        SELECT SomeKey, SomeValue,
               SomeValue_Rank = row_number() over(order by SomeValue desc, SomeKey)
        FROM dbo.TableOfValuesToScale
    ) AS T_Ranked
    CROSS APPLY(
        SELECT TOP 1 rank_min, Rank_ScaleFactor
        FROM Extortion_VoR_Scalefactors AS Factors
        WHERE Factors.rank_min <= SomeValue_Rank
        ORDER BY Factors.rank_min DESC
    ) T_FactorLookup
) T_WithScaleFactors

在尝试将其移植到数据块时,我不确定进行这种查找的最佳方法是什么。我知道查找表总是很小(稀疏的),所以在程序上,我不会对将它实现为双嵌套 for 循环或带过滤器的笛卡尔联接感到不安,但我想使用最佳实践,以免示例可用于更大的数据集。

我考虑过的解决方案:

  • 按原样使用源 SQL(但在 sparksql 中不支持 CROSS APPLY
  • 带过滤器的交叉连接(慢笛卡尔积,不推荐)
  • 手动将小 df_scaling_factors 表“分解”成一个 50,000 行的表,并对 row_number() over... = rank_min 进行简单连接。

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql azure-databricks


    【解决方案1】:

    我会加入行号。也许您可以在比例因子表中再添加一列以方便加入。

    from pyspark.sql import Row, Window, functions as F
    
    rank_lbounds  = [   0, 101, 175,  250, 500]
    rank_ubounds  = [ 100, 174, 249,  499, 50000]
    scale_factors = [0.64, 0.6, 0.8, 0.99, 1.0]
    
    ScalingFactor = Row("rank_min", "rank_max", "scale_factor")
    df_scaling_factors = spark.createDataFrame(
      [ScalingFactor(rankl, ranku, scale)
       for (rankl, ranku, scale) in zip(rank_lbounds, rank_ubounds, scale_factors)])
    
    df2 = df.withColumn('rn', F.row_number().over(Window.orderBy('value')))
    
    joined = df2.join(
        df_scaling_factors,
        (df2.rn >= df_scaling_factors.rank_min) & (df2.rn <= df_scaling_factors.rank_max)
    )
    joined2 = joined.withColumn('scaled_values', F.col('scale_factors') * F.col('value'))
    

    【讨论】:

    • 我喜欢。这很简单。我所做的唯一更改是派生rank_ubounds = [ x-1 for x in rank_lbounds[1:] ] + [50000] 以避免额外的配置数据。
    • 还使用Window.orderBy(F.col('value').desc()) 来捕捉降序排序的细微差别。
    猜你喜欢
    • 1970-01-01
    • 2012-08-18
    • 1970-01-01
    • 2012-07-13
    • 1970-01-01
    • 2019-09-07
    • 1970-01-01
    • 2014-12-04
    • 1970-01-01
    相关资源
    最近更新 更多