【问题标题】:whether spark loads base RDD in Memoryspark是否在内存中加载base RDD
【发布时间】:2016-02-19 00:41:44
【问题描述】:

我是 Spark 的新手。需要帮助了解 spark 的工作原理。 假设README.md 存储在 3 个节点的 HDFS 128 块中,我正在使用 spark shell 来处理它。

val textFile = sc.textFile("README.md")
val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark.first()

在上述情况下,执行将由第 3 行触发。

Spark 会在 HDFS 节点的 RAM 中加载完成 3 次 README.md 拆分,然后过滤 linesWithSpark 并在内存中保留片刻。并从linesWithSpark(从第一次拆分)发送第一行? 或者它只会从 HDFS 节点的磁盘的 Split1 中提取带有“Spark”的第一行并将其发送给驱动程序。

如果我将第 2 行更改为

,处理会发生什么变化
val linesWithSpark = textFile.filter(line => line.contains("Spark")).cache()

【问题讨论】:

  • 不是一个实际的答案,而是一些评论。您不应该关心用户级别的步骤的精确执行顺序,这就是 Spark 的用途。不过,您可以在 spark 的 Web UI 中检查执行 DAG。另一点是sc.textFile("README.md") 将在路径/工作目录而不是 HDFS 上查找“README.md”文件,您必须明确地将 HDFS URI 提供给函数/方法。

标签: scala apache-spark


【解决方案1】:

让我们从一个简单的实验开始。首先让我们加载数据并检查其分布:

val textFile = sc.textFile("README.md", 2)
textFile.glom.map(_.size).collect
// Array[Int] = Array(54, 41)

我们可以怀疑简单的filter 只生成一个任务:

textFile.filter(line => line.contains("Spark")).toDebugString
// String = 
// (2) MapPartitionsRDD[11] at filter at <console>:30 []
//  |  MapPartitionsRDD[8] at textFile at <console>:27 []
//  |  README.md HadoopRDD[7] at textFile at <console>:27 []

现在让我们在没有cache 的情况下运行此作业并收集一些诊断信息:

val cnt = sc.accumulator(0L, "cnt")

val linesWithSpark = textFile.filter(line => {
  cnt += 1L
  line.contains("Spark")
})

linesWithSpark.first()
// String = # Apache Spark
cnt.value
/// Long = 1

正如您所见,没有缓存的作业将只处理一条记录。这是因为first 被作为take(1) 执行。在第一次迭代中,take 仅运行作业on a one partition,并在其迭代器上使用it.take(left),其中left 等于一个。

由于Iterators 是懒惰的,我们的程序在处理完第一行后立即返回。如果第一个分区没有提供所需的结果,take 会在每次迭代中迭代越来越多的已处理分区。

接下来让我们用缓存重复同样的实验:

val cacheCntCache = sc.accumulator(0L, "cacheCnt")

val linesWithSparkCached = textFile.filter(line => {
  cacheCntCache  += 1L
  line.contains("Spark")
}).cache()

linesWithSparkCached.first()
// String = # Apache Spark
cacheCntCache.value
// Long = 54

此外,让我们检查存储信息:

sc.getRDDStorageInfo
// Array[org.apache.spark.storage.RDDInfo] = Array(
//   RDD "MapPartitionsRDD" (12)
//   StorageLevel: StorageLevel(false, true, false, true, 1); 
//   CachedPartitions: 1; TotalPartitions: 2; MemorySize: 1768.0 B; 
//   ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B)

正如您所见,缓存 Spark 将完成对分区的处理并将其缓存在内存中。虽然我无法提供导致此行为的源的确切部分,但它看起来像是一个合理的优化。由于分区已经加载,没有理由停止作业。

另见:Lazy foreach on a Spark RDD

【讨论】:

  • 感谢您的详细解释。
猜你喜欢
  • 2015-08-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-10-13
相关资源
最近更新 更多