wwzyy

本文是对spark作者早期论文《Spark: Cluster Computing with Working Sets》做的翻译(谷歌翻译),文章比较理论,阅读起来稍微有些吃力,但读完之后总算是对spark有了一个初步的认知,对于后续学习使用spark还是很有帮助的。

摘要

  MapReduce及其各种变种,在商业集群,实现大规模数据密集型应用方面取得了巨大成功。然而,这些系统大多都是围绕非迭代数据模型构建的,不适合其他主流应用。本文侧重于此类应用:可以并行操作重用一组工作数据集的应用。包括许多机器学习迭代算法,以及交互式数据分析等。我们提出了一个名为Spark的新框架,它支持这类应用,同时保留MapReduce 的可扩展性和容错性。为了实现这些目标,Spark 引入了弹性分布式数据集 (RDD) 。RDD 是在一组可分区的只读对象集合,支持分区数据丢失重建。Spark在迭代机器学习中的性能比 Hadoop 高出10倍,并且对39GB数据集的交互式查询,实现亚秒级响应。

1、引言

  集群计算模型已经广泛流行,其中数据并行计算可以在不可靠机器的集群上执行,并提供位置感知调度、容错和负载均衡的能力。MapReduce开创了这种模型,并且Dryad和Map-ReduceMerge等系统也支持此类型的数据计算。这些系统通过编程模型实现其可扩展性和容错性,用户可以创建DAG图的方式,来处理输入的数据。没有用户干预的情况下,底层系统可以自动管理调度并对故障作出反应。尽管这种数据模型对一大类应用程序很有用,但有些应用程序不能有效地表示为非循环数据流。在本文中,我们专注研究这一类应用:可以并行操作重用一组工作数据集的应用。这包括Hadoop 用户报告MapReduce的两个不足案例:

  • 迭代作业:许多常见的机器学习算法,将函数重复应用于同一数据集以优化参数(例如梯度下降)。虽然每次迭代都可以表示为MapReduce/Dryad作业,但每个作业都必须从磁盘重新加载数据,从而导致明显的性能损失。
  • 交互式分析:Hadoop通常用于Pig和Hive等SQL在大型数据集上运行查询。理想情况下,用户能够跨多台机器将感兴趣的数据集加载到内存中并重复查询。但是使用 Hadoop每个查询都会产生很大的延迟(数十秒),因为它作为单独的MapReduce作业运行并从磁盘读取数据。

  本文提出了一种名为Spark的新集群计算框架,支持应用并行处理作业,同时提供了与 MapReduce类似的可扩展性和容错性。Spark 中的主要抽象是弹性分布式数据集 (RDD),它表示在一组机器上分区的只读对象集合,如果分区丢失,可以重建这些对象。用户可以跨机器将 RDD 缓存在内存中,并且可以在多个类似 MapReduce的并行操作中重用它。 RDD沿袭了传统的容错概念:如果RDD的一个分区丢失,该 RDD可以根据从其他RDD派生的信息重建该分区。尽管RDD不是通用的共享内存抽象,但它们体现出了表达性、可扩展性、可靠性之间的最佳平衡点,我们发现它非常适合各种应用程序。

   Spark是在Scala中实现的,Scala是一种基于Java虚拟机的静态高级编程语言,并公开了一个类似于DryadLINQ(微软的大数据框架,类似hive)的函数式编程接口。此外,Spark可以基于Scala的交互式命令行执行,它允许用户定义 RDD、函数、变量和类,并在集群的并行操作中使用它们。我们相信 Spark 是第一个允许以交互方式使用高效的通用编程语言来处理集群上的大型数据集的系统。

  尽管我们对Spark的实现仍然是一个原型,但该系统的早期表现令人鼓舞。我们展示了 Spark 在迭代机器学习工作负载中的性能比Hadoop高10倍,并且可以交互式地用于以亚秒级延迟扫描39 GB数据集。

  本文组织如下。第 2 节描述了 Spark 的编程模型和 RDD。第 3 节展示了一些示例作业。第 4 节描述了我们的实现,包括我们与 Scala 及其解释器的集成。第 5 节介绍了早期结果。我们在第 6 节中调查相关工作,并在第 7 节中进行讨论。

