【发布时间】:2019-04-28 19:45:20
【问题描述】:
考虑到内存有限,我有一种感觉,spark 会自动从每个节点中删除 RDD。我想知道这个时间是可配置的吗? spark如何决定何时从内存中驱逐一个RDD
注意:我说的不是rdd.cache()
【问题讨论】:
标签: apache-spark rdd
考虑到内存有限,我有一种感觉,spark 会自动从每个节点中删除 RDD。我想知道这个时间是可配置的吗? spark如何决定何时从内存中驱逐一个RDD
注意:我说的不是rdd.cache()
【问题讨论】:
标签: apache-spark rdd
我想知道这个时间是否可配置? spark如何决定何时 从内存中驱逐一个 RDD
RDD 和其他对象一样是一个对象。如果您不持久化/缓存它,它将像托管语言下的任何其他对象一样,一旦没有指向它的活动根对象就会被收集。
正如@Jacek 所指出的,“如何”部分是一个名为ContextCleaner 的对象的责任。主要是想了解详情,this is what the cleaning method looks like:
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
.map(_.asInstanceOf[CleanupTaskWeakReference])
// Synchronize here to avoid being interrupted on stop()
synchronized {
reference.foreach { ref =>
logDebug("Got cleaning task " + ref.task)
referenceBuffer.remove(ref)
ref.task match {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =>
doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
case CleanAccum(accId) =>
doCleanupAccum(accId, blocking = blockOnCleanupTasks)
case CleanCheckpoint(rddId) =>
doCleanCheckpoint(rddId)
}
}
}
} catch {
case ie: InterruptedException if stopped => // ignore
case e: Exception => logError("Error in cleaning thread", e)
}
}
}
如果您想了解更多信息,我建议您浏览 Sparks 源代码,甚至更好,阅读名为 "Mastering Apache Spark" (This points to an explanation regarding ContextCleaner) 的 @Jacek 书籍
【讨论】:
rdd.map(_ * 2)。在这种情况下,spark 会将任务发送到分区,说做一个_ * 2 的映射并存储新的数据集。我假设这个新的映射数据集在该分区的内存中。是rdd被序列化并发送到每个分区并且rdd包含其中的数据集吗?
RDD 永远不会全部发送给执行者。它是由分区构成的。每个这样的分区都被发送到执行器进行处理。一旦不再使用该分区(即不是linage graph 中另一个RDD 的父级),它就可以被收集。检查点是截断连系图的一种方法。
一般来说,Yuval Itzchakov wrote“就像任何其他对象一样”,但是......(总是有“但是”,不是吗?)
在 Spark 中,这不是很明显,因为我们有 shuffle 块(在 Spark 管理的其他块中)。它们由运行在 executor 上的 BlockManager 管理。当驱动程序上的对象被从内存中逐出时,他们必须以某种方式得到通知,对吧?
这就是 ContextCleaner 出现的地方。它是 Spark 应用程序的垃圾收集器,负责在应用程序范围内清理 shuffle、RDD、广播、累加器和检查点 RDD,旨在减少长时间运行的数据密集型 Spark 应用程序的内存需求。
ContextCleaner 在驱动程序上运行。它在SparkContext 启动时创建并立即启动(并且spark.cleaner.referenceTracking Spark 属性已启用,默认情况下)。当SparkContext 停止时,它就停止了。
您可以通过使用jconsole 或jstack 转储 Spark 应用程序中的所有线程来查看它的工作情况。 ContextCleaner 使用守护进程 Spark Context Cleaner 线程来清理 RDD、shuffle 和广播状态。
您还可以通过为org.apache.spark.ContextCleaner 记录器启用INFO 或DEBUG 记录级别来查看其工作。只需将以下行添加到conf/log4j.properties:
log4j.logger.org.apache.spark.ContextCleaner=DEBUG
【讨论】:
衡量 GC 的影响
GC 调优的第一步是收集有关垃圾收集发生频率和 GC 花费时间的统计信息。这可以通过将 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 添加到 Java 选项来完成。 (有关将 Java 选项传递给 Spark 作业的信息,请参阅配置指南。)下次运行 Spark 作业时,您将看到每次垃圾回收发生时在工作人员日志中打印的消息。请注意,这些日志将位于您集群的工作节点上(在其工作目录中的标准输出文件中),而不是您的驱动程序。
高级 GC 调优
为了进一步调优垃圾回收,我们首先需要了解一些关于JVM中内存管理的基本信息:
Java Heap 空间分为 Young 和 Old 两个区域。新生代用于保存寿命较短的对象,而老年代用于保存寿命较长的对象。
Young 代进一步分为三个区域 [Eden, Survivor1, Survivor2]。
垃圾收集过程的简化描述:当 Eden 已满时,将在 Eden 上运行次要 GC,并将 Eden 和 Survivor1 中的活动对象复制到 Survivor2。幸存者区域被交换。如果对象足够老或 Survivor2 已满,则将其移至 Old。最后,当 Old 接近满时,会调用 full GC。
【讨论】:
根据弹性分布式数据集论文 -
我们的工作节点在内存中缓存 RDD 分区 Java 对象。我们在 RDD 的级别(即,我们不会从一个 RDD 为了从同一个分区加载其他分区 RDD),因为大多数操作都是扫描。我们发现了这个 简单的策略在我们所有的用户应用程序中都能很好地工作,所以 远的。想要更多控制的程序员也可以设置一个 每个 RDD 的保留优先级作为缓存的参数。
【讨论】: