【问题标题】:How to tune spark job on EMR to write huge data quickly on S3如何在 EMR 上调整 Spark 作业以在 S3 上快速写入大量数据
【发布时间】:2018-04-03 12:17:06
【问题描述】:

我有一个 spark 工作,我在两个数据帧之间进行外部连接。 第一个数据帧的大小为 260 GB,文件格式为文本文件,分为 2200 个文件,第二个数据帧的大小为 2GB。 然后将大约 260 GB 的数据帧输出写入 S3 需要很长时间,超过 2 小时后我取消了,因为我在 EMR 上发生了很大变化。

这是我的集群信息。

emr-5.9.0
Master:    m3.2xlarge
Core:      r4.16xlarge   10 machines (each machine has 64 vCore, 488 GiB memory,EBS Storage:100 GiB)

这是我正在设置的集群配置

capacity-scheduler  yarn.scheduler.capacity.resource-calculator :org.apache.hadoop.yarn.util.resource.DominantResourceCalculator
emrfs-site  fs.s3.maxConnections:   200
spark   maximizeResourceAllocation: true
spark-defaults  spark.dynamicAllocation.enabled:    true

我也尝试过手动设置内存组件,如下所示,性能更好,但同样需要很长时间

--num-executors 60--conf spark.yarn.executor.memoryOverhead=9216 --executor-memory 72G --conf spark.yarn.driver.memoryOverhead=3072 --driver-memory 26G --executor-cores 10 --driver-cores 3 --conf spark.default.parallelism=1200

我没有使用默认分区将数据保存到 S3 中。

添加有关作业和查询计划的所有详细信息,以便于理解。

真正的原因是分区。这需要大部分时间。 因为我有 2K 文件所以如果我使用像 200 这样的重新分区输出 文件以十万为单位,然后在 spark 中再次加载并不好 故事。

在下图中我不知道为什么在项目之后再次调用排序

在下面的图像 GC 对我来说太高了.. oi 必须处理这个请建议如何处理?

以下是节点健康状态。此时数据正在保存到 S3 中,难怪我只能看到两个节点处于活动状态,而所有节点都处于空闲状态。

这是加载时的集群详细信息..此时我可以看到集群已充分利用,但在将数据保存到 S3 时,许多节点是空闲的。

最后,这是我执行 Join 然后保存到 S3 中的代码...