2、编程模型

  为了使用 Spark,需要开发人员编写、实现其应用的高层控制流程以及并行操作的驱动程序。 Spark 为并行编程提供了两个抽象:弹性分布式数据集和并行运算操作(通过传递函数来对数据集调用)。此外,Spark支持两种受限的共享变量,它们可以在集群上运行的函数中使用,我们稍后会解释。 

2.1 弹性分布式数据集(RDD)

  弹性分布式数据集 (RDD) 是在一组机器上分区的只读数据集,分区丢失可以重建。 RDD 的元素不必存于物理存储中;相反,RDD 的句柄包含可靠存储数据足够的信息,这意味着如果节点发生故障,RDD 总是可以重建。在Spark中,每个RDD都由一个Scala对象表示。Spark允许以四种方式构建RDD:

  • 从共享文件系统中创建RDD,例如 Hadoop 分布式文件系统 (HDFS)。
  • 在程序中"并行化"一个 Scala 集合(例如,一个数组),这意味着将其划分为多个切片发送到多个节点。
  • 通过转换现有的RDD,例如具有类型A元素的数据集,可以使用flatMap的操作转换为类型B元素的数据集,该操作将每个元素传递给用户提供的类型A ⇒ List[B]函数。 flatMap还可以进行其他转换操作,包括 map(A ⇒ B 类型的函数传递元素)和 filter(选择与谓词匹配)等算子。
  • 通过更改RDD的持久性,默认情况下,RDD 是惰性和短暂性的。也就是说,数据集的分区在并行操作中会按需实例化(例如通过map函数传递文件块),并在使用后从内存中丢弃。但用户可以通过两个动作来持久化 RDD:

    1) 放到缓存中的数据集进行延长处理,但暗示它应该在第一次计算后,应该保存在内存中,因为它将被重用。

    2) 数据集写入分布式文件系统,例如 HDFS,保存的版本将在以后的操作中使用。

  我们注意到,缓存操作只是一个提示:如果集群中没有足够的内存来缓存数据集的所有分区,Spark 将在使用时再重新计算它们。我们选择了这种设计,是为了节点出现故障或数据集太大时,Spark程序可以继续工作,这个想法类似于虚拟内存。

  我们还计划扩展 Spark其他级别的持久性(例如,跨多个节点的内存复制)。我们的目标是让用户在存储 RDD 的成本、访问它的速度、可能丢失部分数据、重新计算的成本之间进行权衡。

2.2 并行操作

  可以在 RDD 上执行并行操作:

  •  reduce:使用关联函数数据集进行聚和。
  •  collect:将数据集的所有元素发送到驱动程序。例如,并行更新数组的一种简单方法是并行化、映射和收集数组。
  • foreach:通过用户提供的函数遍历每个元素。这个函数的副作用也很明显(可能是将数据复制到另一个系统或更新共享变量,如下所述)。

  Spark 目前不支持 MapReduce 中的分组聚合操作;在一个进程中只能收集一个reduce结果。我们计划在未来使用分布式数据集上的“shuffle”转换来支持分组聚合,如第 7 节所述。然而,即使使用单一的reduce操作就足以满足多种实用的算法了。例如在MapRedcue最近的一篇关于在多核系统上进行机器学习算法的实现中,指出了起码有10种算法不支持并行化,但是在Spark上能够解决其中的绝大部分问题。

2.3 共享变量

  我们可以在Spark中通过闭包(函数)来调用诸如map、filter和reduce之类的操作。作为典型的函数式编程,这些闭包可以引用它们作用域的变量。通常,当 Spark在工作节点上运行闭包时,这些变量会被复制到工作节点。但是,Spark 还允许程序员创建两种受限类型的共享变量,以下是两种简单常见的使用模式:

  • 广播变量:如果有一个很大的只读数据片段(如一个查询表),需要用于多个并行的操作,最好是把它一次分发给每个工作空间,而不是每一个闭包里面都去做一个封装。
  • 累加器:用作累加器的变量,工作空间只能对它进行“add”及其相关操作,并且只有驱动程序可以访问它。他们可以被用来提供在MapReduce上以并行计算的方式实现计数器的累加功能。累加器可被定义为任何类型,有一个“add”的操作以及一个“0”值。由于其“只加”的操作,所以有很好的容错性。

