一、SparkContext类与SparkConf类

1. 两个类的作用和关系

  • 任何Spark程序的编写都是从SparkContext(或用Java编写时的JavaSparkContext)开始的;
  • SparkContext的初始化需要一个SparkConf对象,后者包含了Spark集群配置的各种参数(比如主节点的URL)。

2. 两个类的具体使用

初始化后,可用SparkContext对象所包含的各种方法来创建和操作分布式数据集和共享变量。

* 初始化的方法:

Spark shell(在Scala和Python下可以,但不支持Java)能自动完成初始化。

* 初始化的实例:(两种方法)

创建一个4线程的SparkContext对象,并将其相应的任务命名为Test Spark APP

  • 方法一:使用Scala代码实现初始化
val conf = new SparkConf()
.setAppName("Test Spark App")
.setMaster("local[4]")  
val sc = new SparkContext(conf)
  • 方法二:通过调用SparkContext的简单构造函数,以默认的参数值来创建相应的对象。其效果和上述的完全相同:
val sc = new SparkContext("local[4]", "Test Spark App")

二、Spark shell

1. Spark shell的优势

Spark支持用Scala或Python REPL(Read-Eval-Print-Loop,即交互式shell)来进行交互式的程序编写。由于输入的代码会被立即计算, shell能在输入代码时给出实时反馈。在Scala shell里,命令执行结果的值与类型在代码执行完后也会显示出来。

2. Spark shell的使用

  • 通过Scala来使用Spark shell,只需从Spark的主目录执行./bin/spark-shell。
    原理: 它会启动Scala shell并初始化一个SparkContext对象。我们可以通过sc这个Scala值来调用这个对象。
    该命令的终端输出应该如下图所示:
    《Spark机器学习》笔记---Spark编程模型
  • 在Python shell中使用Spark, 直接运行./bin/pyspark命令即可。与Scala shell类似, Python下的SparkContext对象可以通过Python变量sc来调用。上述命令的终端输出应该如下图所示:
    《Spark机器学习》笔记---Spark编程模型

三、弹性分布式数据集—RDD

  • RDD的概念:

RDD(Resilient Distributed Dataset,弹性分布式数据集)是Spark的核心概念之一。
一个RDD代表一系列的“记录”(严格来说,某种类型的对象)。这些记录被分配或分区到一个集群的多个节点上(在本地模式下,可以类似地理解为单个进程里的多个线程上)。

  • RDD的优点:

Spark中的RDD具备容错性,即当某个节点或任务失败时(因非用户代码错误的原因而引起,如硬件故障、网络不通等),RDD会在余下的节点上自动重建,以便任务能最终完成。

1. 创建RDD

  • 方法一: 从现有的集合创建

比如在Scala shell中:

val collection = List("a", "b", "c", "d", "e")
val rddFromCollection = sc.parallelize(collection)
  • 方法二 :基于Hadoop的输入源创建

比如本地文件系统、 HDFS和Amazon S3。基于Hadoop
的RDD可以使用任何实现了Hadoop InputFormat接口的输入格式,包括文本文件、其他Hadoop标准格式、 HBase、 Cassandra等。以下举例说明如何用一个本地文件系统里的文件创建RDD:

val rddFromTextFile = sc.textFile("LICENSE")

上述代码中的textFile函数(方法)会返回一个RDD对象。该对象的每一条记录都是一个表示文本文件中某一行文字的String(字符串)对象。

2. Spark操作

* Spark操作的分类:

创建RDD后,便有了一个可供操作的分布式记录集,在Spark编程模式下,所有的操作被分为转换(transformation)和执行(action)两种。

  • 转换操作:

是对一个数据集里的所有记录执行某种函数,从而使记录发生改变;

  • 执行:

通常是运行某些计算或聚合操作,并将结果返回运行SparkContext的那个驱动程序

* Spark操作实例:
  • 转换(transformation)操作—map操作

该操作对一个RDD里的每一条记录都执行某个函数,从而将输入映射成为新的输出。
举例:
对一个从本地文本文件创建的RDD进行操作

val intsFromStringsRDD = rddFromTextFile.map(line => line.size)
匿名函数:
语法line => line.size表示以=>操作符左边的部分作为输入,对其执行一个函数,并以=>操作符右边代码的执行结果为输出。在这个例子中,输入为line,输出则是line.size函数的执行结果。在Scala语言中,这种将一个String对象映射为一个Int的函数被表示为String => Int。
该语法使得每次使用如map这种方法时,都不需要另外单独定义一个函数。
当函数简单且只需使用一次时(像本例一样时),这种方式很有用。

它对该RDD中的每一条记录都执行size函数。

之前创建过一个这样的由若干String构成的RDD对象(val rddFromTextFile)。通过map函数,我们将每一个字符串都转换为一个整数,从而返回一个由若干Int构成的RDD对象。

输出结果:
其输出应与如下类似,其中也提示了RDD的类型:

intsFromStringsRDD: org.apache.spark.rdd.RDD[Int] = MappedRDD[5] at map at
<console>:14

