【Title】: org.apache.spark.SparkException: Job aborted due to stage failure - OOM Exception
【发布时间】:2018-03-18 04:53:57
【Question Description】:

In my application I am using Spark JDBC partitioning, as shown below, to fetch a table with 5 million rows and 151 columns and persist it with DISK_ONLY:

val query = "(select * from destinationlarge) as dest"
val options = Map(
  "url" -> "jdbc:mysql://IPADDRESS:3306/test?useSSL=false",
  "driver" -> "com.mysql.jdbc.Driver",
  "dbtable" -> query,
  "user" -> "root",
  "password" -> "root")

val destination = spark.read.options(options)
  .jdbc(options("url"), options("dbtable"), "0", 1, 5, 4, new java.util.Properties())
  .rdd
  .map(_.mkString(","))
  .persist(StorageLevel.DISK_ONLY)

The cluster has 5 data nodes and 1 name node, each with an i3 4-core CPU and 4 GB RAM. After running for a while, one of the executors died and threw the following error:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, datanode5, executor 6): ExecutorLostFailure (executor 6 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 139401 ms
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
at com.syntel.spark.sparkDVT$.main(sparkDVT.scala:68)
at com.syntel.spark.sparkDVT.main(sparkDVT.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:750)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

lowerBound = 1, upperBound = 5, and numPartitions = 4. This link (https://www.dezyre.com/article/how-data-partitioning-in-spark-helps-achieve-more-parallelism/297) suggests that the number of partitions should equal the total number of cores, i.e. 4 cores across all nodes, hence 4 partitions.
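One thing worth seeing concretely: for a partitioned JDBC read, lowerBound/upperBound never filter rows; they only set the stride between the per-partition WHERE clauses. Below is a hedged, plain-Scala sketch (a hypothetical helper modeled on the partition-splitting logic of Spark 2.x's JDBC relation) showing the predicates these settings produce:

```scala
// Hypothetical helper mirroring (in simplified form) how Spark 2.x splits a
// partitioned JDBC read into per-partition WHERE clauses. The bounds do NOT
// filter rows -- every row of the table is still read by some partition.
def jdbcWhereClauses(column: String, lowerBound: Long, upperBound: Long,
                     numPartitions: Int): Seq[String] = {
  val stride = upperBound / numPartitions - lowerBound / numPartitions
  (0 until numPartitions).map { i =>
    val lower = if (i != 0) Some(s"$column >= ${lowerBound + i * stride}") else None
    val upper =
      if (i != numPartitions - 1) Some(s"$column < ${lowerBound + (i + 1) * stride}")
      else None
    (lower, upper) match {
      case (None, Some(u))    => s"$u or $column is null"  // first partition
      case (Some(l), Some(u)) => s"$l AND $u"              // middle partitions
      case (Some(l), None)    => l                         // last partition
      case _                  => ""                        // numPartitions == 1
    }
  }
}

// The question's settings: every row with partition-column value >= 4 --
// which for a 5-million-row sequential key is almost the whole table --
// lands in the last partition, so one executor carries nearly all the work.
jdbcWhereClauses("id", 1, 5, 4).foreach(println)
```

For an even split, the bounds should span the real min and max of an indexed numeric column (e.g. 1 to 5,000,000 for a sequential id), and the column-name argument of `jdbc()` should name that column.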

spark-submit

spark-submit --class "com.syntel.spark.sparkDVT" --master yarn --jars --executor-memory 512m --executor-cores 1 --num-executors 5 /root/sparkdvtmysql_2.11-1.0.jar

Please correct me if I am wrong.

Thanks.

【Question Discussion】:

  • Is that all you are doing? map and persist are both transformations, not actions, so I would expect no execution to happen at all.
  • @Tim, actually no, I have a set of operations such as val source_primary_key = source.map(rec => (rec.split(",")(0), rec)) source_primary_key.persist(StorageLevel.DISK_ONLY) val extra_in_source = source_primary_key.subtractByKey(destination_primary_key) var pureextinsrc = extra_in_source.count() extra_in_source.cache() and so on, but before any of that it throws a memory exception while fetching the entire table from the database. What is the best approach here: should I partition and keep the data in memory, or should I persist() it to disk?
  • (datanode5, executor 6): ExecutorLostFailure (executor 6 exited caused by one of the running tasks).. Before concluding it is an OOM exception, did you check the logs of executor 6 on datanode5?

标签: scala apache-spark apache-spark-sql


【Solution 1】:

I suggest you use the DataFrame as-is (in Spark 2.0, that is Dataset[Row]), because Datasets use encoders and therefore take up less memory than RDDs:

val destination = spark.read
    .options(options)
    .format("jdbc")
    .load()

If you want to join the columns with a delimiter, you can use concat_ws() - example here

destination
  .withColumn("column", concat_ws(", ",
     destination.columns.map(destination.col(_)).toSeq : _*))
  .select("id", "column") // id will be used for subtraction with the other df
  .persist(StorageLevel.DISK_ONLY)

See this SO post - Comparing RDD/DF/DS - for how a Dataset differs from an RDD and what its advantages are.
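As a hedged sketch only (the names source and destination are assumed DataFrames with a shared "id" key column, per the comment thread): the subtractByKey pipeline from the comments could be kept in DataFrame form with a left_anti join, which is the DataFrame counterpart of the RDD subtractByKey:

```scala
import org.apache.spark.storage.StorageLevel

// Sketch: `source` and `destination` are assumed DataFrames loaded via
// spark.read.format("jdbc"), each with an "id" key column.
// A left_anti join keeps the rows of `source` whose id has no match in
// `destination` -- the same result as subtractByKey on keyed RDDs.
val extraInSource = source.join(destination.select("id"), Seq("id"), "left_anti")

extraInSource.persist(StorageLevel.DISK_ONLY)
val pureExtInSrc = extraInSource.count()
```

Staying with Datasets here also avoids materializing every row as one comma-joined String, which is exactly the kind of per-row object overhead that pushes small executors toward OOM.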

This may not fully answer your question. I will update the answer based on replies to my comments.

【Discussion】:
