【发布时间】:2018-04-03 12:17:06
【问题描述】:
我有一个 spark 工作,我在两个数据帧之间进行外部连接。 第一个数据帧的大小为 260 GB,文件格式为文本文件,分为 2200 个文件,第二个数据帧的大小为 2GB。 然后将大约 260 GB 的数据帧输出写入 S3 需要很长时间,超过 2 小时后我取消了,因为我在 EMR 上发生了很大变化。
这是我的集群信息。
emr-5.9.0
Master: m3.2xlarge
Core: r4.16xlarge 10 machines (each machine has 64 vCore, 488 GiB memory,EBS Storage:100 GiB)
这是我正在设置的集群配置
capacity-scheduler yarn.scheduler.capacity.resource-calculator :org.apache.hadoop.yarn.util.resource.DominantResourceCalculator
emrfs-site fs.s3.maxConnections: 200
spark maximizeResourceAllocation: true
spark-defaults spark.dynamicAllocation.enabled: true
我也尝试过手动设置内存组件,如下所示,性能更好,但同样需要很长时间
--num-executors 60--conf spark.yarn.executor.memoryOverhead=9216 --executor-memory 72G --conf spark.yarn.driver.memoryOverhead=3072 --driver-memory 26G --executor-cores 10 --driver-cores 3 --conf spark.default.parallelism=1200
我没有使用默认分区将数据保存到 S3 中。
添加有关作业和查询计划的所有详细信息,以便于理解。
真正的原因是分区。这需要大部分时间。 因为我有 2K 文件所以如果我使用像 200 这样的重新分区输出 文件以十万为单位,然后在 spark 中再次加载并不好 故事。
在下面的图像 GC 对我来说太高了.. oi 必须处理这个请建议如何处理?
以下是节点健康状态。此时数据正在保存到 S3 中,难怪我只能看到两个节点处于活动状态,而所有节点都处于空闲状态。
这是加载时的集群详细信息..此时我可以看到集群已充分利用,但在将数据保存到 S3 时,许多节点是空闲的。
最后,这是我执行 Join 然后保存到 S3 中的代码...
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "FinancialStatementLineItem_lineItemId").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd HH:mm:ss.SSS").cast("timestamp").desc)
val latestForEachKey = df2resultTimestamp.withColumn("rank", row_number.over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")
val columnMap = latestForEachKey.columns.filter(c => c.endsWith("_1") & c != "FFAction|!|_1").map(c => c -> c.dropRight(2)) :+ ("FFAction|!|_1", "FFAction|!|")
val exprs = columnMap.map(t => coalesce(col(s"${t._1}"), col(s"${t._2}")).as(s"${t._2}"))
val exprsExtended = Array(col("uniqueFundamentalSet"), col("PeriodId"), col("SourceId"), col("StatementTypeCode"), col("StatementCurrencyId"), col("FinancialStatementLineItem_lineItemId")) ++ exprs
//Joining both dara frame here
val dfMainOutput = (dataMain.join(latestForEachKey, Seq("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "FinancialStatementLineItem_lineItemId"), "outer") select (exprsExtended: _*)).filter(!$"FFAction|!|".contains("D|!|"))
//Joing ends here
val dfMainOutputFinal = dfMainOutput.na.fill("").select($"DataPartition", $"PartitionYear", $"PartitionStatement", concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").filter(_ != "PartitionYear").filter(_ != "PartitionStatement").map(c => col(c)): _*).as("concatenated"))
val headerColumn = dataHeader.columns.toSeq
val headerFinal = headerColumn.mkString("", "|^|", "|!|").dropRight(3)
val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "|^|null", "")).withColumnRenamed("concatenated", headerFinal)
// dfMainOutputFinalWithoutNull.repartition($"DataPartition", $"PartitionYear", $"PartitionStatement")
.write
.partitionBy("DataPartition", "PartitionYear", "PartitionStatement")
.format("csv")
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
.option("nullValue", "")
.option("delimiter", "\t")
.option("quote", "\u0000")
.option("header", "true")
.option("codec", "bzip2")
.save(outputFileURL)
【问题讨论】:
-
从 Web UI 显示逻辑计划。做截图让我/人们对代码的作用有所了解。谢谢。
-
我从 Spark 的 Web UI 询问了有关查询的物理计划。这应该有助于我了解您的查询到底在做什么。
-
@JacekLaskowski 哦,好吧..我能在 Gangelia 看到那些吗?
-
不这么认为。从未使用过 Ganglia(我的理解是它不会给你计划)。
-
@JacekLaskowski 我已经添加了所有的 SQL 计划和详细信息..你能看一下吗
标签: apache-spark-sql spark-dataframe hadoop2 amazon-emr