【问题标题】:How to tune the spark application in order to avoid OOM exception如何调整 spark 应用程序以避免 OOM 异常
【发布时间】:2018-06-13 01:39:52
【问题描述】:

我使用 Spark 2.0.2。

我正在尝试运行一个 Spark 应用程序,该应用程序对已创建的模型进行预测。

集群信息:m4.2xlarge 16 vCPU,32 GiB 内存,仅 EBS 存储 EBS 存储:1000 GiB

根据here 提出的建议,我制定了Google-Spreadsheet 来计算调整参数。

无论我尝试什么,我都会遇到以下 2 个异常:

  1. 容器因超出内存限制而被 YARN 杀死。使用了 10.0 GB 的 10 GB 物理内存。考虑提升 spark.yarn.executor.memoryOverhead。
  2. 执行器心跳在 159126 毫秒后超时

下面是我要执行的代码

val allGears = sc.textFile(allGearsFilePath)
val allUsers = sc.textFile(allUsersFilePath)
val allUserItems = allUsers.cartesian(allGears).map{ case(x,y) => (x.toInt, y.toInt)}
allUserItems.cache()


val gearPurchased = sc.textFile(gearPurchaseRating)
val gearAddedToCart = sc.textFile(gearAddToCartRating)
val gearShoppingUserToItem = gearPurchased.map(_.split(',') match   { case Array(user, item, rate) => (user.toInt, item.toInt) })
gearShoppingUserToItem.cache()
val allUserItemToGearShoppingUnion = allUserItems.union(gearShoppingUserToItem)
val allUserItemToGearShoppingIntersection = allUserItems.intersection(gearShoppingUserToItem)
val FinalSubtraction = allUserItems.subtract(gearShoppingUserToItem)
val nonPurchasedGears = FinalSubtraction
nonPurchasedGears.cache()
allUserItems.unpersist()
gearShoppingUserToItem.unpersist()
val out = model.predict(nonPurchasedGears)

当我尝试预测用户可以购买哪些装备时出现异常。

下面是我运行的 spark-submit 命令

spark-submit --jars jedis-2.7.2.jar,commons-pool2-2.3.jar,spark-redis-0.3.2.jar,SparkHBase.jar,recommendcontentslib_2.11-1.0.jar --class org.digitaljuice.itemrecommender.RecommendGears --master yarn --driver-memory 2g --num-executors 5 --executor-memory 9g --executor-cores 5 --conf spark.yarn.executor.memoryOverhead=1024 recommendersystem_2.11-0.0.1.jar /work/output/gearpurchaserating/part-00000 /work/output/gearaddtocartrating/part-00000 /work/output/allGears/part-00000 /work/output/allAccounts/part-00000 /work/allaccounts/acc_toacc/part-m-00000 /work/Recommendations/ /work/TrainingModel

如何调整应用程序以使其运行并做出预测? 我尝试了各种方法,但似乎没有任何效果,所以我猜我没有正确调整应用程序。请帮忙。

谢谢

【问题讨论】:

  • 能不能把allUserItems.cache()改成allUserItems.cache().count()来触发缓存权就行了? gearShoppingUserToItem.cache() 也一样。这会让你知道什么/什么时候失败。此外,你为什么使用 RDD API?已经是 21 世纪了:)
  • 我看到您有几个 cache 调用,并且由于 cache 坚持到内存中,您很快就会遇到 OOM 问题。当您看到错误时,正在评估 DAG 中的哪个阶段?
  • 哦,是的,您运行的是哪个版本的 Spark?
  • 我是说它们可能是一个陷阱(通常,我会推荐.persist(StorageLevel.MEMORY_AND_DISK),除非您绝对确定数据适合内存)。但是,您还在做其他事情可能会导致 OOM,这就是为什么我问您在哪个阶段看到错误。
  • 就在我尝试预测时,在这条线上 val out = model.predict(nonPurchasedGears)

标签: scala apache-spark rdd apache-spark-mllib


【解决方案1】:

好的,与其继续评论,我将直接进入一个有目的的解决方案,这也需要对您的代码进行一些清理。

val allGears = spark.read.csv(allGearsFilePath)
val allUsers = spark.read.csv(allUsersFilePath)
val allUserItems = allUsers.crossJoin(allGears).map{case Row(x: String,y: String) => (x.toInt, y.toInt)}.persist(StorageLevel.MEMORY_AND_DISK)

val gearPurchased = spark.read.csv(gearPurchaseRating)

val gearShoppingUserToItem = gearPurchased.map{case Row(x: String,y: String) => (x.toInt, y.toInt)}.persist(StorageLevel.MEMORY_AND_DISK)
val nonPurchasedGears = allUserItems.except(gearShoppingUserToItem).cache()

val gearAddedToCart = spark.read.csv(gearAddToCartRating) // NOT USED
val allUserItemToGearShoppingUnion = allUserItems.union(gearShoppingUserToItem) // NOT USED
val allUserItemToGearShoppingIntersection = allUserItems.intersect(gearShoppingUserToItem) // NOT USED

allUserItems.unpersist()
gearShoppingUserToItem.unpersist()

val out = model.predict(nonPurchasedGears)

您的示例中没有使用许多变量 - 我将它们留在那里以防您以后需要它们。如果您确实不需要需要它们,请将它们删除。 (此外,如果您删除它们,则没有理由缓存任何数据帧,您也可以简单地从代码中删除所有 persistcache。)

无论如何,回到问题 - 如果您仍然遇到 OOM,您可以尝试多种方法:

  • 增加memoryOverhead。在 Spark 2.x 中,堆外内存的使用增加了,您通常需要增加 memoryOverhead。尝试将其增加到 4096(请注意,您可能需要降低 --executor-memory,以免超出可用内存)。
  • 在执行except 之前将数据重新分区到更多分区中
  • 通过在调用 except 之前添加 allUserItems.countgearShoppingUserToItem.count 来强制评估和缓存持久数据帧。我知道这听起来很奇怪,但它会经常解决 OOM 问题并显着加快您的代码速度。

我希望这会有所帮助:)

【讨论】:

    猜你喜欢
    • 2017-11-19
    • 2020-03-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-07-18
    • 1970-01-01
    • 1970-01-01
    • 2017-02-10
    相关资源
    最近更新 更多