3、示例

  我们现在展示一些Spark程序示例。 注意,我们省略变量类型,因为 Scala 支持类型推断。

3.1 文本搜索

  假设在HDFS上面保存有一批超大数量日志文件,我们需要查找其中的表示错误的行。那么可以通过以下方式,一般从创建一个文件数据集对象的方式开始。

val file = spark.textFile("hdfs://...") 
val errs = file.filter(_.contains("ERROR")) 
val ones = errs.map(_ => 1) 
val count = ones.reduce(_+_)

  我们首先用HDFS中的文件创建一个分布式数据集,作为一个行的集合。接着用这个数据集来创建包含“error”的行的集合(errs),然后把每一行都映射为1(找到一行,就计数为1),最后采用聚合函数将这些数据累加起来。其中filter、map和reduce都是Scala的功能函数名称。需要注意的是,因为RDDS是延时执行的,所errs和ones在初始化的时候没有进行实现。相反的是,当reduce被调用的时候,每个工作节点都去扫描输入模块,以流的方式来读取数据,并将计数累加到驱动程序。通过这种延时处理数据集方式,让Spark对MapReduce进行精确模拟。

  但是Spark不同于其他框架的是,他可以让一些中间数据集实现持续的跨越性操作。例如,如果我们想重新使用errs数据集,只需要采用如下语句来从缓存中创建RDD即可:

val cachedErrs = errs.cache()

  做完这一步之后,我们就可以从缓存中调用errs了,就像调用其他的数据集一样对他进行并行操作。但是errs的内容是在我们第一次计算后的缓存在内存中的结果,所以调用该缓存之后会大大加快后续的操作。

3.2 逻辑回归

  下面的程序实现了逻辑回归,通过迭代分类算法,试图找到一个超平面w把两个点集合分隔开。该算法采用梯度下降法,开始给w赋一个随机值,并在每次迭代中对w的结果进行总结,以移动w的方向对结果进行优化。这里不解释逻辑回归的细节,但是我们可用它来展现一些Spark的特性。因为在Spark上实现此算法受益于能够重复迭代内存中的数据。 

