一、SparkContext类与SparkConf类
1. 两个类的作用和关系
- 任何Spark程序的编写都是从SparkContext(或用Java编写时的JavaSparkContext)开始的;
- SparkContext的初始化需要一个SparkConf对象,后者包含了Spark集群配置的各种参数(比如主节点的URL)。
2. 两个类的具体使用
初始化后,可用SparkContext对象所包含的各种方法来创建和操作分布式数据集和共享变量。
* 初始化的方法:
Spark shell(在Scala和Python下可以,但不支持Java)能自动完成初始化。
* 初始化的实例:(两种方法)
创建一个4线程的SparkContext对象,并将其相应的任务命名为Test Spark APP
- 方法一:使用Scala代码实现初始化
val conf = new SparkConf()
.setAppName("Test Spark App")
.setMaster("local[4]")
val sc = new SparkContext(conf)
- 方法二:通过调用SparkContext的简单构造函数,以默认的参数值来创建相应的对象。其效果和上述的完全相同:
val sc = new SparkContext("local[4]", "Test Spark App")
二、Spark shell
1. Spark shell的优势
Spark支持用Scala或Python REPL(Read-Eval-Print-Loop,即交互式shell)来进行交互式的程序编写。由于输入的代码会被立即计算, shell能在输入代码时给出实时反馈。在Scala shell里,命令执行结果的值与类型在代码执行完后也会显示出来。
2. Spark shell的使用
- 通过Scala来使用Spark shell,只需从Spark的主目录执行./bin/spark-shell。
原理: 它会启动Scala shell并初始化一个SparkContext对象。我们可以通过sc这个Scala值来调用这个对象。
该命令的终端输出应该如下图所示: - 在Python shell中使用Spark, 直接运行./bin/pyspark命令即可。与Scala shell类似, Python下的SparkContext对象可以通过Python变量sc来调用。上述命令的终端输出应该如下图所示:
三、弹性分布式数据集—RDD
- RDD的概念:
RDD(Resilient Distributed Dataset,弹性分布式数据集)是Spark的核心概念之一。
一个RDD代表一系列的“记录”(严格来说,某种类型的对象)。这些记录被分配或分区到一个集群的多个节点上(在本地模式下,可以类似地理解为单个进程里的多个线程上)。
- RDD的优点:
Spark中的RDD具备容错性,即当某个节点或任务失败时(因非用户代码错误的原因而引起,如硬件故障、网络不通等),RDD会在余下的节点上自动重建,以便任务能最终完成。
1. 创建RDD
- 方法一: 从现有的集合创建
比如在Scala shell中:
val collection = List("a", "b", "c", "d", "e")
val rddFromCollection = sc.parallelize(collection)
- 方法二 :基于Hadoop的输入源创建
比如本地文件系统、 HDFS和Amazon S3。基于Hadoop
的RDD可以使用任何实现了Hadoop InputFormat接口的输入格式,包括文本文件、其他Hadoop标准格式、 HBase、 Cassandra等。以下举例说明如何用一个本地文件系统里的文件创建RDD:
val rddFromTextFile = sc.textFile("LICENSE")
上述代码中的textFile函数(方法)会返回一个RDD对象。该对象的每一条记录都是一个表示文本文件中某一行文字的String(字符串)对象。
2. Spark操作
* Spark操作的分类:
创建RDD后,便有了一个可供操作的分布式记录集,在Spark编程模式下,所有的操作被分为转换(transformation)和执行(action)两种。
- 转换操作:
是对一个数据集里的所有记录执行某种函数,从而使记录发生改变;
- 执行:
通常是运行某些计算或聚合操作,并将结果返回运行SparkContext的那个驱动程序
* Spark操作实例:
- 转换(transformation)操作—map操作
该操作对一个RDD里的每一条记录都执行某个函数,从而将输入映射成为新的输出。
举例:
对一个从本地文本文件创建的RDD进行操作
val intsFromStringsRDD = rddFromTextFile.map(line => line.size)
匿名函数:
语法line => line.size表示以=>操作符左边的部分作为输入,对其执行一个函数,并以=>操作符右边代码的执行结果为输出。在这个例子中,输入为line,输出则是line.size函数的执行结果。在Scala语言中,这种将一个String对象映射为一个Int的函数被表示为String => Int。
该语法使得每次使用如map这种方法时,都不需要另外单独定义一个函数。
当函数简单且只需使用一次时(像本例一样时),这种方式很有用。
它对该RDD中的每一条记录都执行size函数。
之前创建过一个这样的由若干String构成的RDD对象(val rddFromTextFile)。通过map函数,我们将每一个字符串都转换为一个整数,从而返回一个由若干Int构成的RDD对象。
输出结果:
其输出应与如下类似,其中也提示了RDD的类型:
intsFromStringsRDD: org.apache.spark.rdd.RDD[Int] = MappedRDD[5] at map at
<console>:14
注意:=>是Scala下表示匿名函数的语法。匿名函数指那些没有指定函数名的函数(比如Scala或Python中用def关键字定义的函数)。
- 执行(action)操作—count
举例:
返回RDD中的记录数目
intsFromStringsRDD.count
输出结果:
14/01/29 23:28:28 INFO SparkContext: Starting job: count at <console>:17 ...
14/01/29 23:28:28 INFO SparkContext: Job finished: count at <console>:17, took
0.019227 s res4: Long = 398
- 计算这个文本文件里每行字符串的平均长度,可以先使用sum函数来对所有记录的长度求和,然后再除以总的记录数目:
val sumOfRecords = intsFromStringsRDD.sum
val numRecords = intsFromStringsRDD.count
val aveLengthOfRecord = sumOfRecords / numRecords
输出结果:
aveLengthOfRecord: Double = 52.06030150753769
注意:
Spark的大多数操作都会返回一个新RDD,但多数的执行操作则是返回计算的结果(比如上面例子中, count返回一个Long, sum返回一个Double)。这就意味着多个操作可以很自然地前后连接,从而让代码更为简洁明了。举例来说,用下面的一行代码可以得到和上面例子相同的结果:
val aveLengthOfRecordChained = rddFromTextFile.map(line => line.size).sum /
rddFromTextFile.count
* 转换(transformation)和执行(action)的特点:
-
转换(transformation):
转换操作是延后的,即在RDD上调用一个转换操作并不会立即触发相应的计算。 -
执行(action):
转换操作会链接起来,并只在有执行操作被调用时才被
高效地计算。
Spark操作的优点:
大部分操作可以在集群上并行执行,只有必要时才计算结果并将其返回给驱动程序,从而提高了Spark的效率。
这就意味着,如果我们的Spark程序从未调用一个执行操作,就不会触发实际的计算,也不会得到任何结果。比如下面的代码就只是返回一个表示一系列转换操作的新RDD:
val transformedRDD = rddFromTextFile.map(line => line.size).filter(size => size > 10).map(size => size * 2)
输出结果:
transformedRDD: org.apache.spark.rdd.RDD[Int] = MappedRDD[8] at map at <console>:14
注意,这里实际上没有触发任何计算,也没有结果被返回。如果现在在新的RDD上调用一个执行操作,比如sum,该计算将会被触发:
val computation = transformedRDD.sum
此时一个Spark任务被启动,并返回如下终端输出:
...
14/11/27 21:48:21 INFO SparkContext: Job finished: sum at <console>:16,
took 0.193513 s
computation: Double = 60468.0
3. RDD缓存策略
-
缓存方法:通过调用RDD的cache函数来实现
Spark最为强大的功能之一便是能够把数据缓存在集群的内存里。 - 缓存命令:
rddFromTextFile.cache
- 缓存原理:
调用一个RDD的cache函数将会告诉Spark将这个RDD缓存在内存中。在RDD首次调用一个执行操作时,这个操作对应的计算会立即执行,数据会从数据源里读出并保存到内存。因此,首次调用cache函数所需要的时间会部分取决于Spark从输入源读取数据所需要的时间。但是,当下一次访问该数据集的时候,数据可以直接从内存中读出从而减少低效的I/O操作,加快计算。多数情况下,这会取得数倍的速度提升。
- 应用举例:
如果现在在已缓存了的RDD上调用count或sum函数,应该可以感觉到RDD的确已经载入到了内存中:
val aveLengthOfRecordChained = rddFromTextFile.map(line => line.size).sum / rddFromTextFile.count
实际上,从下方的输出我们可以看到,数据在第一次调用cache时便已缓存到内存,并占用了大约62 KB的空间,余下270 MB可用:
...
14/01/30 06:59:27 INFO MemoryStore: ensureFreeSpace(63454) called with curMem=32960,maxMem=311387750
14/01/30 06:59:27 INFO MemoryStore: Block rdd_2_0 stored as values to memory (estimated size 62.0 KB, free 296.9 MB)
14/01/30 06:59:27 INFO BlockManagerMasterActor$BlockManagerInfo: Added rdd_2_0 in memory on 10.0.0.3:55089 (size: 62.0 KB, free: 296.9 MB)
...
再次求平均长度:
val aveLengthOfRecordChainedFromCached = rddFromTextFile.map(line => line.size).sum/ rddFromTextFile.count
从如下的输出中应该可以看出缓存的数据是从内存直接读出的:
...
14/01/30 06:59:34 INFO BlockManager: Found block rdd_2_0 locally
...
其他资料参考:
这里会对Spark的使用进行简要介绍并提供示例,但要想了解更多,可参考下面这些资料。
- Spark快速入门: http://spark.apache.org/docs/latest/quick-start.html。
- 针对Scala、 Java和Python的《Spark编程指南》: http://spark.apache.org/docs/latest/programming-guide.html。
- RDD支持的转换和执行操作的完整列表以及更为详细的例子,参见《Spark编程指南》(http://spark.apache.org/docs/latest/programming-guide.html#rdd operations)
以及Spark API(Scala)文档(http://spark.apache.org/docs/latest/api/scala/index.
html#org.apache.spark.rdd.RDD) - Spark支持更为细化的缓存策略。通过persist函数可以指定Spark的数据缓存策略。关于RDD缓存的更多信息可参见: http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence。