【问题标题】:Why so many tasks in my spark job? Getting 200 Tasks By Default为什么我的 Spark 工作中有这么多任务?默认获取 200 个任务
【发布时间】:2016-10-12 00:51:48
【问题描述】:

我有一个 spark 作业,它从 hdfs 获取一个包含 8 条记录的文件,进行简单的聚合并将其保存回 hdfs。我注意到当我这样做时有数百个任务。

我也不确定为什么会有多个工作?我认为一份工作更像是发生了一件事情。我可以推测原因——但我的理解是,在这段代码内部,它应该是一项工作,它应该被分解为多个阶段,而不是多个工作。为什么不把它分解成阶段,怎么分解成工作?

就 200 多个任务而言,由于数据量和节点数量微乎其微,所以在只有一个聚合和一对聚合的情况下,每行数据有 25 个任务是没有意义的的过滤器。为什么每个原子操作的每个分区不只有一个任务?

这里是相关的 scala 代码 -

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object TestProj {object TestProj {
  def main(args: Array[String]) {

    /* set the application name in the SparkConf object */
    val appConf = new SparkConf().setAppName("Test Proj")

    /* env settings that I don't need to set in REPL*/
    val sc = new SparkContext(appConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val rdd1 = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")

     /*the below rdd will have schema defined in Record class*/
     val rddCase =  sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")
      .map(x=>x.split(" "))    //file record into array of strings based spaces
      .map(x=>Record(
        x(0).toInt,
        x(1).asInstanceOf[String],
        x(2).asInstanceOf[String],
        x(3).toInt))


    /* the below dataframe groups on first letter of first name and counts it*/
    val aggDF = rddCase.toDF()
      .groupBy($"firstName".substr(1,1).alias("firstLetter"))
      .count
      .orderBy($"firstLetter")

    /* save to hdfs*/ 
 aggDF.write.format("parquet").mode("append").save("/raw/miscellaneous/ex_out_agg")

  }

    case class Record(id: Int
      , firstName: String
      , lastName: String
      , quantity:Int)

}

以下是点击应用程序后的屏幕截图

以下是查看 id 0 的特定“工作”时显示的阶段

以下是点击包含 200 多个任务的舞台时的第一部分屏幕

这是舞台内屏幕的第二部分

以下是点击“执行者”选项卡后

根据要求,这里是 Job ID 1 的阶段

以下是作业 ID 1 中包含 200 个任务的阶段的详细信息

【问题讨论】:

    标签: scala apache-spark hadoop apache-spark-sql task


    【解决方案1】:

    这是一道经典的 Spark 问题。

    用于读取的两个任务(第二个图中的阶段 ID 0)是设置为 2 的defaultMinPartitions 设置。您可以通过读取 REPL sc.defaultMinPartitions 中的值来获取此参数。它也应该在“环境”选项卡下的 Spark UI 中可见。

    您可以查看来自 GitHub 的 code 以了解正在发生的事情。如果您想在读取时使用更多分区,只需将其添加为参数,例如 sc.textFile("a.txt", 20)

    现在有趣的部分来自第二阶段的 200 个分区(第二个图中的阶段 ID 1)。好吧,每次 shuffle 时,Spark 都需要决定 shuffle RDD 有多少个分区。可以想象,默认是200。

    您可以使用以下方法更改:

    sqlContext.setConf("spark.sql.shuffle.partitions", "4”)
    

    如果您使用此配置运行代码,您将看到 200 个分区将不再存在。如何设置这个参数是一门艺术。也许选择你拥有的核心数量的 2 倍(或其他)。

    我认为 Spark 2.0 有一种方法可以自动推断 shuffle RDD 的最佳分区数。期待!

    最后,您获得的作业数量与优化后的 Dataframe 代码产生了多少 RDD 操作 有关。如果您阅读 Spark 规范,它会说每个 RDD 操作都会触发一项工作。当您的操作涉及 Dataframe 或 SparkSQL 时,Catalyst 优化器将找出执行计划并生成一些基于 RDD 的代码来执行它。很难确切地说为什么它在您的情况下使用两个操作。您可能需要查看优化的查询计划以确切了解正在执行的操作。

    【讨论】:

    • 谢谢你!我会立即进行检查。多份工作怎么办?为什么有两个工作?
    • 你有 Job ID 1 阶段的屏幕吗?
    • 我将它们添加到 OP
    • 非常感谢!在你走之前,我是否应该通过执行一些不同的代码来采取第二个行动?这会是您希望做的优化吗?
    • 老实说,我认为如果他们的优化器这样做可能是有原因的。你的代码看起来不错。尽可能多地使用数据帧代码并避免使用 RDD。你应该没事:)
    【解决方案2】:

    我也有类似的问题。但在我的场景中,我正在并行化的集合的元素少于 Spark 调度的任务数量(导致 spark 有时表现异常)。使用强制分区号我能够解决这个问题。

    原来是这样的:

    collection = range(10) # In the real scenario it was a complex collection
    sc.parallelize(collection).map(lambda e: e + 1) # also a more complex operation in the real scenario
    

    然后,我在 Spark 日志中看到:

    INFO YarnClusterScheduler: Adding task set 0.0 with 512 tasks
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-10-02
      • 2017-03-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多