import org.apache.spark.sql.expressions._

          val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "FinancialStatementLineItem_lineItemId").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd HH:mm:ss.SSS").cast("timestamp").desc)
          val latestForEachKey = df2resultTimestamp.withColumn("rank", row_number.over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")

          val columnMap = latestForEachKey.columns.filter(c => c.endsWith("_1") & c != "FFAction|!|_1").map(c => c -> c.dropRight(2)) :+ ("FFAction|!|_1", "FFAction|!|")
          val exprs = columnMap.map(t => coalesce(col(s"${t._1}"), col(s"${t._2}")).as(s"${t._2}"))
          val exprsExtended = Array(col("uniqueFundamentalSet"), col("PeriodId"), col("SourceId"), col("StatementTypeCode"), col("StatementCurrencyId"), col("FinancialStatementLineItem_lineItemId")) ++ exprs

          //Joining both dara frame here
          val dfMainOutput = (dataMain.join(latestForEachKey, Seq("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "FinancialStatementLineItem_lineItemId"), "outer") select (exprsExtended: _*)).filter(!$"FFAction|!|".contains("D|!|"))
          //Joing ends here

          val dfMainOutputFinal = dfMainOutput.na.fill("").select($"DataPartition", $"PartitionYear", $"PartitionStatement", concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").filter(_ != "PartitionYear").filter(_ != "PartitionStatement").map(c => col(c)): _*).as("concatenated"))

          val headerColumn = dataHeader.columns.toSeq

          val headerFinal = headerColumn.mkString("", "|^|", "|!|").dropRight(3)

          val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "|^|null", "")).withColumnRenamed("concatenated", headerFinal)

          //  dfMainOutputFinalWithoutNull.repartition($"DataPartition", $"PartitionYear", $"PartitionStatement")
  .write
  .partitionBy("DataPartition", "PartitionYear", "PartitionStatement")
  .format("csv")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .option("nullValue", "")
  .option("delimiter", "\t")
  .option("quote", "\u0000")
  .option("header", "true")
  .option("codec", "bzip2")
  .save(outputFileURL)

【问题讨论】:

  • 从 Web UI 显示逻辑计划。做截图让我/人们对代码的作用有所了解。谢谢。
  • 我从 Spark 的 Web UI 询问了有关查询的物理计划。这应该有助于我了解您的查询到底在做什么。
  • @JacekLaskowski 哦,好吧..我能在 Gangelia 看到那些吗?
  • 不这么认为。从未使用过 Ganglia(我的理解是它不会给你计划)。
  • @JacekLaskowski 我已经添加了所有的 SQL 计划和详细信息..你能看一下吗

标签: apache-spark-sql spark-dataframe hadoop2 amazon-emr


【解决方案1】:

您正在运行五个 c3.4large EC2 实例,每个实例都有 30gb 的 RAM。所以总共只有 150GB,比要加入的 >200GB 数据框小得多。因此有很多磁盘溢出。也许您可以启动 r 类型的 EC2 实例(内存优化与计算优化的 c 类型相对),看看是否有性能改进。

【讨论】:

  • 即使我使用 r4.4xlarge 也需要几乎相同的时间...使用的 V 核心仍然是 1
  • @SUDARSHAN 将此属性添加到您的集群配置中 [{"classification":"spark","properties":{"maximizeResourceAllocation":"true"}}]
  • 你有很多数据洗牌。也许当您最初编写原始数据时,以与在代码中重新分区相同的方式编写它们@SUDARSHAN
【解决方案2】:

S3 是对象存储而不是文件系统,因此最终一致性、非原子重命名操作会产生问题,即每次执行程序写入作业结果时,他们每个人都会写入外部临时目录必须写入文件的主目录(在 S3 上),一旦完成所有执行程序,就会进行重命名以获得原子排他性。在像 hdfs 这样的标准文件系统中这一切都很好,其中重命名是瞬时的,但在像 S3 这样的对象存储上,这不利于 S3 上的重命名以 6MB/s 的速度完成。

要克服上述问题,请确保设置以下两个 conf 参数

1) spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2

对于该参数的默认值,即 1,commitTask 将任务生成的数据从任务临时目录移动到作业临时目录,当所有任务完成后,commitJob 将数据从作业临时目录移动到最终目的地。因为驱动在做commitJob的工作,对于S3来说,这个操作可能需要很长时间。用户可能经常认为他/她的手机“挂起”。但是,当mapreduce.fileoutputcommitter.algorithm.version的值为2时,commitTask会将一个task产生的数据直接移动到最终的目的地,commitJob基本上是一个no-op。

2) spark.speculation=false

如果此参数设置为true,那么如果一个或多个任务在一个阶段运行缓慢,它们将被重新启动。如上所述,通过 spark 作业在 S3 上的写入操作非常慢,因此我们可以看到随着输出数据大小的增加,许多任务会重新启动。

这与最终一致性(将文件从临时目录移动到主数据目录时)可能会导致 FileOutputCommitter 进入死锁,因此作业可能会失败。

或者

您可以先将输出写入 EMR 上的本地 HDFS,然后使用 hadoop distcp 命令将数据移动到 S3。这大大提高了整体输出速度。但是,您的 EMR 节点上需要足够的 EBS 存储空间,以确保您的所有输出数据都适合。

此外,您可以将输出数据写入 ORC 格式,这将大大压缩输出大小。

参考:

https://medium.com/@subhojit20_27731/apache-spark-and-amazon-s3-gotchas-and-best-practices-a767242f3d98

【讨论】:

  • 苛刻,这只是对 Subhojit Banerjee 2016 年论文的一些更改的剪切和粘贴,诸如“性能”之类的字眼就暴露了。请在撰写答案时进行自己的研究。 medium.com/@subhojit20_27731/….
  • @SteveLoughran 感谢分享链接。只是我遇到了类似的性能问题,我从多篇文章中复制了解释,并为自己形成了一个结论和学习。上述解决方案解决了我的问题。我同意其中一些观点来自您提到的文章,但不是所有观点,而且我没有所有这些观点的 URL。 :-)
  • 好吧,我认为你应该给他荣誉。整个第一段和关于 6MB/s 的讨论都出自他之手。
猜你喜欢
  • 2018-07-01
  • 2017-03-16
  • 2020-04-24
  • 2021-12-11
  • 2018-02-16
  • 1970-01-01
  • 2019-06-06
  • 1970-01-01
  • 2018-09-19
相关资源
最近更新 更多