【问题标题】:Spark: Out Of Memory Error when I save to HDFSSpark:保存到 HDFS 时出现内存不足错误
【发布时间】:2015-06-14 22:08:39
【问题描述】:

我在将大数据保存到 hdfs 时遇到 OOME

val accumulableCollection = sc.accumulableCollection(ArrayBuffer[String]())
val rdd = textfile.filter(row => {
    if (row.endsWith(",")) {
        accumulableCollection += row
        false
    } else if (row.length < 100) {
        accumulableCollection += row
        false
    }
    valid
})
rdd.cache()
val rdd2 = rdd.map(_.split(","))
val rdd3 = rdd2.filter(row => {
    var valid = true
    for((k,v) <- fieldsMap if valid ) {
        if (StringUtils.isBlank(row(k)) || "NULL".equalsIgnoreCase(row(k))) {
            accumulableCollection += row.mkString(",")
            valid = false
        }
    }
    valid
})
sc.parallelize(accumulableCollection.value).saveAsTextFile(hdfsPath)

我在 spark-submit 中使用这个:

--num-executors 2 --driver-memory 1G --executor-memory 1G --executor-cores 2

这是日志的输出:

15/04/12 18:46:49 WARN scheduler.TaskSetManager: Stage 4 contains a task of very large size (37528 KB). The maximum recommended task size is 100 KB.
15/04/12 18:46:49 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 4.0 (TID 8, worker4, PROCESS_LOCAL, 38429279 bytes)
15/04/12 18:46:49 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 4.0 (TID 9, worker3, PROCESS_LOCAL, 38456846 bytes)
15/04/12 18:46:50 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 4.0 (TID 10, worker4, PROCESS_LOCAL, 38426488 bytes)
15/04/12 18:46:51 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 4.0 (TID 11, worker3, PROCESS_LOCAL, 38445061 bytes)
15/04/12 18:46:51 INFO cluster.YarnClusterScheduler: Cancelling stage 4
15/04/12 18:46:51 INFO cluster.YarnClusterScheduler: Stage 4 was cancelled
15/04/12 18:46:51 INFO scheduler.DAGScheduler: Job 4 failed: saveAsTextFile at WriteToHdfs.scala:87, took 5.713617 s
15/04/12 18:46:51 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Job aborted due to stage failure: Serialized task 8:0 was 38617206 bytes, which exceeds max allowed: spark.akka.frameSize (10485760 bytes) - reserved (204800 bytes). Consider increasing spark.akka.frameSize or using broadcast variables for large values.)
Exception in thread "Driver" org.apache.spark.SparkException: Job aborted due to stage failure: **Serialized task 8:0 was 30617206 bytes, which exceeds max allowed: spark.akka.frameSize (10485760 bytes)** - reserved (204800 bytes). Consider increasing spark.akka.frameSize or using broadcast variables for large values.

序列化任务 8:0 为 30617206 字节,超过允许的最大值:spark.akka.frameSize (10485760 字节) --- (1) 什么是 30MB 序列化任务?

考虑对大值使用广播变量。 --- (2) 广播变量应该是什么? rdd2?还是 accumulableCollection,因为这是我要写入 HDFS 的内容?

当我增加 frameSize 时,现在的错误是:java.lang.OutOfMemoryError: Java heap space,所以我必须将驱动程序内存和执行程序内存增加到 2G 才能工作。如果 accumulableCollection.value.length 是 500,000 我需要使用 3G。这正常吗?

该文件只有 146MB,包含 200,000 行(对于 2G 内存)。 (在 HDFS 中它分为 2 个分区,每个分区包含 73MB)

【问题讨论】:

  • 有时这是由配置问题引起的。你是如何初始化你的火花上下文的?你试过在本地提交吗?您是否尝试在连接到集群/独立管理器的交互式 shell 中运行相同的代码?
  • 我尝试通过 --master yarn-cluster 和 local[2] 提交,但结果相同。我正在使用 val sc = new SparkContext(new SparkConf())
  • 你是把整个数据集都放到内存里了吗?

标签: hadoop apache-spark hdfs


【解决方案1】:

Spark 中的核心编程抽象是一个 RDD,you can create them in two ways

(1) 并行化驱动程序中的现有集合,或 (2) 引用外部存储系统中的数据集,例如共享 文件系统、HDFS、HBase 或任何提供 Hadoop 的数据源 输入格式。

parallelize() 方法 (1) 要求您将整个数据集保存在一台机器的内存中(第 26 页学习火花)。

方法(2),简称External Datasets,适用于大文件。

以下行使用accumulableCollection.value 的内容创建一个RDD,并要求它适合单台机器:

sc.parallelize(accumulableCollection.value)

缓存 RDD 时也可能会超出内存:

rdd.cache()

这意味着整个textfile RDD 都存储在内存中。您很可能不想这样做。请参阅Spark documentation,了解为数据选择缓存级别的建议。

【讨论】:

  • 嗨 Myles,我删除了 rdd.cache(虽然我需要它来避免 accumulableCollection 中的重复值),但我仍然得到相同的 OOM 错误。我还更新了抛出的错误。我已将 spark.storage.memoryFraction 设置为 0.4 btw。
  • 在你并行化 accumulableCollection 时一定会发生,这是有道理的。我会更新我的解决方案。
【解决方案2】:

它的意思几乎就是它所说的。您正在尝试序列化一个非常大的对象。您可能应该重写您的代码以不这样做。

例如,我不清楚您为什么要尝试更新可累积集合,并在 filter 中这样做,它甚至可以执行多次。然后您缓存 RDD,但您已经尝试在驱动程序上有一个副本?然后你将其他值添加到本地集合中,然后再次将其转换为 RDD?

为什么是 accumulableCollection?只需对 RDD 进行操作。这里有很多冗余。

【讨论】:

  • 嗨 Sean,accumulableCollection 用于 RDD 中的无效行。这就是为什么它在过滤器中。我更新了代码以显示这一点。过滤后的有效行(rdd3)将保存在镶木地板中。而提取的行/无效将保存在 HDFS 中。我缓存了 rdd 以防止 accumulableCollection 中的重复,因为它执行了不止一次。
  • 另外,你的意思是 accumulableCollection.value 在被保存到 hdfs 之前被序列化了吗?我尝试使用 kryo 序列化程序(通过添加到 spark conf),但结果是一样的。
  • 这更有意义并且排除了很多。 (顺便说一句,你的代码中不需要这个var valid,只需以falsetrue 结尾...)我认为问题是你最后有一个巨大的集合,你试图把进入记忆。直接写入HDFS即可。无需使用 Spark。
  • 是的,将写入 HDFS 的 accumulableCollection 的最大大小为 840MB 或 1.3M 行。在这种情况下,我只写了 146MB 的数据。如何在不使用 Spark 的情况下将其直接写入 HDFS?这里没有任何解决方法可以将大数据保存到 HDFS 吗?
  • 解决方法是什么?使用 Spark 访问 HDFS 有点多余。只需直接使用 HDFS API。我想你的一个问题是你有一大堆错误要处理,但把它当作可以复制回驱动程序内存的“小数据”。根据谓词将 RDD 拆分为两个 RDD 并不是一个好方法,但是您可以缓存父级,并过滤 in 一次好的行,然后过滤 in 坏行下一次,并以两者的 RDD 结束。
猜你喜欢
  • 2021-04-23
  • 2014-10-13
  • 2022-01-12
  • 2013-12-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-05-15
  • 1970-01-01
相关资源
最近更新 更多