【问题标题】:Optimising GroupBy in PySpark在 PySpark 中优化 GroupBy
【发布时间】:2022-01-10 11:53:27
【问题描述】:

我有一个数据集,我在其中按多个变量进行分组,以使用 PySpark 计算每个用户 ID 的最大值和平均值的中值,如下所示:

import pyspark.sql.functions as F
df = spark.read.parquet("s3a://xxx").select("id", "timestamp", "category", "value")
df1 = df.groupBy("id", "timestamp", "category").agg(F.max("value"))
df2 = df1.groupBy("id", "timestamp").agg(
    F.max("value").alias("max_value"), F.mean("value").alias("avg_value")
)
df3 = df2.groupBy("id").agg(
    F.expr("percentile(max_value, array(0.5))")[0].alias("median_max_value"),
    F.expr("percentile(avg_value, array(0.5))")[0].alias("median_avg_value"),
)
df3.show()

这按预期工作,但需要大约。 2 小时运行数十亿行。有没有办法优化这个?

【问题讨论】:

  • 请将“df3.inspect(True)”的输出添加到问题中,以显示逻辑和物理查询计划。可能您可以通过对数据进行预排序来获得一些优化,但如果没有计划就很难判断。

标签: python python-3.x dataframe apache-spark pyspark


【解决方案1】:

你没有做任何我会优化的特别事情。但是,我会运行pandas-profiling 来查看数据分布。也许您可以通过使用自定义分区器在分组之前对数据进行分区或在分组期间使用分区函数来受益。

【讨论】:

    【解决方案2】:

    解决方案是按“id”重新分区,这样代码现在看起来类似于:

    import pyspark.sql.functions as F
    df = spark.read.parquet("s3a://xxx").select("id", "timestamp", "category", "value").repartition(F.col("id"))
    df1 = df.groupBy("id", "timestamp", "category").agg(F.max("value"))
    df2 = df1.groupBy("id", "timestamp").agg(
        F.max("value").alias("max_value"), F.mean("value").alias("avg_value")
    )
    df3 = df2.groupBy("id").agg(
        F.expr("percentile(max_value, array(0.5))")[0].alias("median_max_value"),
        F.expr("percentile(avg_value, array(0.5))")[0].alias("median_avg_value"),
    )
    df3.show()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-11-03
      • 2020-11-19
      • 1970-01-01
      • 1970-01-01
      • 2022-07-05
      • 1970-01-01
      • 2021-12-04
      相关资源
      最近更新 更多