// Read points from a text file and cache them
val points = spark.textFile(...).map(parsePoint).cache()
// Initialize w to random D-dimensional vector
var w = Vector.random(D)
// Run multiple iterations to update w
for (i <- 1 to ITERATIONS) {
	val grad = spark.accumulator(new Vector(D))
	for (p <- points) { // Runs in parallel
		val s = (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
		grad += s * p.x
	}
	w -= grad.value
}

  首先创建一个名为points的RDD节点,我们通过运行一个循环来处理它。For关键字是Scala里面用于表示调用循环的语法,里面的循环体类似于foreach方法。也就是说,对于代码for(p <- points){body}等同于points.foreach(p =>{body})(这种写法是Scala特有的,将所有的操作都当成对象的方法来调用),在此,我们调用了Spark的并行foreach操作。其次,我们定义了一个名为grad的梯度累加器(类型为Vector)。需要注意的是,在循环累加中使用一个重载了的操作符+=,这种在for循环中使用累加的语法,看起来很像是串行操作的程序。实际上,这个例子不同于传统的只有三行代码串行执行的逻辑回归的版本。

3.3 交替最小二乘法

  我们最后一个例子是使用所谓的交替最小二乘法(ALS)的算法。ALS用于处理协同过滤的问题,例如我们要通过用户对电影观看历史和评分来预测他们喜欢的电影(如在Netfix挑战赛中的例子)。不像前面的例子,ALS的算法是CPU密集型的,而不是数据密集型的。

  我们简要的描述一下ALS算法供读者参考。假设我们需要预测用户u对电影m的评分,而我们已经有了很多以往用户对电影的观看数据矩阵R。ALS模型R是两个矩阵M和U的运算结果,M和U的尺寸分别是 M * U 和 K * U。也就是说,每个用户和影片都有一个K维的“特征向量”,描述了它的特点和用户给予它的评价,该特征向量就是用户评级和电影的特点的内积。ALS解决了使用已知的观看评价的M和U,然后计算M*U矩阵的未知值的预测算法。以下使用迭代过程来实现:

  1、使用随机值初始化M。

  2、计算优化U给定M的预测模型R,最大限度的减少错误。

  3、计算优化M给定U的预测模型R,最大限度的减少错误。

  4、重复2、3两步,直到收敛。

  ALS可以在通过在每个节点上并行运行步骤2和步骤3来更新不同用户/电影的信息。然而,所有的步骤都有使用模型矩阵R,所以我们可以将R变成广播变量,这样做是很有效的。这样它就不会在各个节点的所有操作步骤中,都要求被重新发送到每个节点上。通过Spark来实现ALS,如下图所示。请注意,我们通过parallelize方法,获取了0 until U(until是Scala的范围处理方法)以及collect方法(用于把RDD中的所有元素倒入 Scala集合类型),用来更新每个数组。

val Rb = spark.broadcast(R)
for (i <- 1 to ITERATIONS) {
	U = spark.parallelize(0 until u)
			.map(j => updateUser(j, Rb, M))
			.collect()
	M = spark.parallelize(0 until m)
			.map(j => updateUser(j, Rb, U))
			.collect()
}

4、实现

  Spark建立在Mesos之上,这是一个“集群操作系统”,它允许多个并行应用程序以细粒度的方式共享集群,并为应用程序提供API以在集群上启动任务。这使得Spark可以与现有的集群计算框架(如Hadoop和MPI的Mesos端口)一起运行,并与它们共享数据。此外,在Mesos上构建大大减少了使用Spark必须的编程工作。Spark的核心是弹性分布式数据集的实现。例如,假设我们定义一个名为cachedErrs的缓存数据集,表示日志文件中的错误消息,并使用map和reduce统计其元素,如第3.1节所述:

val file = spark.textFile("hdfs://...")
val errs = file.filter(_.contains("ERROR"))
val cachedErrs = errs.cache()
val ones = cachedErrs.map(_ => 1)
val count = ones.reduce(_+_)

  这些数据集将存储为保存有每个RDD血缘关系的对象链,如图1所示。每个数据集对象都包含指向其父级的指针以及有关如何转换得到父级的信息。在内部,每个RDD对象实现相同的简单接口,该接口由三个操作组成:

  • getPartitions,返回分区ID列表。
  • getIterator(partition),它遍历一个分区。
  • getPreferredLocations(partition),用于任务调度以实现数据局部性。

  在数据集上调用并行操作时,Spark会创建一个任务来处理数据集的每个分区,并将这些任务发送到worker节点。我们尝试使用延迟调度的技术将每个任务发送到其首选位置之一。一旦在worker上启动,每个任务都会调用getIterator来开始读取它的分区。

  不同类型的RDD仅在它们实现RDD接口的方式上有所不同。例如,对于Hdfs-TextFile,分区是HDFS中block ID,它们的首选位置是块位置,getIterator打开一个流来读取块。在MappedDataset中,分区和首选位置与父级相同,但迭代器将map函数应用于父级每个元素。最后,在CachedDataset中,getIterator方法查找已转换分区的本地缓存副本,并且每个分区的首选位置开始等于父级的首选位置,但在某个节点缓存分区后更新,以便优先重新使用该节点。此设计使故障易于处理:如果节点出现故障,则会从其父数据集中重新读取其分区,并最终将其缓存在其他节点上。

  最后,向worker发送任务需要向他们发送闭包,用于定义分布式数据集的闭包和传递的诸如reduce之类的操作闭包。为了实现这一点,我们依赖的Scala闭包是Java对象,并且可以使用Java序列化;这是Scala的一个功能,它可以将计算逻辑发送到另一台机器上。但是,Scala的内置闭包实现并不理想,因为我们已经发现了一个闭包对象,引用闭包外部作用域,但实际未在闭包内使用的情况。我们已经提交了一个关于此的错误报告,但与此同时,我们通过对闭包类的字节码执行静态分析来检测这些未使用的变量并将闭包对象中的相应字段设置为null来解决该问题。由于篇幅有限,我们省略了对此分析的细节。

4.1 共享变量

  Spark中两种类型的共享变量,广播变量和累加器,是使用具有自定义序列化格式的类实现的。当使用值v 创建广播变量b 时,v将保存到共享文件系统中的文件中。b的序列化形式是此文件的路径。当在worker节点上查询b的值时,Spark首先检查v 是否在本地缓存中,如果不在,则从文件系统中读取它。我们最初使用HDFS来广播变量,但我们正在开发一种更高效的流媒体广播系统。

  累加器是使用不同的“序列化技巧”实现的。每个累加器在创建时都被赋予唯一的ID。保存累加器时,其序列化形式包含其ID和类型的“零”值。在worker上,为使用线程局部变量运行任务的每个线程创建累加器的单独副本,并在任务开始时重置为零。每个任务运行完成后,worker向Driver程序发送一条消息,其中包含对各累加器所做的更新。Driver程序仅对每个操作的每个分区应用一次更新,以防止在由于故障而重新执行任务时进行重复计数。

4.2 编译器集成

  由于篇幅有限,我们只描述了如何将Spark集成到Scala编译器中。Scala编译器通常通过为用户键入的每一行编译一个类来操作。该类包含一个单例对象,该对象包含该行上的变量或函数,并在其构造函数中运行该行的代码。例如,如果用户输入var x = 5后跟println(x),则编译器定义一个包含x的类(比如Line1),并使第二行编译为println(Line1.getInstance().x)。这些类被加载到JVM中以运行每一行。为了使编译器与Spark一起工作,我们进行了两项更改:

  1、使编译器输出它为共享文件系统定义的类,worker可以使用自定义Java类加载器加载。

  2、我们更改了生成的代码,以便每行的单例对象直接引用前一行的单例对象,而不是通过静态getInstance方法。这允许闭包捕获它们被序列化以发送给worker时所引用的单例的当前状态。如果我们没有这样做,那么对单例对象的更新(例如,上面示例中的行设置x = 7)将不会传播给worker。

5、成果

  尽管我们对Spark的实现仍处于早期阶段,但我们将三个实验的结果联系起来,这些实验表明了它作为集群计算框架的前景。

5.1 逻辑回归

  我们将3.2节中逻辑回归任务的性能与Hadoop的逻辑回归实现进行了比较,在20个“m1.xlarge” EC2节点上使用29 GB数据集测试,每个节点有4个核。结果如图2所示。对于Hadoop,每次迭代需要127s,因为它作为独立的MapReduce作业运行。对于Spark,第一次迭代需要174s(可能是因为使用Scala而不是Java),但后续迭代只需要6s,因为每次迭代都会复用缓存数据。这使任务运行速度提高了10倍。

  我们还尝试在任务运行时使某个节点崩溃。在10次迭代的情况下,这会使任务平均变慢50s(21%)。失败节点上的数据分区在其他节点上重新计算并缓存,但在当前实验中恢复时间相当高,因为我们使用了较大的HDFS块大小(128 MB),因此每个节点只有12块, 恢复过程无法利用集群中的所有核,较小的块大小将产生更快的恢复时间。

5.2 交替最小二乘法

  我们在3.3节中实现了交替最小二乘任务,以测试在迭代任务中将共享数据集复制到多个节点的广播变量的好处。我们发现,在不使用广播变量的情况下,每次迭代重新发送评级矩阵R的时间占据了大部分的任务运行时间。此外,通过简单的广播实现(使用HDFS或NFS),广播时间随着节点数量的增长而线性增长,从而限制了任务的可扩展性。我们实现了应用级多播系统来缓解这种情况。然而,即使使用快速广播,在每次迭代时重新发送R也是昂贵的。使用广播变量在worker的内存中缓存R,在30个节点EC2集群上的5000个电影和15000个用户的实验中将性能提高了2.8倍。

5.3 交互式Spark

  我们使用Spark编译器在15个“m1.xlarge”EC2机器的内存中加载39 GB的*数据,并以交互方式查询。第一次查询数据集时,大约需要35秒,与在其上运行Hadoop作业相当。但是,后续查询只需0.5到1秒,即使它们扫描所有数据也是如此。

6、相关工作

6.1 分布式共享内存

  Spark的弹性分布式数据集可以被视为分布式共享内存(DSM)的抽象,其已经被进行过广泛的研究。RDD与DSM接口的区别有两方面。首先,RDD提供了一个更受限制的编程模型,但是如果集群节点发生故障,则可以有效地重建数据集。虽然一些DSM系统通过检查点实现容错,但Spark使用RDD对象中保存的血缘关系信息重建丢失的RDD分区。这意味着只需要重新计算丢失的分区,并且可以在不同节点上并行重新计算它们,而无需程序恢复到检查点。此外,如果没有节点失败,则不会有日常开销。其次,和MapReduce一样,RDD将计算推送到的数据侧,而不是让任意节点访问全局地址空间。

  其他系统也限制了DSM编程模型提高性能、可靠性和可编程性。Munin让开发者使用他们需要的访问模式来注解变量,以便为它们选择最佳一致性协议。Linda提供了一种可以以容错方式实现的元组空间编程模型。Thor提供了持久共享对象的接口。

6.2 集群计算框架

  Spark的并行操作适合MapReduce模型。但是,它们可以在跨多个操作的RDD上运行。Twister也意识到扩展MapReduce以支持迭代作业的必要,即一个允许长时间的map任务能在作业之间将静态数据保存在内存中的MapReduce框架。但是,Twister目前没有实现容错。Spark对弹性分布式数据集的抽象既支持容错又比迭代MapReduce更通用。Spark程序可以定义多个RDD并在它们运行的操作之间交换操作,而Twister程序只有一个map函数和一个reduce函数。这也使Spark对交互式数据分析很有用,用户可以在其中定义多个数据集然后查询它们。Spark的广播变量提供了类似Hadoop的分布式缓存的功能,它可以将文件传播到运行特定作业的所有节点。但是,广播变量可以在多个并行操作中复用。

6.3 语言集成

  Spark的语言集成类似于DryadLINQ,它使用.NET对语言集成查询的支持来获取定义查询的表达式树并在集群上运行它。与DryadLINQ不同,Spark允许RDD在并行操作间保留在内存中。此外,Spark通过支持共享变量(广播变量和累加器)来丰富语言集成模型,使用具有自定义序列化形式的类实现。我们使用Scala进行语言集成是受到SMR的启发,它是一个用于Hadoop的使用闭包来定义map和reduce任务的Scala接口。我们对SMR的贡献是共享变量和更健壮的闭包序列化实现(如第4节中描述)。最后,IPython是为科学计算提供的Python解释器,它允许用户在集群上使用容错任务队列接口或底层消息传递接口启动计算。Spark提供了类似的交互式界面,但侧重于数据密集型计算。

6.4 血缘

  获取数据集的血缘或起源信息长期以来一直是科学计算数据库领域的研究课题,用于解释结果等应用,允许其他人复制,如果在工作流程步骤中发现错误或数据集丢失了,则重新计算数据。读者可以参考[7]、[23]和[9]对这项工作的研究。Spark提供了一种受限制的并行编程模型,其中细粒度的血缘获取成本低廉,因此该信息可用于重新计算丢失的数据集元素。

7、讨论及未来的工作

  Spark为编程集群提供了三种简单的数据抽象:弹性分布式数据集(RDD)和两种受限类型的共享变量:广播变量和累加器。虽然这些抽象是有限的,但我们发现它们足够强大,可以表达几个对现有集群计算框架构成挑战的应用,包括迭代和交互式计算。此外,我们认为RDD背后的核心思想是数据集句柄,它具有足够的信息来(重新)从可靠存储中可用的数据构建数据集,可能在开发编程集群的其他抽象时非常有用。在未来的工作中,我们计划关注以下四个方面:

  1、正式地表征RDD的属性和Spark的其他抽象,以及它们对各类应用程序和工作负载的适用性。

  2、增强RDD抽象,允许开发者在存储成本和重建成本之间进行权衡。

  3、定义转换RDD的新操作,包括通过给定键重新分区RDD的“shuffle”操作。这样的操作将允许我们实现分组和连接。

  4、在Spark解释器之上提供更高级别的交互式接口,例如SQL和Rshell。

 

参考资料

英文版:http://people.csail.mit.edu/matei/papers/2010/hotcloud_spark.pdf

spark中文文档:http://spark.apachecn.org/#/docs/3

 

相关文章: