【发布时间】:2020-04-24 23:13:33
【问题描述】:
我在 EMR 上使用 Apache Spark 进行了大量 ETL。
我对获得良好性能所需的大部分调优相当满意,但我有一项工作似乎无法弄清楚。
基本上,我正在获取大约 1 TB 的镶木地板数据 - 分布在 S3 中的数万个文件中 - 并添加几列并将其写出,并按数据的日期属性之一进行分区 - 再次,镶木地板格式在 S3 中。
我是这样跑的:
spark-submit --conf spark.dynamicAllocation.enabled=true --num-executors 1149 --conf spark.driver.memoryOverhead=5120 --conf spark.executor.memoryOverhead=5120 --conf spark.driver.maxResultSize=2g --conf spark.sql.shuffle.partitions=1600 --conf spark.default.parallelism=1600 --executor-memory 19G --driver-memory 19G --executor-cores 3 --driver-cores 3 --class com.my.class path.to.jar <program args>
集群的大小是根据输入数据集的大小动态确定的,num-executors、spark.sql.shuffle.partitions和spark.default.parallelism参数是根据输入数据集的大小来计算的集群。
代码大致是这样的:
va df = (read from s3 and add a few columns like timestamp and source file name)
val dfPartitioned = df.coalesce(numPartitions)
val sqlDFProdDedup = spark.sql(s""" (query to dedup against prod data """);
sqlDFProdDedup.repartition($"partition_column")
.write.partitionBy("partition_column")
.mode(SaveMode.Append).parquet(outputPath)
当我查看神经节图时,我在重复数据删除逻辑运行和一些数据混洗时获得了巨大的资源峰值,但随后数据的实际写入只使用了一小部分资源并运行了几个小时.
我不认为主要问题是分区倾斜,因为数据应该公平地分布在所有分区中。
分区列本质上是一个月中的一天,因此每个作业通常只有 5-20 个分区,具体取决于输入数据集的跨度。每个分区通常包含大约 100 GB 的数据,包含 10-20 个 parquet 文件。
我正在设置 spark.sql.files.maxRecordsPerFile 来管理这些输出文件的大小。
所以,我最大的问题是:如何提高这里的性能?
仅仅添加资源似乎没有多大帮助。
我尝试让执行器更大(以减少混洗)并增加每个执行器的 CPU 数量,但这似乎并不重要。
提前致谢!
【问题讨论】:
-
一个后续说明:有问题的表有大约 80 行。其中 2 行是大字符串:一行大约 100 个字符,另一行大约 1000 个字符。根据经验,一旦采用(活泼的)镶木地板格式,如果没有这两列,数据就会小约 1/3。对于任何值得...
标签: scala apache-spark amazon-s3 amazon-emr