【问题标题】:Spark cached RDD is calculated n timesSpark缓存RDD计算n次
【发布时间】:2019-07-08 14:58:42
【问题描述】:

我遇到了 Spark 应用程序的问题。这是我的代码的简化版本:

def main(args: Array[String]) {
    // Initializing spark context
    val sc = new SparkContext()
    val nbExecutors = sc.getConf.getInt("spark.executor.instances", 3)
    System.setProperty("spark.sql.shuffle.partitions", nbExecutors.toString)

    // Getting files from TGZ archives
    val archivesRDD: RDD[(String,PortableDataStream)] = utils.getFilesFromHDFSDirectory("/my/dir/*.tar.gz") // This returns an RDD of tuples containing (filename, inpustream)
    val filesRDD: RDD[String] = archivesRDD.flatMap(tgzStream => {
        logger.debug("Getting files from archive : "+tgzStream._1)
        utils.getFilesFromTgzStream(tgzStream._2)
    })

    // We run the same process with 3 different "modes"
    val modes = Seq("mode1", "mode2", "mode3")

    // We cache the RDD before
    val nb = filesRDD.cache().count()
    logger.debug($nb + " files as input")

    modes.map(mode => {
        logger.debug("Processing files with mode : " + mode)
        myProcessor.process(mode, filesRDD)
    })

    filesRDD.unpersist() // I tried with or without this

    [...]
}

生成的日志是(例如以 3 个档案作为输入):

从存档中获取文件:a

从存档中获取文件:b

从存档中获取文件:c

3 个文件作为输入

使用模式处理文件:mode1

从存档中获取文件:a

从存档中获取文件:b

从存档中获取文件:c

使用模式处理文件:mode2

从存档中获取文件:a

从存档中获取文件:b

从存档中获取文件:c

使用模式处理文件:mode3

从存档中获取文件:a

从存档中获取文件:b

从存档中获取文件:c

我的 Spark 配置:

  • 版本:1.6.2
  • 执行器:20 x 2CPU x 8Go RAM
  • 每个执行器的纱线开销内存:800Mo
  • 驱动程序:1CPU x 8Go RAM

我从这些日志中了解到,文件提取执行了 4 次插入!这显然会导致我遇到堆空间问题和性能泄漏......

我做错了吗?

编辑:我也尝试使用modes.foreach(...) 代替地图,但没有任何改变......

【问题讨论】:

  • 也许你应该尝试直接在你定义filesRDD的地方缓存。
  • @astro_asz 我不确定你的意思

标签: scala performance apache-spark hadoop-yarn


【解决方案1】:

您是否尝试过将您的 modes.map 结果传递给 List 构造函数(即 List(modes.map{ /*...*/}))?有时(我不确定何时)Scala 集合会延迟评估映射,因此如果直到 spark 删除缓存后才评估这些映射,则必须重新计算。

【讨论】:

  • 谢谢,但这个修复程序仍然无法正常工作
【解决方案2】:

好的,经过大量测试,我终于解决了这个问题。实际上有两个问题:

  1. 我低估了输入数据的大小:如果 RDD 太大而无法完全存储在总内存的 60% 中,则 Spark 的 cachepersist 函数效率低下,我知道,但我认为我的输入数据并没有那么大,但实际上我的 RDD 是 80GB。但是我 60% 的内存(即 160GB)仍然超过 80GB,那么发生了什么?回答第 2 题...

  2. 我的分区太大: 在我的代码中,我的 RDD 的分区数设置为 100,所以我有 100 个分区,每个 1.6GB。问题是我的数据是由几十个兆的字符串组成的,所以我的分区没有满,10GB 的已用内存实际上只包含 7 或 8GB 的​​真实数据。

为了解决这些问题,我不得不使用persist(StorageLevel.MEMORY_SER),这会增加计算时间但显着减少内存使用(according to this benchmark)并将分区号设置为 1000(根据 Spark 文档,建议分区为 ~128MB)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-09-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-07-05
    • 1970-01-01
    相关资源
    最近更新 更多