1. RDD Source Code Analysis
Key methods and properties:
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
RDD is an abstract class; concrete subclasses provide the many different RDD implementations.
The first constructor parameter is the SparkContext, marked @transient so it is not serialized with the RDD.
The second parameter, deps, records this RDD's dependencies on its parent RDDs.
```scala
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  // Called only once. Implemented by subclasses to return all partitions of this RDD.
  protected def getPartitions: Array[Partition]

  // Called only once. Returns how this RDD depends on its parent RDDs.
  protected def getDependencies: Seq[Dependency[_]] = deps

  // Computes a given partition and returns an iterator over its results.
  def compute(split: Partition, context: TaskContext): Iterator[T]

  // Optional: preferred locations for a split; takes a Partition and returns a list of preferred hosts.
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  // Optional: how the RDD is partitioned (property 4 above), similar to the Partitioner
  // interface in MapReduce, which controls which reducer a key is sent to.
  val partitioner: Option[Partitioner] = None
}
```
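To make the five properties above concrete, here is a minimal sketch (not part of the Spark source; RangeRDD and RangePartition are made-up names) of a custom RDD that exposes the range 0 until n as a fixed number of partitions. Only the two mandatory members, getPartitions and compute, are overridden; dependencies, partitioner and preferred locations keep their defaults:

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition type: each partition covers the sub-range [start, end).
class RangePartition(val index: Int, val start: Int, val end: Int) extends Partition

// Hypothetical RDD that splits 0 until n into numSlices partitions.
class RangeRDD(sc: SparkContext, n: Int, numSlices: Int)
  extends RDD[Int](sc, Nil) {  // Nil: no parent RDDs, so no dependencies

  // Property 1: the list of partitions.
  override protected def getPartitions: Array[Partition] = {
    (0 until numSlices).map { i =>
      val start = i * n / numSlices
      val end = (i + 1) * n / numSlices
      new RangePartition(i, start, end): Partition
    }.toArray
  }

  // Property 2: how to compute each split.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    val p = split.asInstanceOf[RangePartition]
    (p.start until p.end).iterator
  }
  // Properties 3-5 (dependencies, partitioner, preferred locations) keep their defaults.
}
```

HadoopRDD, discussed below, has the same shape: its partitions wrap HDFS InputSplits and its compute wraps a RecordReader instead of an in-memory range.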
2. A WordCount Example
```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("source-code")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile("hdfs://...")
    val counts = textFile.flatMap(line => line.split(" "))
      .filter(_.length >= 2)
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.saveAsTextFile("hdfs://...")
  }
}
```
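Each transformation above only builds a new RDD that remembers its parent; nothing is read from HDFS until saveAsTextFile triggers a job. One way to see the resulting lineage (a sketch; the exact output varies by Spark version) is toDebugString:

```scala
// Inspect the chain of RDDs built by the transformations above.
// Typically: a HadoopRDD at the bottom, MapPartitionsRDDs for flatMap/filter/map,
// and a ShuffledRDD produced by reduceByKey.
println(counts.toDebugString)
```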
2.1 textFile Source Code
```scala
/**
 * Read a text file from HDFS, a local file system (available on all nodes), or any
 * Hadoop-supported file system URI, and return it as an RDD of Strings.
 */
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString)
}
```
path: a URI on any Hadoop-supported file system
minPartitions: the minimum number of partitions for the resulting RDD
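For example (the path and host below are placeholders), minPartitions is only a hint that is passed down to InputFormat.getSplits; the InputFormat may still create more splits, e.g. one per HDFS block:

```scala
// Ask the InputFormat for at least 8 splits; the actual partition count
// is whatever getSplits returns for this file.
val lines = sc.textFile("hdfs://namenode:8020/user/data/input.txt", minPartitions = 8)
```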
2.2 hadoopFile Source Code
```scala
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()
  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}
```
It returns a HadoopRDD.
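textFile is therefore just a thin convenience wrapper. Calling hadoopFile directly (a sketch with a placeholder path) yields the raw (LongWritable, Text) pairs, where the key is the byte offset of each line:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Same call that textFile makes internally, before the .map(pair => pair._2.toString).
val pairs = sc.hadoopFile(
  "hdfs://namenode:8020/user/data/input.txt",  // placeholder path
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text])
val firstLines = pairs.map(_._2.toString).take(3)
```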
2.3 HadoopRDD Source Code
The main methods of HadoopRDD:
getPartitions
```scala
override def getPartitions: Array[Partition] = {
  val jobConf = getJobConf()
  // add the credentials here as this can be called before SparkContext initialized
  SparkHadoopUtil.get.addCredentials(jobConf)
  val inputFormat = getInputFormat(jobConf)
  if (inputFormat.isInstanceOf[Configurable]) {
    inputFormat.asInstanceOf[Configurable].setConf(jobConf)
  }
  // Ask the InputFormat for the input splits
  val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
  val array = new Array[Partition](inputSplits.size)
  // Create one partition per split and return the array
  for (i <- 0 until inputSplits.size) {
    array(i) = new HadoopPartition(id, i, inputSplits(i))
  }
  array
}
```
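Since each InputSplit becomes exactly one HadoopPartition, the RDD's partition count mirrors the splits computed by the InputFormat (subject to the minPartitions hint). Continuing the WordCount example, this can be checked on the driver; note that getPartitions itself is protected and is only ever called once, lazily, through RDD.partitions:

```scala
// Triggers getPartitions on first access; the result is cached, so it runs only once.
val parts = textFile.partitions
println(s"number of splits / partitions: ${parts.length}")
```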
compute
```scala
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
  val iter = new NextIterator[(K, V)] {
    // Cast theSplit to a HadoopPartition
    val split = theSplit.asInstanceOf[HadoopPartition]
    logInfo("Input split: " + split.inputSplit)
    val jobConf = getJobConf()

    val inputMetrics = context.taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop)

    // Find a function that will return the FileSystem bytes read by this thread. Do this before
    // creating the RecordReader, because the RecordReader's constructor might read some bytes.
    val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
      split.inputSplit.value match {
        case _: FileSplit | _: CombineFileSplit =>
          SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
        case _ => None
      }
    }
    inputMetrics.setBytesReadCallback(bytesReadCallback)

    var reader: RecordReader[K, V] = null
    val inputFormat = getInputFormat(jobConf)
    HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
      context.stageId, theSplit.index, context.attemptNumber, jobConf)
    reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

    // Register an on-task-completion callback to close the input stream.
    context.addTaskCompletionListener { context => closeIfNeeded() }
    val key: K = reader.createKey()
    val value: V = reader.createValue()

    override def getNext(): (K, V) = {
      try {
        finished = !reader.next(key, value)
      } catch {
        case eof: EOFException =>
          finished = true
      }
      if (!finished) {
        inputMetrics.incRecordsRead(1)
      }
      (key, value)
    }
    // close() is defined here in the full source; omitted from this excerpt.
  }
  new InterruptibleIterator[(K, V)](context, iter)
}
```
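User code never calls compute directly: each task invokes it for its own split through RDD.iterator (which also handles caching), so every record of the partition flows through the getNext loop above. A small way to observe the per-split iteration, continuing the WordCount example (the println output appears in executor stdout, not on the driver):

```scala
// One task per HadoopPartition; each task drains the iterator that compute()
// built from that split's RecordReader.
textFile.foreachPartition { records =>
  println(s"records in this split: ${records.size}")
}
```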
getPreferredLocations
```scala
override def getPreferredLocations(split: Partition): Seq[String] = {
  val hsplit = split.asInstanceOf[HadoopPartition].inputSplit.value
  val locs: Option[Seq[String]] = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
    case Some(c) =>
      try {
        val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
        val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
        Some(HadoopRDD.convertSplitLocationInfo(infos))
      } catch {
        case e: Exception =>
          logDebug("Failed to use InputSplitWithLocations.", e)
          None
      }
    case None => None
  }
  locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
}
```
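getPreferredLocations feeds the scheduler's data-locality decisions: when placing the task for a split, it prefers the hosts returned here, typically the datanodes holding that split's blocks. A sketch of inspecting it on the driver, reusing the pairs RDD from the hadoopFile example in section 2.2 (the printed hosts depend on where the blocks actually live):

```scala
// Works on the HadoopRDD itself; RDDs derived from it (map/filter) return Nil here,
// and locality is instead recovered by the scheduler walking up the lineage.
val firstSplit = pairs.partitions(0)
println(pairs.preferredLocations(firstSplit))
```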