【发布时间】:2016-10-12 00:51:48
【问题描述】:
我有一个 spark 作业,它从 hdfs 获取一个包含 8 条记录的文件,进行简单的聚合并将其保存回 hdfs。我注意到当我这样做时有数百个任务。
我也不确定为什么会有多个工作?我认为一份工作更像是发生了一件事情。我可以推测原因——但我的理解是,在这段代码内部,它应该是一项工作,它应该被分解为多个阶段,而不是多个工作。为什么不把它分解成阶段,怎么分解成工作?
就 200 多个任务而言,由于数据量和节点数量微乎其微,所以在只有一个聚合和一对聚合的情况下,每行数据有 25 个任务是没有意义的的过滤器。为什么每个原子操作的每个分区不只有一个任务?
这里是相关的 scala 代码 -
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object TestProj {object TestProj {
def main(args: Array[String]) {
/* set the application name in the SparkConf object */
val appConf = new SparkConf().setAppName("Test Proj")
/* env settings that I don't need to set in REPL*/
val sc = new SparkContext(appConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val rdd1 = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")
/*the below rdd will have schema defined in Record class*/
val rddCase = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")
.map(x=>x.split(" ")) //file record into array of strings based spaces
.map(x=>Record(
x(0).toInt,
x(1).asInstanceOf[String],
x(2).asInstanceOf[String],
x(3).toInt))
/* the below dataframe groups on first letter of first name and counts it*/
val aggDF = rddCase.toDF()
.groupBy($"firstName".substr(1,1).alias("firstLetter"))
.count
.orderBy($"firstLetter")
/* save to hdfs*/
aggDF.write.format("parquet").mode("append").save("/raw/miscellaneous/ex_out_agg")
}
case class Record(id: Int
, firstName: String
, lastName: String
, quantity:Int)
}
以下是点击包含 200 多个任务的舞台时的第一部分屏幕
根据要求,这里是 Job ID 1 的阶段
以下是作业 ID 1 中包含 200 个任务的阶段的详细信息
【问题讨论】:
标签: scala apache-spark hadoop apache-spark-sql task