【发布时间】:2022-10-24 10:28:00
【问题描述】:
我编写了一个小的 PySpark 代码来测试 spark AQE 的工作,并且似乎没有根据传递给它的参数合并分区。
以下是我的代码:
df = spark.read.format("csv").option("header", "true").load(<path to my csv file>)
spark.conf.set("spark.sql.adaptive.enabled","true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions","50")
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "60")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes","200000")
spark.conf.set("spark.sql.adaptive.coalescePartitions.parallelismFirst","false")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "200000")
df3 = df.groupby("Loan title").agg({"*":"count"}).withColumnRenamed('count(1)','cnt')
df3.show()
该文件约为 1.8 Gb,被读入 14 个分区,其随机写入约为 1.8MB,我已将advisoryPartitionSizeInBytes 和 minPartitionSize 设置为 200 kb,因此我预计合并分区的数量约为 9(1M/200kb)。
但是即使我们在最终计划中看到 AQE shuffle 读取的 8 个合并分区,最终阶段的任务数仍然是 1,这令人困惑。
请在下面找到火花 ui 图片:
谁能帮我弄清楚这种行为?提前致谢!!
【问题讨论】:
标签: apache-spark pyspark apache-spark-sql