【问题标题】:Pandas-style transform of grouped data on PySpark DataFramePySpark DataFrame 上分组数据的 Pandas 风格转换
【发布时间】:2016-03-31 14:15:51
【问题描述】:

如果我们有一个由一列类别和一列值组成的 Pandas 数据框,我们可以通过执行以下操作来删除每个类别中的平均值:

df["DemeanedValues"] = df.groupby("Category")["Values"].transform(lambda g: g - numpy.mean(g))

据我了解,Spark 数据帧不直接提供这种分组/转换操作(我在 Spark 1.5.0 上使用 PySpark)。那么,实现这种计算的最佳方法是什么?

我尝试使用 group-by/join 如下:

df2 = df.groupBy("Category").mean("Values")
df3 = df2.join(df)

但它非常慢,因为据我了解,每个类别都需要对 DataFrame 进行全面扫描。

我认为(但尚未验证)如果我将 group-by/mean 的结果收集到字典中,然后在 UDF 中使用该字典,如下所示:

nameToMean = {...}
f = lambda category, value: value - nameToMean[category]
categoryDemeaned = pyspark.sql.functions.udf(f, pyspark.sql.types.DoubleType())
df = df.withColumn("DemeanedValue", categoryDemeaned(df.Category, df.Value))

有没有一种惯用的方式来表达这种类型的操作而不牺牲性能?

【问题讨论】:

    标签: python pandas apache-spark pyspark apache-spark-sql


    【解决方案1】:

    您可以使用Window 来执行此操作

    import pyspark.sql.functions as F
    from pyspark.sql.window import Window
    
    window_var = Window().partitionBy('Categroy')
    df = df.withColumn('DemeanedValues', F.col('Values') - F.mean('Values').over(window_var))
    

    【讨论】:

      【解决方案2】:

      实际上,在 Spark 中有一种惯用的方式来执行此操作,使用 Hive OVER 表达式。

      df.registerTempTable('df')
      with_category_means = sqlContext.sql('select *, mean(Values) OVER (PARTITION BY Category) as category_mean from df')
      

      在底层,这是使用窗口函数。不过,我不确定这是否比您的解决方案更快

      【讨论】:

        【解决方案3】:

        我了解,每个类别都需要对 DataFrame 进行全面扫描。

        不,它没有。 DataFrame 聚合使用类似于aggregateByKey 的逻辑执行。请参阅DataFrame groupBy behaviour/optimization 较慢的部分是join,它需要排序/改组。但它仍然不需要按组扫描。

        如果这是您使用的确切代码,它会很慢,因为您没有提供连接表达式。因此,它只是执行笛卡尔积。所以它不仅效率低,而且不正确。你想要这样的东西:

        from pyspark.sql.functions import col
        
        means = df.groupBy("Category").mean("Values").alias("means")
        df.alias("df").join(means, col("df.Category") == col("means.Category"))
        

        我认为(但尚未验证)如果我将 group-by/mean 的结果收集到字典中,然后在 UDF 中使用该字典,我可以大大加快速度

        这是可能的,尽管性能会因具体情况而异。使用 Python UDF 的一个问题是它必须将数据移入和移出 Python。尽管如此,它绝对值得一试。不过,您应该考虑为nameToMean 使用广播变量。

        有没有一种惯用的方式来表达这种类型的操作而不牺牲性能?

        在 PySpark 1.6 中,您可以使用 broadcast 函数:

        df.alias("df").join(
            broadcast(means), col("df.Category") == col("means.Category"))
        

        但它在

        【讨论】:

        • 感谢您的回复。我不知道 df.join(); 中的笛卡尔积行为;我错误地认为默认行为是加入任何具有相同名称的列。为均值表中的类别列添加一个带有别名的显式相等性测试大大加快了速度。
        • 不客气。检查执行扩展执行计划(df.explain(extended=True))总是有用的。忽略配置的最常见问题与笛卡尔积有关,即使您提供连接表达式,也可能无法优化。
        猜你喜欢
        • 1970-01-01
        • 2023-02-02
        • 2021-02-24
        • 1970-01-01
        • 1970-01-01
        • 2018-02-15
        • 1970-01-01
        • 2019-08-04
        • 2020-02-10
        相关资源
        最近更新 更多