【发布时间】:2019-07-08 14:58:42
【问题描述】:
我遇到了 Spark 应用程序的问题。这是我的代码的简化版本:
def main(args: Array[String]) {
// Initializing spark context
val sc = new SparkContext()
val nbExecutors = sc.getConf.getInt("spark.executor.instances", 3)
System.setProperty("spark.sql.shuffle.partitions", nbExecutors.toString)
// Getting files from TGZ archives
val archivesRDD: RDD[(String,PortableDataStream)] = utils.getFilesFromHDFSDirectory("/my/dir/*.tar.gz") // This returns an RDD of tuples containing (filename, inpustream)
val filesRDD: RDD[String] = archivesRDD.flatMap(tgzStream => {
logger.debug("Getting files from archive : "+tgzStream._1)
utils.getFilesFromTgzStream(tgzStream._2)
})
// We run the same process with 3 different "modes"
val modes = Seq("mode1", "mode2", "mode3")
// We cache the RDD before
val nb = filesRDD.cache().count()
logger.debug($nb + " files as input")
modes.map(mode => {
logger.debug("Processing files with mode : " + mode)
myProcessor.process(mode, filesRDD)
})
filesRDD.unpersist() // I tried with or without this
[...]
}
生成的日志是(例如以 3 个档案作为输入):
从存档中获取文件:a
从存档中获取文件:b
从存档中获取文件:c
3 个文件作为输入
使用模式处理文件:mode1
从存档中获取文件:a
从存档中获取文件:b
从存档中获取文件:c
使用模式处理文件:mode2
从存档中获取文件:a
从存档中获取文件:b
从存档中获取文件:c
使用模式处理文件:mode3
从存档中获取文件:a
从存档中获取文件:b
从存档中获取文件:c
我的 Spark 配置:
- 版本:1.6.2
- 执行器:20 x 2CPU x 8Go RAM
- 每个执行器的纱线开销内存:800Mo
- 驱动程序:1CPU x 8Go RAM
我从这些日志中了解到,文件提取执行了 4 次插入!这显然会导致我遇到堆空间问题和性能泄漏......
我做错了吗?
编辑:我也尝试使用modes.foreach(...) 代替地图,但没有任何改变......
【问题讨论】:
-
也许你应该尝试直接在你定义
filesRDD的地方缓存。 -
@astro_asz 我不确定你的意思
标签: scala performance apache-spark hadoop-yarn