注意:=>是Scala下表示匿名函数的语法。匿名函数指那些没有指定函数名的函数(比如Scala或Python中用def关键字定义的函数)。

  • 执行(action)操作—count

举例:
返回RDD中的记录数目

intsFromStringsRDD.count

输出结果:

14/01/29 23:28:28 INFO SparkContext: Starting job: count at <console>:17 ...
14/01/29 23:28:28 INFO SparkContext: Job finished: count at <console>:17, took
0.019227 s res4: Long = 398
  • 计算这个文本文件里每行字符串的平均长度,可以先使用sum函数来对所有记录的长度求和,然后再除以总的记录数目:
val sumOfRecords = intsFromStringsRDD.sum
val numRecords = intsFromStringsRDD.count
val aveLengthOfRecord = sumOfRecords / numRecords

输出结果:

aveLengthOfRecord: Double = 52.06030150753769

注意:
Spark的大多数操作都会返回一个新RDD,但多数的执行操作则是返回计算的结果(比如上面例子中, count返回一个Long, sum返回一个Double)。这就意味着多个操作可以很自然地前后连接,从而让代码更为简洁明了。举例来说,用下面的一行代码可以得到和上面例子相同的结果:

val aveLengthOfRecordChained = rddFromTextFile.map(line => line.size).sum /
rddFromTextFile.count
* 转换(transformation)和执行(action)的特点:
  • 转换(transformation):
    转换操作是延后的,即在RDD上调用一个转换操作并不会立即触发相应的计算。

  • 执行(action):
    转换操作会链接起来,并只在有执行操作被调用时才被
    高效地计算。

Spark操作的优点:
大部分操作可以在集群上并行执行,只有必要时才计算结果并将其返回给驱动程序,从而提高了Spark的效率。
这就意味着,如果我们的Spark程序从未调用一个执行操作,就不会触发实际的计算,也不会得到任何结果。比如下面的代码就只是返回一个表示一系列转换操作的新RDD:

val transformedRDD = rddFromTextFile.map(line => line.size).filter(size => size > 10).map(size => size * 2)

输出结果:

transformedRDD: org.apache.spark.rdd.RDD[Int] = MappedRDD[8] at map at <console>:14

注意,这里实际上没有触发任何计算,也没有结果被返回。如果现在在新的RDD上调用一个执行操作,比如sum,该计算将会被触发:

val computation = transformedRDD.sum

此时一个Spark任务被启动,并返回如下终端输出:

...
14/11/27 21:48:21 INFO SparkContext: Job finished: sum at <console>:16,
took 0.193513 s
computation: Double = 60468.0

3. RDD缓存策略

  • 缓存方法:通过调用RDD的cache函数来实现
    Spark最为强大的功能之一便是能够把数据缓存在集群的内存里。
  • 缓存命令:
rddFromTextFile.cache
  • 缓存原理:

调用一个RDD的cache函数将会告诉Spark将这个RDD缓存在内存中。在RDD首次调用一个执行操作时,这个操作对应的计算会立即执行,数据会从数据源里读出并保存到内存。因此,首次调用cache函数所需要的时间会部分取决于Spark从输入源读取数据所需要的时间。但是,当下一次访问该数据集的时候,数据可以直接从内存中读出从而减少低效的I/O操作,加快计算。多数情况下,这会取得数倍的速度提升。

  • 应用举例:

如果现在在已缓存了的RDD上调用count或sum函数,应该可以感觉到RDD的确已经载入到了内存中:

val aveLengthOfRecordChained = rddFromTextFile.map(line => line.size).sum / rddFromTextFile.count

实际上,从下方的输出我们可以看到,数据在第一次调用cache时便已缓存到内存,并占用了大约62 KB的空间,余下270 MB可用:

...
14/01/30 06:59:27 INFO MemoryStore: ensureFreeSpace(63454) called with curMem=32960,maxMem=311387750
14/01/30 06:59:27 INFO MemoryStore: Block rdd_2_0 stored as values to memory (estimated size 62.0 KB, free 296.9 MB)
14/01/30 06:59:27 INFO BlockManagerMasterActor$BlockManagerInfo: Added rdd_2_0 in memory on 10.0.0.3:55089 (size: 62.0 KB, free: 296.9 MB)
...

再次求平均长度:

val aveLengthOfRecordChainedFromCached = rddFromTextFile.map(line => line.size).sum/ rddFromTextFile.count

从如下的输出中应该可以看出缓存的数据是从内存直接读出的:

...
14/01/30 06:59:34 INFO BlockManager: Found block rdd_2_0 locally
...

其他资料参考:
这里会对Spark的使用进行简要介绍并提供示例,但要想了解更多,可参考下面这些资料。

相关文章:

  • 2021-07-12
  • 2021-08-07
  • 2021-11-28
  • 2021-06-20
  • 2021-05-25
  • 2021-12-10
  • 2022-12-23
  • 2021-09-19
猜你喜欢
  • 2021-04-22
  • 2021-09-15
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-01-12
  • 2021-12-20
相关资源
相似解决方案