大数据概述
大数据这个概念近年来算是如火如荼,那么什么是大数据呢?首先从名字来看,我们可以简单地认为数据量大,而数据量大也就意味着计算量大。这样理解本身是没有任何问题的,只不过这并不能很好的定义大数据。
而业界的一家权威的机构,针对大数据做了描述,认为大数据应该具备如下特征:
1. 数据量(Volume):数据量大,可以达到 TB、PB 甚至更高。而这种规模的数据,传统的数据库已经不好处理了,所以才有了现在各种各样的大数据框架。
2. 多样性(Variety):数据的类型种类繁多,比如简单的数值、文本,地理位置,图片、音频、视频等等,数据存在多样性。而处理多样性的数据所带来的挑战,也就比以前更高;
3. 价值(Value):原始数据都是杂乱无章的,我们无法直接得到信息。而对数据进行清洗、分类、汇总等各种处理之后,我们能够从中找到一些规律,将其变成商业的价值,这一步也就是数据分析师要做的事情。所以分析数据的目的是从中找到一些规律、信息,将其变成价值。而数据的价值密度和数据的总量通常是成反比的,你的数据量增加一倍,但是价值未必增加一倍;
4. 速度(Velocity):对于大数据而言,我们不仅追求大量数据的处理,还希望它能有让人满意的速度。在早期的大数据处理框架中,它是做不到实时的,只能进行离线批处理。但是很多场景下,我们是需要立刻就能计算出结果的,所以大数据领域也就慢慢诞生了更多的实时性框架。
以上便是大数据的 4V 特征:Volume、Variety、Value、Velocity。
而大数据的出现,也必然会带来一些技术性的革新。
比如:数据存储,大数据可以到 PB、EB、ZB 级别,这种数据量单台机器已经存储不下了,所以数据存储方面必然要发生改变。由原来的文件存储变成了分布式文件存储,也就是将一个大文件切分成多个小块存储在多条机器上,这样就解决了数据存储的问题。
不仅如此,由于散落在多个机器上,如果某一台机器挂掉了,那么就导致整个文件都无法读取了。因此为了容错,在切分成多个小块的时候,还会将每一个块多拷贝两份,散落在不同的机器上。这样一台机器挂掉了,还可以从其它机器上读,这一点在后续介绍 HDFS 的时候会详细说。
除了数据存储,还有数据计算。我们说数据量大,那么计算量也会很大。那么怎么办呢?类比数据存储,存储的时候可以存在多台机器上,那么计算的时候是不是也可以让多台机器一块计算呢。所以将一个作业进行拆分,交给不同的机器进行计算,然后再将结果做一个归并,这就是所谓的分布式计算。
而大数据生态圈中最著名的 Hadoop,便是由分布式文件系统 HDFS 和 分布式计算框架 MapReduce 组成的,这个我们后面会说。
除了存储和计算,还有网络问题。以前单机的时候,数据就在你的本地,计算也在本地,所以没什么好说的,直接读取数据、计算就是了。但分布式文件存储和分布式计算就不一样了,因为数据被切分成了多块,有可能某台机器上的计算所需要的数据在其它的机器上,这是很正常的。因此很多时候数据之间的传输是不可避免的,这对网络也是一个挑战,所以至少是万兆网,千兆已经捉襟见肘了。特别是跨数据中心、跨地区,要求会更高。
那么在处理大数据都分为哪几步呢?
1. 数据采集: 一般使用 Flume、Sqoop;2. 数据存储: 一般使用 Hadoop;3. 数据处理、分析、挖掘: 一般使用 Hadoop、Spark、Flink等等;4. 可视化(并不完全属于大数据的范畴), 一般有专门的团队去做;
大数据在技术架构上所带来的挑战
1. 对现有数据库管理技术的挑战:对于 PB、EB 级别的大数据而言,使用目前的关系型数据库存储是不现实的,尽管数据库也可以部署集群,但是规模非常有限。而且由于数据量的原因,也很难使用现有的结构化查询语言来分析现有的大数据;
2. 经典数据库技术并没有考虑数据的多类别:大数据的 4V 特征中有一个 V 是多类别,现在的数据库没有办法很好的存储一些特殊类型的数据;
3. 实时性技术的挑战:想从大量数据中提取相应的价值,花费的时间是不短的,如果使用先有的技术很难做到实时性;
4. 网络架构、数据中心、运维的挑战:数据一直是在高速增长的,还要涉及到数据的传输,这对数据中心、运维也是一个不小的挑战;
如何对大数据进行存储和分析呢?
这是最直观的一个问题,如果你都不能对大数据进行存储、分析,那么也就谈不上所谓的商业价值了。而数据的存储和分析,自然就需要有专业的框架来完成,当然你也可以自己开发一个框架,但这显然是非常困难的,我们也不会这么做。因为在大数据的存储和计算中,存储容量、读写速度、计算效率等等,这些都是需要考虑的。
而幸运的是,Google 的三驾马车:GFS、MapReduce、BigTable 很好的解决了这一点,但是Google并没有将它们开源,只是发表了相应的技术论文。而 Hadoop 便是 Hadoop 的作者基于 Google 的论文、使用 Java 语言开发的。我们来介绍一下:
GFS: 指的就是 Google 公司的分布式文件系统, HDFS 便是基于 GFS 诞生的, 也就是 Hadoop 的分布式文件系统;MapReduce: 分布式计算处理框架, 对应 Hadoop 的 MapReduce, 可以将一个作业拆分成多份在多个机器上并行计算;BigTable: 顾名思义是一张大表, 普通量级的数据可以使用关系型数据库的表进行查询, 但是大数据就没办法了; 而 BigTable 则可以很好的解决这一点, 它对应着大数据框架中的 HBase, 注意这个 HBase 并不隶属于 Hadoop, HBase 是一个独立的分布式数据库, 但是它底层的数据存储依赖于 HDFS, 正如 BigTable 底层的数据存储依赖于 GFS 一样;
初识 Hadoop
下面我们就来认识一下Hadoop,看看它的概念是什么?核心组件有哪些?具有哪些优势?发展史等等。
Hadoop 概述
提到 Hadoop,有狭义上的 Hadoop:指的就是 Hadoop 本身;还有广义上的 Hadoop:指的是围绕着 Hadoop 所构建的生态圈,里面包含了各种各样的框架。下面我们说的 Hadoop,指的是狭义上的 Hadoop,也就是 Hadoop 本身。
先说一下 Hadoop 这个名字的由来,这个名字没有任何的意义,它是作者的儿子给一个玩具大象起的名字,所以 Hadoop 官网的 logo 也是一个大象。另外这些大数据框架基本上都是 Apache 的顶级项目,它们的官网都是 项目名.apache.org,比如 Hadoop 的官网就是 hadoop.apache.org。
那么 Hadoop 是什么呢?
Hadoop 是一个可靠的、可扩展的、分布式的计算框架,它可以对大量的数据集进行并行处理。我们知道单台机器的能力是不够的,所以可以将多条机器组成一个集群,而集群中每一台机器都叫做一个节点,而 Hadoop 可以从单机(单节点)扩展到上千个节点,来对数据进行存储和计算。如果存储不够了,怎么办?直接加机器就好了,节点的扩展非常容易。
另外最重要的是,Hadoop 可以部署在廉价的机器上,而不需要使用昂贵的小型机、或者刀片机等等。而且还提供了故障恢复、容错等机制。
而Hadoop主要由以下几个部分组成
分布式文件系统 HDFS: 将文件分布式存储在很多的节点上;分布式计算框架 MapReduce: 能在很多节点上进行分布式并行计算;分布式资源调度框架 YARN: 实现集群资源管理以及作业的调度;
HDFS 和 MapReduce 我们一开始说过了,但是还有一个 YARN,它是用来对集群资源进行管理、以及作业的调度的。还是举之前的例子,如果一个作业所需要的数据不在当前节点上该怎么办?显然有两种做法:1. 将数据从其它节点上传输过来;2. 或者将作业调度到有数据的节点上。而在大数据领域中是有说法的,移动数据不如移动计算,因为数据的移动成本要比计算的移动成本高很多。所以这个时候就需要 YARN 了,当然 YARN 还用于资源的管理,给每个作业分配相应的资源等等。
我们后面会详细介绍这几个组件。
Hadoop 的优势
1. 高可靠性:hadoop 底层维护多个数据副本,所以即使某个机器出现故障,也不会导致数据的丢失
2. 高扩展性:在集群间分配任务数据,可方便的扩展数以千计的节点。很好理解,如果容量不够了,直接横向扩展,加机器就行。
3. 高效性:在 MapReduce 的思想下,Hadoop 是并行工作的,以加快任务处理速度。实际上如果学了 Spark,会发现 Hadoop 自己所描述的易用性、高效性实在是不敢恭维哈。但是 Hadoop 作为大数据生态圈中非常重要的组件,我们是有必要学好的,而且学了 Hadoop 之后,再学 Spark 会轻松很多。而且学习了 Hadoop,再学 Spark 也会明白为什么 Spark 会比 Hadoop 在效率上高出几十倍、甚至上百倍。
4. 高容错性:能够自动将失败的任务重新分配,如果某台机器挂掉了,那么会自动将任务分配到其他的机器上执行。
5. 可以部署在廉价机器上,降低成本。
6. 成熟的生态圈,里面不仅仅是 Hadoop,里面还有大量的其它框架,后面会说。
Hadoop 生态圈
我们说过 Hadoop 分为狭义 Hadoop 和广义 Hadoop。
狭义 Hadoop 指的是:一个适合大数据分布式存储(HDFS)、分布式计算(MapReduce)和资源调度(YARN)的平台,所以狭义 Hadoop 指的就是 Hadoop 框架本身。
广义 Hadoop 指的是:Hadoop 生态系统,Hadoop 生态系统是一个很庞大的概念,Hadoop 框架本身是其中最重要也是最基础的一个部分;生态系统中的每一个子系统只解决某一个特定的问题域(甚至可能很窄),不搞统一型的一个全能系统,而是小而精的多个小系统。
Hadoop 生态圈里面的东西还是非常非常多的,囊括了大数据处理的方方面面,并且这是一个成熟的生态圈。像 Flume 做日志采集、Sqoop 做数据的导入导出,还有调度系统以及分布式协调服务等等。我们这里学习的是 Hadoop,当然我们说 Hadoop 的时候,指的都是狭义上的 Hadoop。
Hadoop 常用的发行版
首先是社区版,也就是 Apache 版本,直接去官网就可以下载。
纯开源,可以直接在源码的基础上进行二次开发;但是不同版本/框架之间的整合有时会比较麻烦,比如 jar 包的冲突等等。
CDH 版本。
cloudera 公司开发的 Hadoop,并且提供了 cm(cloudera manager),可以通过页面进行可视化操作,比如一键安装各种框架、对框架升级等等。并且还帮你屏蔽了不同框架之间的 jar 包冲突;但是 cm 不开源,并且与社区版本有点出入。但凡是使用 Hadoop 的公司,有百分之 60~70 使用的都是 CDH,包括笔者所在的公司,其信息中心就是采购的 CDH。
HDP版本。
Hortonworkds 公司提供,原装 Hadoop、支持 tez;但是企业级安全不开源。
如果是在学习的时候使用哪种发行版都无所谓,但是在生产环境中最好使用 CDH 或者 HDP,这里我们就使用 CDH 了。
安装 Hadoop
下面我们就开始安装 Hadoop 了,这里你可以使用虚拟机,或者云服务器等等。我这里使用的阿里云上的服务器,操作系统是 CentOS7,说实话个人建议有资金的话可以考虑购买一台云服务器,真的非常方便,不用安装虚拟机占资源。
而 Hadoop 的运行模式有三种,单机模式、伪分布式、完全分布式。
单机模式:基本不用,不用管伪分布式:按照完全分布式来进行搭建、配置,但是机器只有一台完全分布式:真正意义上的多台机器
我们后续使用的是伪分布式,但是在学习的时候,和使用真正意义上的分布式之间是没有太大区别的。
在学习的时候,不建议上来就搭建完全分布式的 Hadoop 集群,Hadoop 还没搞明白就开始搭建分布式集群的话,真的是很容易造成《从入门到放弃》。一开始完全可以搭建一个单机版的伪集群,因为在学习的时候使用单机和使用集群没有太大区别。
下面就开始安装 Hadoop 了,不过在安装它之前,我们还需要安装 jdk。因为 Hadoop 是 Java 语言编写的,所以我们需要安装 jdk,至于版本请保证在1.8以上。
这里我的软件都安装在 /opt 目录下。
jdk 的安装我就不介绍了,很简单,下面安装 CDH 版本的 Hadoop。CDH相关软件包,可以去:http://archive.cloudera.com/cdh5/cdh/5 进行下载,里面除了 Hadoop 之外,还有 Hadoop 生态圈的其它组件。进入页面之后,找到 hadoop-2.6.0-cdh5.15.1.tar.gz ,点击下载即可,或者直接浏览器输入 http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.6.0-cdh5.15.1.tar.gz 这个url,自动就下载了。
注意一下:hadoop-2.6.0-cdh5.15.1.tar.gz 里面 2.6.0 和 社区版本的 Apache Hadoop2.6.0 之间还是有区别的,前者在社区版本的基础上做了一些功能的改善。重点是后面的 5.15.1,这个是 cdh 版本,如果你还需要使用其它组件的话,那么 cdh 版本一定要配套。比如 Hive,Hive 底层的数据存储也依赖于 Hadoop 中的 HDFS,所以使用 Hive 首先也应该安装 Hadoop;而我们这里安装的 Hadoop 的 cdh 版本是5.15.1,那么如果要安装并使用 Hive 的话,其 cdh 版本也要是 5.15.1,比如:hive-1.1.0-cdh5.15.1.tar.gz。所以在使用 cloudera 公司开发的组件时,它们之间的 cdh 版本最好要相同。
下载成功之后,直接解压即可。
解压之后的文件目录入上图所示,我们来介绍一下,一些不重要就不说了。
bin 目录: 和Hadoop客户端相关的脚本文件, 比如: hadoop、hdfs、yarn等等, 比如通过命令 hdfs dfs -ls / 即可查看根目录的文件有哪些;etc 目录: 里面有一个 hadoop 目录, hadoop 目录里面存放了所有的配置文件, 我们后面会修改好几个;include 目录: 这是与 C 语言有关的一些头文件, 我们不用管;lib 目录: 一些本地库, .so 文件; 相当于 windows 的 .dll 文件, 这个不需要关注;libexec 目录: 和 lib 目录类似;sbin 目录: 非常重要的一个目录, 存放了大量的启动文件, 比如启动、关闭集群; 启动、关闭 yarn 等等;
目录里面的文件我们在用到的时候会说,下面我们来介绍 Hadoop 最重要的三大组件:HDFS、MapReduce、YARN。
分布式文件系统 HDFS
什么是HDFS,它和普通的文件系统之间有什么区别呢?
HDFS(Hadoop Distributed File System)是一个分布式文件系统,用于存储文件,通过目录树来定位文件;其次它是分布式的,可以横跨N个机器,由多个节点联合起来实现其功能,集群中的节点有各自的角色。
HDFS 产生背景
随着数据量越来越大,在一台机器上无法存下所有的数据,那么就分配到更多的机器上,但是这样不方便管理和维护。因此迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统,HDFS 只是分布式文件管理系统中的一种。
HDFS使用场景
适合一次写入,多次读出的场景,且不支持文件的修改。适合用来做数据分析,并不适合做网盘应用。很好理解,HDFS 的定位就决定了它不适合像关系型数据库那样,可以任意修改数据。而且写入数据,一定是大批量一次性写,至于原因后面会解释。
HDFS 的设计目标
1. 解决硬件故障
HDFS 设计出来就是为了存储大量数据,因为 HDFS 可以跨上千个节点,每个节点只存储数据的一部分。但是这样的话,其中任何一个节点故障,都会导致无法读取整个文件,所以 HDFS 还要具备相应的容错性。首先 HDFS 在接收到一个大文件之后,会将其切分成块(block),每一个 block 的大小默认是 128M。然后将每一个块会再拷贝两份(所以总共默认是 3 份,也就是三副本),然后分别散落在不同的节点上,这样的话一台节点挂了(或者磁盘坏掉了),还可以从其它节点上找。我们画一张图:
这里我们以 500M 的文件为例,显然它顶多是个小型数据,这里只是举个栗子。
因此以上便属于硬件故障,HDFS 要具备快速检测错误的能力,并且能从错误中自动恢复,这才是 HDFS 设计架构的核心目标,不能说一台机器瘫了或者存储坏掉了,整个 HDFS 集群就不能工作了。
2. 流式数据访问
运行在 HDFS 上应用程序会以流的方式访问数据集,这和普通的文件系统不同,因为 HDFS 更多地被应用于批处理,而不是和用户之间进行交互式访问。所以 HDFS 关注的是高吞吐量,而不是数据访问的低延迟,从这一点我们也能看出 HDFS 不适合做实时处理。
3. 大型数据集
HDFS 可以管理大型数据集,TB、PB 甚至更高的级别。而 HDFS 不怕文件大,就怕文件小,这是面试的时候经常被问到的点,这背后更深层次的含义我们后面会说。一个 HDFS 集群可以有几千个节点,可以管理非常大的文件。
4. 移动计算比移动数据更划算
这一点我们之前说过了,假设计算在 A 节点上,但是数据在 B 节点上。这个时候要么把计算调度到 B 机器上,要么把数据传输到 A 机器上,而移动计算的成本比移动数据的成本低很多,所以我们要将计算调度到 B 机器上。而 HDFS 也支持这一点。
HDFS 的架构
HDFS 有两个核心概念:NameNode、DataNodes,并且是一个 master/slave 架构。而 NameNode 就是 master,DataNodes 是 slave。注意:DataNodes 后面有一个s,意味着会有多个 NameNode;但是 NameNode 后面没有 s,因此它只有一个,事实上也确实如此。
一个 HDFS 集群由一个 NameNode 和多个 DataNode 组成,NameNode 负责管理文件系统的 namespace(名字空间,比如文件的目录结构便是 namespace 的一个部分),并提供给客户端固定的访问途径。因为客户端需要读写数据,而首先经过的就是 NameNode。
除了 NameNode,还有多个 DataNode,DataNode 就是用来存储数据的一个进程,通常一个节点对应一个 DataNode。所以一个 HDFS 集群可以有多个节点组成,其中一个节点负责启动 NameNode进程,剩余的节点负责启动 DataNode 进程。在内部,一个文件会被拆分多个块,并默认以三副本存储,然后默认存储在多个DataNode对应的节点上。
注意:我们说客户端对创建、删除文件,都是通过 NameNode,它负责执行文件系统的类似 CRUD 的操作。并且最重要的是,它还决定了 block 和 DataNode 之间的映射关系。
假设存储一个 150M 的文件,存储的时候会被切分成两个块,那么问题来了:block1 存在哪个 DataNode 中呢,block2 又存在哪个 DataNode 中呢?其实这一点不用担心,因为我们说 NameNode 会记录每个 block 和 DataNode 的映射关系。这些便是数据的元信息,比如:拆分了几个块,每个块都散落在哪些 DataNode 上。所以客户端获取数据的时候,一定要经过 NameNode,不然这些元信息你拿不到。因此在存储的时候,NameNode 会记录这些元信息,当我们获取的时候 NameNode 会根据元信息找到对应的 DataNode,而这个过程对于用户来说是不可见的。
所以你可以简单的理解为:HDFS 就是一个拆文件、合文件的一个过程。存储的时候拆开,获取的时候合并。
至于数据本身的读写,则是通过 DataNode 来完成的,因为数据存在 DataNode 对应的节点上。
所以我们可以再来总结一遍。
NameNode:就是 master,它是一个主管、管理者
管理 hdfs 的名字空间配置副本策略管理数据块(block)映射信息处理客户端读写请求DataNode:就是 slave,NameNode 下达命令,DataNode 执行实际的操作
存储实际的数据块执行数据块的读/写操作client:就是客户端
文件切分,文件上传到 hdfs 的时候,client 将文件切分成一个个的 block,然后上传与 NameNode 交互,获取文件的位置信息与 DataNode 交互,读取或者写入数据客户端提供一些命令来管理 hdfs,比如 NameNode 的格式化客户端可以通过一些命令来访问 hdfs,比如对 hdfs 的增删改查操作还有一个 Secondary NameNode:它不是 NameNode 的替补,当 NameNode 挂掉时,并不能马上替换 NameNode 并提供服务。它的作用如下
辅助 NameNode,分担其工作量,比如定期合并 Fsimage 和 Edits,并推送给 NameNode紧急情况下,可辅助恢复 NameNode(可以恢复一部分)
我们来看几幅漫画,写的非常好,个人给翻译了一遍。
从这里我们便了解了HDFS的整体架构,用一张图总结就是:
那么下面再来总结一下 HDFS 的优缺点。
优点:
-
高容错性数据自动保存多个副本,它通过增加副本的形式,提高容错性某一个副本丢失后,它可以自动恢复
-
适合处理大数据数据规模:能够处理TB、甚至PB级别的数据文件规模:能够处理百万规模以上的文件数量,数量相当之大
可构建在廉价的机器之上,通过多副本机制,提高可靠性
缺点:
不适合低延时数据访问,如果你想做到毫秒级存储,别想了,做不到的无法高效地对大量小文件进行存储,存一个 1G 的数据比存 10 个 100MB 加上一个 24MB 的数据要高效很多
至于为什么?这个问题和我们之前说的 HDFS 不怕文件大,就怕文件小是类似的。
首先 NameNode 一般是唯一的,这就意味着空间是有限的。而 NameNode 要记录文件的元数据,不管你是 1KB,还是 1GB,都需要 150 字节的空间进行记录,如果全是小文件的话,是不是很耗费 NameNode 所在机器的空间呢?
而且小文件存储的寻址时间会超过读取时间,它违反了 HDFS 的设计目标。
不支持并发写入、文件随机修改
一个文件只能有一个写,不允许多个线程同时写。仅支持数据的 append,不支持文件的随机修改。
HDFS 块大小的设置
HDFS 中的文件在物理上是分块存储(block),块的大小可以通过配置参数(df.blocksize)指定,默认大小在 hadoop 2.x 版本中是 128M,老版本中是 64M。
思考:为什么块不能设置太小,也不能设置太大?
1. hdfs 块设置太小,会增加寻址时间,程序一直在找块的开始位置2. hdfs 块设置太大,从磁盘传输的时间会明显大于定位这个块的开始位置所需要的时间。导致程序在处理这块数据时,会非常慢
总结:HDFS 块的大小设置主要取决于磁盘的传输速率。
修改配置文件、启动伪集群
我们需要修改几个配置文件,我们在介绍 Hadoop 目录结构的时候说过,配置文件都在 etc/hadoop 中。
修改 hadoop-env.sh
export JAVA_HOME=/opt/jdk1.8.0_221 # 默认是${JAVA_HOME},需要手动改成java的安装路径
修改 core-site.xml
<!--指定 HDFS 中 NameNode 的地址-->
<!--这里之所以是 localhost,是因为我们只有一台机器,多台机器就要换成相应的主机名或者 IP 地址-->
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
<!--指定 hadoop 运行时产生文件的存储目录,如果不指定,那么重启之后数据就丢失了-->
<!--data 目录会自动创建-->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/hadoop-2.6.0-cdh5.15.1/data</value>
</property>
所有更改完的配置,都放在 configuration 标签里面。
修改 hdfs-site.xml
<!--指定 hdfs 副本的数量,默认是 3,我们是伪分布式,所以改成 1-->
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
目前只需要修改这三个,然后我们启动集群(伪)
首先是格式化 NameNode(第一次启动时格式化,以后不需要总格式化)。
bin/hdfs namenode -format # 如果配置了环境变量,那么可以直接输入 hdfs
如果格式化成功,那么会创建 /opt/hadoop-2.6.0-cdh5.15.1/data 目录,因为我们指定了 hadoop.tmp.dir。
[root@matsuri hadoop-2.6.0-cdh5.15.1]# ls data
dfs
[root@matsuri hadoop-2.6.0-cdh5.15.1]#
查看之前的data目录,自动帮我们创建了,而且里面也有东西了。并且在格式化的时候会输出很多信息,其中有一条如下:
starting namenode, logging to /opt/hadoop-2.6.0-cdh5.15.1/logs/hadoop-root-namenode-matsuri.out
里面的 /opt/hadoop-2.6.0-cdh5.15.1/logs/hadoop-root-namenode-matsuri.out 是记录启动 NameNode 时候的日志,同理输出中还有 DataNode,但是如果你查看的话会发现并没有什么信息。这是因为显示的文件不对,你只需要把结尾的 out 改成 log 就可以查看日志信息了,这个应该是 Hadoop 内部的问题,不过无关紧要。
启动 NameNode 和 DataNode
# 如果把 sbin 目录也配置了环境变量,那么 sbin/ 也不需要加
sbin/start-dfs.sh
执行该命令的时候会启动 NameNode、DataNode、SecondaryNameNode,因此会让你三次输入当前用户的密码,并且每次启动、关闭的时候都是如此。而且在多节点通信的时候,显然不能这样,因此我们需要配置免密码登陆。
# 执行命令, 一路回车即可
ssh-keygen -t rsa
# 拷贝公钥
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
这样只要登陆过一次,之后再登陆就不需要密码了。当然此时你也可能已经通过手动输入密码的方式启动了,然后我们输入 jps,查看进程:
如果出现了红色方框里的内容,那么证明启动成功了。
然后我们在浏览器输入:ip:50070,可以查看到 webUI 界面,如果访问不到,那么十有八九是你的这个端口没开。可以通过如下命令开启该端口:
# 开启防火墙
systemctl start firewalld
# 开放 50070 端口, --zone 表示作用域、--add-port 表示添加的端口, 格式为: 端口/通讯协议、--permanent 表示永久, 没有此参数重启后失效
firewall-cmd --zone=public --add-port=50070/tcp --permanent
# 重启防火墙
firewall-cmd --reload
页面信息大致如上,在 Summary 中有很多关于节点的信息,可以看一下。另外,导航栏中的最后一个 utilities,点击的话会出现一个下拉菜单,里面有一个 Browse the file system。点击的话,可以查看整个文件的目录结构,后面会说。
如果要关闭集群的话,可以通过 sbin/stop-dfs.sh。
当然启动和停止 Hadoop,除了 start/stop-dfs.sh 之外,还有一个 hadoop-daemons.sh(用于伪分布式)。
start-dfs.sh
# 等价于
hadoop-daemons.sh start namenode
hadoop-daemons.sh start datanode
hadoop-daemons.sh start secondarynamenode
stop-dfs.sh
# 等价于
hadoop-daemons.sh stop namenode
hadoop-daemons.sh stop datanode
hadoop-daemons.sh stop secondarynamenode
为什么不能一直格式化 NameNode,格式化 NameNode 需要注意什么?
格式化 NameNode 会产生新的集群 id,导致 NameNode 和 DataNode 的集群 id 不一致,集群找不到以往的数据。所以格式化 NameNode 的时候,务必要先删除 data 数据和 log 日志,然后再格式化 NameNode。因为两者需要有一个共同的 id,这样才能交互。
hdfs shell 命令(重点)
基本语法:hdfs dfs 命令 参数
这个 HDFS 的 shell 命令和 Linux 是非常类似的,比如查看某个目录下的文件,Linux 中是 ls,那么 hdfs shell 中就是 hdfs dfs -ls;查看文件内容:hdfs dfs -cat filename,可以看到是非常相似的,只不过在 hdfs shell 中需要加上一个横杠。另外 hdfs dfs 还可以写成 hadoop fs,对于 shell 操作来说两者区别不大。
hdfs dfs -help 命令
从 Linux 命令也能看出来,这是一个查看命令使用方法的命令。
hdfs dfs -ls 目录路径
查看某个目录有哪些文件,加上 -R 表示递归显示。当前没有文件,所以不演示了。
hdfs dfs -mkdir 目录
在hdfs上面创建目录,加上 -p 表示递归创建,和 Linux 是一样的。
一开始根目录为空,然后创建一个目录,再次查看发现目录被创建。至于弹出的警告,可以忽略掉。
hdfs dfs -moveFromLocal 本地路径 hdfs路径
将本地文件或目录移动到 hdfs 上面,注意是移动,移完之后本地就没了。
hdfs dfs -cat 文件
查看一个文件的内容。
hdfs dfs -appendToFile 追加的文件 追加到哪个文件
将一个文件的内容追加到另一个文件里面去,比如本地有一个 file.txt,那么 hdfs dfs -appendToFile file.txt /a.txt 表示将本地的 file.txt 文件里面的内容追加到 hdfs 上的 /a.txt 文件里面去。
-chgrp、-chmod、-chown
更改组、更改权限、更改所有者,这个和 Linux 中用法一样。
hdfs dfs -copyFromLocal 本地路径 hdfs路径
将文件从本地拷贝到 hdfs 上面去,这个和刚才的 moveFromLocal 就类似于 Linux 中 cp 和 mv。
hdfs dfs -copyToLocal hdfs路径 本地路径
将 hdfs 上的文件拷贝到本地,这个路径是 hdfs 路径在前、本地路径在后。
hdfs dfs -cp 源hdfs路径 目的hdfs路径
copyFromLocal 是针对本地和 hdfs 来说的,cp 是 hdfs路径 和 hdfs路径 之间的拷贝。
hdfs dfs -mv 源hdfs路径 目的hdfs路径
很简单,无需多说。
hdfs dfs -get hdfs路径 本地路径
等同于 copyToLocal。
hdfs dfs -put 本地路径 hdfs路径
等同于 copyFromLocal。
hdfs dfs -getmerge hdfs路径(通配符) 本地路径
将 hdfs 上面的多个文件合并下载到本地。
hdfs dfs -tail 文件名
显示文件的结尾,类似于 Linux 的 tail。
hdfs dfs -rm 文件
删除文件,如果是文件夹需要加上 -r。
hdfs dfs -rmdir 空目录
删除一个空目录,不常用,一般使用 -rm。
hdfs dfs -du 目录
统计目录的大小信息。
hdfs dfs -du -h /:加上-h人性化显示hdfs dfs -du -h -s / :查看当前目录的总大小
hdfs dfs -setrep 数值 文件
设置文件的副本数量,hdfs dfs -setrep 5 /file.txt:表示将 file.txt 的副本设置成 5。
通过 webUI 来观察 HDFS 文件的存储机制
我们上传一个大文件进去吧,就把jdk压缩包上传到HDFS上。
hdfs dfs -put /opt/jdk-8u221-linux-x64.tar.gz /
这里我们上传到了 HDFS 的根目录,通过 webUI 来查看一下。
我们发现文件已经在里面了,并且这个文件的大小是 180.06M,显然会被切成两个块,每个块的副本系数是 1,因为我们设置的是 1。然后点击一下该文件:
可以看到拆分之后的两个块的信息,分别 block0 和 block1,而且每个块都有一个 ID,是依次增大的。并且两个 Size 加起来一定是 jdk 安装包的大小。然后它存在什么地方呢?还记得之前配置的 hadoop.tmp.dir 吗?
[root@matsuri subdir0]# pwd
/opt/hadoop-2.6.0-cdh5.15.1/data/dfs/data/current/BP-227521778-172.28.112.191-1607186545947/current/finalized/subdir0/subdir0
进入到该目录,可以看到非常的长啊。
我们看到文件名的结尾就是对应的块id,而且大小是不是和 webUI 上显示的一样呢。我们说 jdk 压缩包被切分成了两个块,现在这两个块我们都找到了,如果将它们合并在一起话,那么 tar 命令能不能正常解压呢?我们来试试。
所以你还觉得 HDFS 神奇吗?所以就是之前说的,只是将文件切分成块,然后散落在不同节点的本地存储中。查找的时候,会去 NameNode 获取元信息,找到相应的块再组合起来,就这么简单。所以 HDFS 还是需要依赖本地进行存储的,只不过内部的 NameNode 会帮助你自动对块进行管理。
Python 连接 HDFS 进行相关操作
使用 HDFS SHELL 只是用来临时做测试用,工作中肯定是通过代码来操作的,那么下面我们就来看看如何使用 Python 连接 HDFS,并进行相关操作。
注意:笔者是 Python 方向的,Java、Scala一概不懂。
首先 Python 若想操作 HDFS,需要下载一个第三方库,也叫 hdfs,直接 pip install hdfs 即可。
from pprint import pprint
import hdfs
# 导入相关模块,输入 http://ip:50070,创建客户端
client = hdfs.Client("http://47.93.39.238:50070")
client.list:查看当前目录的内容
print(client.list("/")) # ['jdk-8u221-linux-x64.tar.gz', 'matsuri.txt', 'vtuber']
# status 表示是否显示文件的相关属性, 默认为 False
# 指定为 True 的话,会同时返回文件相关属性,返回的数据格式为:[("", {}), ("", {}), ("", {}), ...]
pprint(client.list("/", status=True))
"""
[('jdk-8u221-linux-x64.tar.gz',
{'accessTime': 1607194592673,
'blockSize': 134217728,
'childrenNum': 0,
'fileId': 16390,
'group': 'supergroup',
'length': 195094741,
'modificationTime': 1607194594501,
'owner': 'root',
'pathSuffix': 'jdk-8u221-linux-x64.tar.gz',
'permission': '644',
'replication': 1,
'storagePolicy': 0,
'type': 'FILE'}),
('matsuri.txt',
{'accessTime': 1607193623050,
'blockSize': 134217728,
'childrenNum': 0,
'fileId': 16388,
'group': 'supergroup',
'length': 30,
'modificationTime': 1607193623604,
'owner': 'root',
'pathSuffix': 'matsuri.txt',
'permission': '644',
'replication': 1,
'storagePolicy': 0,
'type': 'FILE'}),
('vtuber',
{'accessTime': 0,
'blockSize': 0,
'childrenNum': 2,
'fileId': 16386,
'group': 'supergroup',
'length': 0,
'modificationTime': 1607193884650,
'owner': 'root',
'pathSuffix': 'vtuber',
'permission': '755',
'replication': 0,
'storagePolicy': 0,
'type': 'DIRECTORY'})]
"""
client.status:获取指定路径的状态信息
pprint(client.status("/"))
"""
{'accessTime': 0,
'blockSize': 0,
'childrenNum': 3,
'fileId': 16385,
'group': 'supergroup',
'length': 0,
'modificationTime': 1607194594505,
'owner': 'root',
'pathSuffix': '',
'permission': '755',
'replication': 0,
'storagePolicy': 0,
'type': 'DIRECTORY'}
"""
# 里面还有一个 strict=True,表示严格模式
# 如果改为 False,那么如果输入的路径不存在就返回 None
# 为 True 的话,路径不存在,报错
client.makedirs:创建目录
print(client.list("/")) # ['jdk-8u221-linux-x64.tar.gz', 'matsuri.txt', 'vtuber']
# 会自动递归创建,如果想创建的时候给目录赋予权限,可以使用 permission 参数,默认为 None
client.makedirs("/a/b/c", permission=777)
print(client.list("/")) # ['a', 'jdk-8u221-linux-x64.tar.gz', 'matsuri.txt', 'vtuber']
print(client.list("/a")) # ['b']
print(client.list("/a/b")) # ['c']
注意:默认情况下我们只能查看 HDFS 上的数据,但是不能写入、修改、删除。比如:创建目录,可能会抛出如下异常:
hdfs.util.HdfsError: Permission denied: user=dr.who, access=WRITE, inode="/":root:supergroup
那么这个时候需要在 hdfs-site.xml 中加入如下内容,然后重启 Hadoop 集群。
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
client.write:往文件里面写内容
client.read:往文件里面读内容
with client.write("/这是一个不存在的文件.txt") as writer:
# 需要传入字节
writer.write(bytes("this file not exists", encoding="utf-8"))
with client.read("/这是一个不存在的文件.txt") as reader:
# 读取出来也是字节类型
print(reader.read()) # b'this file not exists'
如果出现了requests.exceptions.ConnectionError:xxxxx,那么解决办法就是在你当前使用 Python 的 Windows 机器上的 hosts 文件中增加如下内容:
部署 hadoop的服务器ip 部署hadoop的服务器主机名
write 方法,如果不指定额外的参数,那么需要文件不能存在,否则会报错,提示文件已经存在。如果要对已存在的文件进行操作,那么需要显式的指定参数:overwrite(重写) 或者 append(追加)。
with client.write("/白色相簿", append=True) as writer:
writer.write(bytes("令人讨厌的冬天又来了,", encoding="utf-8"))
with client.read("/白色相簿") as reader:
print(str(reader.read(), encoding="utf-8")) # 令人讨厌的冬天又来了,
#############
with client.write("/白色相簿", append=True) as writer:
writer.write(bytes("冬天的街道,恋人们的微笑,让人想一把火全烧了", encoding="utf-8"))
with client.read("/白色相簿") as reader:
# 由于是追加,之前的内容也读取出来了
print(str(reader.read(), encoding="utf-8")) # 令人讨厌的冬天又来了,冬天的街道,恋人们的微笑,让人想一把火全烧了
#############
with client.write("/白色相簿", overwrite=True) as writer:
writer.write(bytes("暖かい日差しが降り注いできて、眩しすぎ、目が見えない", encoding="utf-8"))
with client.read("/白色相簿") as reader:
# 如果是overwrite,那么之前的内容就全没了
print(str(reader.read(), encoding="utf-8")) # 暖かい日差しが降り注いできて、眩しすぎ、目が見えない
注意:overwrite 和 append 不能同时出现,否则报错。
# 如果 write 里面传入了 encoding 参数,那么 writer.write 则需要写入str,因为会自动按照传入的 encoding 进行编码
with client.write("/白色相簿", overwrite=True, encoding="utf-8") as writer:
girls = ["古明地觉", "古明地恋", "八重樱"]
writer.write(str(girls))
# 同理如果传入了 encoding 参数,reader.read 会读出str,因为会自动按照传入的 encoding 进行解码
with client.read("/白色相簿", encoding="utf-8") as reader:
print(reader.read()) # ['古明地觉', '古明地恋', '八重樱']
client.content:查看目录的汇总情况
比如:当前目录下有多少个子目录、多少文件等等。
print(client.content("/", strict=True))
# {'directoryCount': 5, 'fileCount': 5, 'length': 195094877, 'quota': 9223372036854775807, 'spaceConsumed': 195094877, 'spaceQuota': -1}
client.set_owner:设置所有者
client.set_permission:设置权限
client.set_replication:设置副本系数
client.set_times:设置时间
"""
def set_owner(self, hdfs_path, owner=None, group=None):
def set_permission(self, hdfs_path, permission):
def set_replication(self, hdfs_path, replication):
def set_times(self, hdfs_path, access_time=None, modification_time=None):
"""
client.resolve: 将带有符号的路径,转换成绝对、规范化路径
# 当然并不要求路径真实存在
print(client.resolve("/白色相簿/白色相簿/..")) # /白色相簿
client.walk:递归遍历目录
# 递归遍历文件,类似于 os.walk,会返回一个生成器,可以进行迭代
# 每一步迭代的内容是一个三元组,("路径", ["目录1", "目录2"], ["文件1", "文件2", "文件3"])
for file in client.walk("/"):
print(file)
"""
('/', ['a', 'vtuber'], ['白色相簿', 'jdk-8u221-linux-x64.tar.gz', 'matsuri.txt'])
('/a', ['b'], [])
('/a/b', ['c'], [])
('/a/b/c', [], [])
('/vtuber', [], ['matsuri.txt', 'matsuri1.txt'])
"""
client.upload:上传文件
print("1.c" in client.list("/")) # False
client.upload(hdfs_path="/", local_path="1.c")
print("1.c" in client.list("/")) # True
client.download:下载文件
client.download(hdfs_path="/白色相簿", local_path="白色相簿")
print(open("白色相簿", "r", encoding="utf-8").read()) # ['古明地觉', '古明地恋', '八重樱']
client.checksum:获取文件的校验和
# 获取文件的校验和
print(client.checksum("/白色相簿"))
# {'algorithm': 'MD5-of-0MD5-of-512CRC32C', 'bytes': '00000200000000000000000095b1c9929656ce2b779093c67c95b76000000000', 'length': 28}
client.delete:删除文件或目录
# recursive 表示是否递归删除,默认为 False
try:
client.delete("/vtuber")
except Exception as e:
print(e) # `/test is non empty': Directory is not empty
client.delete("/vtuber", recursive=True)
print(client.list("/"))
# ['白色相簿', '这是一个不存在的文件.txt', '1.c', 'a', 'jdk-8u221-linux-x64.tar.gz', 'matsuri.txt']
当然还有一些其它操作,比如重命名:client.rename 等等,这里就不说了。
HDFS 元数据管理
这里我们再来聊聊 HDFS 的元数据管理,首先元数据包含:文件名、副本系数(或者说副本因子)、块id、以及散落在哪个 DataNode 上。
因此 HDFS 的元数据也就是整个 HDFS 文件系统的层级结构,以及每个文件的 block 信息。
而这些元信息存在我们配置的 hadoop.tmp.dir 中,我们来看一下。
首先 NameNode 启动之后,这些元数据会在内存中,因此为了防止重启之后丢失,肯定要定期写入磁盘。图中的 fs_image 文件便是写入之后的结果,但是写入是定期写入的,假设每隔半小时写入一次。如果 NameNode 宕掉,那么还是会丢失半小时的数据,这也是我们所无法忍受的。因此就像 Redis 一样,在缓存数据的同时还将执行的命令操作缓存起来,比如:hdfs dfs -rm /file.txt 等等,记录在 edits 文件中。edits文件是时刻记录的,因为记录的只是命令而已。
然后根据 edits 中的命令,和 fsimage 综合起来,生成一个新的 fsimage,再把老的fsimage给替换掉,这样就确保了元数据的不丢失,当然还会将数据先加载到内存中。但是这一步并不是交给 NameNode 来做的,因为它还要处理来自客户端的请求,如果合并的工作再交给它,那么 NameNode 的压力就太大了。那么给谁做呢?没错,正是 SecondaryNameNode。所以 NameNode 和 SecondaryNameNode 并不是所谓的主备关系,后者相当于前者的小弟,主要是帮大哥减轻压力的。
以上这个过程,叫做 HDFS 的 CheckPoint。
HDFS 的安全模式
在 HDFS 刚启动的时候,会进入到一种特殊的模式:安全模式,这是 Hadoop 的一种自我保护机制,用于保证集群中数据块的安全性。比如:副本系数是 3,但是某个块的数量是 2,这个时候就会再拷贝一份,满足副本系数。
如果 HDFS 是在安全模式下的话,那么客户端不能进行任何文件修改的操作,比如:上传文件,删除文件,重命名,创建目录等操作。当然正常情况下,安全模式会运行一段时间自动退出的,只需要我们稍等一会就行了,到底等多长时间呢,我们可以通过 50070 端口查看安全模式退出的剩余时间。
我这里的安全模式是关闭的,如果你处于安全模式的话,页面信息会提示你还需多长时间结束安全模式。
虽然不能进行修改文件的操作,但是可以浏览目录结构、查看文件内容。并且在命令行,我们可以控制安全模式的进入、查看、以及退出等等。
bin/hadoop dfsadmin -safemode get: 查看安全模式状态bin/hadoop dfsadmin -safemode enter: 进入安全模式状态bin/hadoop dfsadmin -safemode leave: 离开安全模式
安全模式是 Hadoop 的一种保护机制,在启动时,最好是等待集群自动退出该模式,然后进行文件操作。
有的小伙伴在和 Hive、Spark 整合的时候,刚一启动 HDFS,就开始启动 Hive、Spark 写数据,结果发现写的时候报错了,所以此时应该先等待集群退出安全模式。
分布式计算框架 MapReduce
下面我们来介绍一下 MapReduce,源自于 Google 在 2004 年 12 月发表的 MapReduce 技术论文,它的思想逻辑很简单,完全可以看成是 Python 中 map 和 reduce 的组合。
map: 并行处理输入的数据;reduce: 对map阶段得到结果进行汇总;
MapReduce 就是将一个作业拆分成多份,在多个机器上并行处理,然后再将处理之后的结果汇总在一起。因此它非常适合海量数据的离线处理,不怕你的数据量大,PB、EB 都无所谓,只要你的节点数量足够即可。并且内部的细节我们不需要关心,只需按照它提供的 API 进行编程,我们便能得到结果。
并且当你的计算资源不足时,你可以通过简单的增加机器来扩展计算能力。并且一个节点挂了,它可以把上面的计算任务转移到另一个节点上运行,不至于这个任务完全失败。而且这个过程不需要人工参与,是由 Hadoop 内部完成的。
但是对比 Spark、Flink 等框架,MapReduce 其实是比较鸡肋的。官方说使用 MapReduce 易开发、易运行,只是相对于我们自己实现而言,而使用 Spark、Flink 进行处理要比使用 MapReduce 清蒸的多。
另外 MapReduce 不擅长实时计算。
MapReduce 无法像传统的关系型数据库那样,可以在毫秒级或者秒级内返回结果。
MapReduce 不擅长流式计算。
流式计算输入的数据是动态的,而 MapReduce 的数据数据必须是静态的,不能动态变化。这是因为 MapReduce 自身的设计特点决定了数据源必须是静态的。
不擅长 DAG(有向无环图) 计算。
多个程序之间存在依赖,后一个应用程序的输入依赖于上一个程序的输出。在这种情况,MapReduce 不是不能做,而是使用后,每个 MapReduce 作业的输出结果都会写入到磁盘,然后再从磁盘中读取,进行下一个操作,这样做会造成大量的磁盘 IO,导致性能非常的低下。
由于笔者不是 Java 方向的,所以没办法使用 Java 进行 MapReduce 编程。如果是 Python 的话,操作 MapReduce 实际上是不太方便的,所以 MapReduce 的实际操作就不说了。而且事实上,基本都不直接使用 MapReduce 进行编程了。我们有 Spark,它的出现解决了 MapReduce 的效率低下问题。而且还有 Hive,Hive 也是我们后面要学习的一个重要的大数据组件,很多公司都在用,它是将 MapReduce 进行了一个封装,可以让我们通过写 SQL 的方式来操作 HDFS 上的数据,这样就方便多了。但既然是写 SQL,那么肯定要像传统关系型数据库一样,有表名、字段名、字段信息等等。没错,这些信息在 Hive 中也叫作元信息、或者元数据,它一般存在 MySQL 等关系型数据库中,实际的数据依旧是存储在 HDFS 上,相当于帮你做了一层映射关系。关于 Hive 我们介绍的时候再说,总之它也是一个非常非常重要的大数据组件,毕竟使用 SQL 进行编程肯定要比 MapReduce 简单的多,而 Hive 也可以使用 Python 进行连接、执行操作。
资源调度框架 YARN
下面我们来介绍一下资源调度框架 YARN,但是在介绍它之前我们需要了解一个为什么会有 YARN,一项技术的诞生必然是有其原因的。
首先我们用的都是 Hadoop 2.x,我们来看看在 Hadoop 1.x 的时候 MapReduce的架构。当然不管是 1.x 还是 2.x,MapReduce都是 master/slave 架构。
HDFS 是一个 NameNode(master)带多个 DataNode(slave),在 1.x 的 MapReduce 中也是如此,一个 JobTracker(master)带多个 TaskTracker(slave)。JobTracker 是用来跟踪一个作业的,而我们说一个作业可以被拆分成多个任务,每个任务对应一个 TaskTracker。当然这里的拆分并不是将计算本身拆分,举个栗子:我们有 100G 的文件,分别散落在 10 个节点上。我们要对这 100G 的文件执行两次 Map、一次 Reduce,那么结果就是每个节点分别对 10G 的数据执行两次 Map、一次 Reduce。
TaskTracker 是可以和 JobTracker 进行通信的,TaskTrack 需要告诉 JobTracker 自己是否存活。所以我们的客户端 Client 提交作业是先要提交到 JobTracker 上面,然后 TaskScheduler(任务调度器)将任务调度到 TaskTracker 上运行,比如:MapTask、ReduceTask。TaskTracker 会定期向 JobTracker 会报节点的健康状况、任务的执行状况,以及资源的使用情况。
但是这个架构存在问题的,因为只有一个 JobTracker,它要跟踪所有的作业。如果 JobTracker 挂掉了怎么办?要是挂掉了,那么客户端的所有作业都无法提交到集群上运行了。此外 JobTracker 要负责和 Client 进行通信,还要和 TaskTracker 进行通信,因此它的压力会非常大。在后续集群的扩展时,JobTracker 很容易成为瓶颈。此外最关键的一点,在 Hadoop1.x 的时候,JobTracker 仅仅只能支持 MapReduce 作业,想提交 Spark 作业是不可能的。
并且这样还会导致资源利用率低,比如我们有 Hadoop 集群,Saprk 集群,不同的集群也进行不同的资源分配。有可能 Hadoop 集群处于空闲状态,Spark 集群处于缺资源状态,导致它们没办法充分利用集群的资源。
而解决这一点的办法就是,所有的计算框架都运行在一个集群中,共享一个集群的资源,做到按需分配。而想实现这一点,首先要满足 JobTracker 支持不同种类的作业,所以 YARN 便诞生了。
YARN 概述
YARN(Yet Another Resource Manager,另一种资源管理器),是一个通用的资源管理系统,为上层应用提供统一的资源管理和调度,所以YARN是支持不同种类作业的调度。在介绍 Hadoop 生态圈的时候,我们贴了一张图,在 HDFS 之上的就是 YARN。而在 YARN 之上可以运行各种作业,像 MapReduce 作业、Spark 作业等等都可以,只需要提到到 YARN 上就可以了。
YARN 就类似于 1.x 里面的 JobTracker,但是它内部包含了两个部分:一个是资源管理,一个是作业调度或监控,这是两个单独的进程。而在 1.x 当中,都是通过 JobTracker 来完成的,所以它压力大。
所以基于这种架构,会有一个全局的 Resource Manager(RM),每一个应用程序都有一个 Application Master(AM),比如:MapReduce 作业会有一个 MapReduce 对应的 AM,Spark 作业会有一个 Spark 对应的 AM。而一个应用程序可以是一个独立 MapReduce 作业,也可以是多个作业组成的 DAG(有向无环图,多个任务之间有依赖)。
我们说 Resource Manager 是全局的,但除了 RM 之外每个节点还有各自的 NodeManager(NM)。目前抛出的概念有点多,我们来慢慢介绍。
Resource Manager
1. 处理客户端请求。客户端想访问集群,比如提交一个作业,要经过 Resource Manager,它是整个资源的管理者,管理整个集群的 CPU、内存、磁盘等资源。
2. 监控 Node Manager。
3. 启动或监控 Application Master。
4. 资源的分配和调度。
Node Manager
1. 管理单个节点上的资源,Node Manager 是当前节点资源的管理者,当然它也需要跟 Resource Manager 汇报。
2. 处理来自 Resource Manager 的命令。
3. 处理来自 Application Master 的命令。
Application Master
1. 某个任务的管理者。当任务在 Node Manager 上运行的时候,就是由 Application Master 负责管理,因为每个任务都会对应一个 AM。
2. 负责数据的切分。
3. 为应用程序申请资源并分配给内部的任务。
4. 任务的监控与容错。
Container
Container 是 YARN 中资源的抽象,它封装了节点上的多维度资源,如内存、CPU、磁盘、网络等等。其实 Container 是为 Application Master 服务的,因为任务在运行的时候,需要的内存、cpu 等资源都被虚拟化到 Container 里面了。
所以整体流程如下:
1. 客户端向 Resource Manager 提交作业;2. RM 为客户端提交的作业在 Node Manager 上分配一个 Container, 来运行作业对应的 Application Master;3. AM 启动之后要注册到 RM 当中, 因为 RM 是负责全局的资源管理, 而且注册之后客户端可以通过 RM 来查询作业运行的情况; 并且在注册之后, 还要向 RM 申请资源;4. 申请到资源之后, AM 便要求 NM 启动 Container, 在 Container 里面运行 Task;
整个流程并没有那么复杂,并且在运行过程中 AM 是知道每一个 Task 的运行情况的,这便是 MapReduce 在 YARN 上的执行流程。注意:不仅仅是 MapReduce,包括 Spark,除了 AM 之外,整体的执行流程是没有任何区别的。
YARN环境部署
YARN 是 Hadoop2.x 自带的,如果我们想启动 YARN,还是要修改配置文件。下面看看 YARN 的单节点部署:
修改 yarn-env.sh
和 hadoop-env.sh 一样,我们也需要配置 JAVA_HOME。
export JAVA_HOME=/opt/jdk1.8.0_221
修改 yarn-site.xml
<!--reducer获取数据的方式-->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!--指定yarn的ResourceManager的地址-->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>主机名</value>
</property>
修改 mapred-env.sh
老规矩,遇到 env.sh 都是配 JAVA_HOME。
修改 mapred-site.xml
你会发现没有这个文件,不过有一个 mapred-site.xml.template,可以 cp mapred-site.xml.template mapred-site.xml。
<!--指定MR运行在yarn上-->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
修改完毕,下面启动集群。
我们看到多出了两个进程,分别是RM和AM,关于YARN我们还可以通过webUI的方式查看,端口是8088。
里面包含了很多关于节点的信息 以及 任务的信息,注意图中的 Active Nodes,我们看到目前有一个节点存活。因此在运行任务的时候,可以多通过 webUI 的方式关注一下节点和任务的信息。
有时候我们想看一下程序的历史运行情况该怎么办呢?同样是修改 mapred-site.xml。
<!--还是这个配置文件,指定历史服务端地址-->
<property>
<name>mapreduce.jobhistory.address</name>
<value>主机名:10020</value>
</property>
<!--历史web端地址-->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>主机名:19888</value>
</property>
启动历史服务器:sbin/mr-jobhistory-daemon.sh start historyserver,然后通过
http://ip:19888/jobhistory查看
最后是提交作业到 YARN 上运行,但是这里我们就不说了,因为还是那句话,现在基本上很少直接使用MapReduce进行编程了。在后续学习 Spark 的时候,我们会学习如何将作业提交到 YARN 上运行,至于 MapReduce 我们了解一下它的概念即可。而 YARN,它是一个资源管理器,我们是需要掌握它的整体架构的,在未来学习 Spark 的时候依旧需要了解 YARN。
所以在后续学习 Spark 的时候,我们再说如何将作业提交到 YARN 上执行吧。
搭建完全分布式集群
最后再来说一下如何搭建完全分布式集群,由于前面的内容不在乎你使用的是伪分布式还是完全分布式,所以本着简单原则我们采用了伪分布式。但是工作中肯定是需要多机存储的,不然使用 Hadoop 就没有意义了。那么下面我们就来演示一下如何搭建分布式集群,我们之前使用的机器的 ip 是 47.93.39.238,主机名为 matsuri,但实际我的阿里云上有 3 台机器。
47.94.174.89,主机名为 satori,2 核心 8GB 内存47.93.39.238,主机名为 matsuri,2 核心 4GB 内存47.93.235.147,主机名为 aqua,2 核心 4GB 内存
这里我们将 satori 主机(47.94.174.89)作为 master,在 HDFS 中则对应 NameNode、在 Yarn 中对应 ResourceManager;将 matsuri 主机(47.93.39.238)和 aqua 主机(47.93.235.147)作为 worker,在 HDFS 中则对应 DataNode、在 Yarn 中对应 NodeManager。搭建一个三节点的集群。
为了更清晰,我们将之前 matsuri 主机上的环境全部删掉,然后从 0 开始安装。
现在三台主机上既没有 Hadoop,也没有 Java,我们从头开始搭建 Hadoop 完全分布式集群,我们以 satori 节点为例,因为它是 master。
安装 jdk 环境,三个节点都要安装,都安装在 /opt 目录下。
[root@satori ~]# java -version
java version "1.8.0_221"
Java(TM) SE Runtime Environment (build 1.8.0_221-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.221-b11, mixed mode)
然后安装 Hadoop,这里我们故意换一个版本,就用 hadoop-2.6.0-cdh5.8.5 吧,不用和之前一样的,就当做是第一次安装。
[root@satori hadoop-2.6.0-cdh5.8.5]# pwd
/opt/hadoop-2.6.0-cdh5.8.5
三个节点都要安装,而且既然是集群,那么安装的路径、版本都要是一样的。然后配置环境变量,三个节点都要配置:
export JAVA_HOME=/opt/jdk1.8.0_221/
export PATH=$JAVA_HOME/bin:$PATH
# 将 Hadoop 的 bin 目录和 sbin 目录配置到环境变量中
export HADOOP_HOME=/opt/hadoop-2.6.0-cdh5.8.5
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
然后修改配置文件,因为 HDFS 和 Yarn 都是集成在 Hadoop 内部的一个组件,它们各自的启动和关闭也是相互独立的。所以我们先配置 HDFS,然后再配置 Yarn。
修改 hadoop-env.sh:
# 默认是 ${JAVA_HOME},需要手动改成 java 的安装路径
export JAVA_HOME=/opt/jdk1.8.0_221
当前修改的是 master 节点的配置文件,其它节点同理。
修改 core-site.xml:
<!--指定 HDFS 中 NameNode 的地址,也就是 Master 节点的地址,就是我们当前的 satori 节点,IP 为 47.94.174.89-->
<!--但是这里不能指定 47.94.174.89,因为我这里使用的阿里云服务器,所以会有一个公网 IP 和一个内网 IP-->
<!--47.94.174.89 是对外暴露的公网 IP,外界就通过这个 IP 来访问,而内网 IP 可以通过 ifconfig 查看-->
<!--在 master 节点中必须配置内网 IP,否则到时候启动 master 节点的时候会启动不起来-->
<property>
<name>fs.defaultFS</name>
<value>hdfs://172.24.60.6:9000</value>
<!--然后 worker 节点则需要指定 master 节点的公网 IP(47.94.174.89),否则是连接不上 master 的。该配置只有这一点区别,其它的不变-->
</property>
<!--指定hadoop运行时产生文件的存储目录,如果不指定,那么重启之后数据就丢失了-->
<!--data 目录会自动创建-->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/hadoop-2.6.0-cdh5.8.5/data</value>
</property>
关于这里的 IP,如果你使用的是本机的虚拟机搭建集群的话就不需要担心了,直接输入 ifconfig 查看 master 的 IP,然后 master 配好之后直接 scp 发过去即可,因为所有节点上的软件的安装路径、版本都是一致的。只不过我这里用的是云服务器,所以在指定 master 节点的 IP 时,在 master 节点上要使用内网 IP,worker 节点使用公网 IP。
xml 文件里面都有一个 configuration 标签,我们将这里所做的配置直接拷贝到里面即可。
修改 hdfs-site.xml:
<!-- 副本系数 -->
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<!-- namenode 连接 datanode 时,默认会进行 host 解析查询,这里指定为 false -->
<property>
<name>dfs.namenode.datanode.registration.ip-hostname-check</name>
<value>false</value>
</property>
这里指定 HDFS 文件的副本系数,一般是 3 副本,这里我们的 satori 主机只做 master,matsuri、aqua 主机做 worker 节点,所以副本改成 2。
然后是指定 dfs.namenode.datanode.registration.ip-hostname-check 为 false,这一点非常重要,尤其你使用的是云服务器(并且修改了默认的主机名)。否则你会发现后面明明进程都启动成功了,但总是会出现:There are 0 datanode(s) running and no node(s) are excluded in this operation ...。
其它节点同理。
修改 slaves:
47.93.235.147
47.93.39.238
这里指定 worker 节点,我们将 matsuri 和 aqua 节点作为 worker,里面写上其 IP 地址即可,记得其它节点也要这么配。当然高版本的 Hadoop 里面没有 salves,而是改成了 workers,因为 slaves 在西方比较敏感。
以上就把 HDFS 配置文件修改完了,但是注意:我们要确保三个节点之间是可以互相通信的,以及端口对彼此也是开放的,对于我当前的三台云服务器来说显然是没有问题的。然后还有一个重点的问题,就是免密码登录,否则在 master 上启动 worker 的时候会让你输入 worker 节点的登录密码。
# 生成私钥和公钥,一路回车即可,生成的私钥存放在 id_rsa 文件中、公钥则存放在 id_rsa.pub 文件中
ssh-keygen -t rsa
cd ~/.ssh # 进入到家目录的 .ssh 目录中
touch authorized_keys # 创建 authorized_keys 文件
在每个节点上都执行上面几个步骤,那么所有节点的 .ssh 目录中都有 id_rsa、id_rsa.pub 和 authorized_keys 这三个文件。如果想要实现免登陆的话,假设在 A 节点中远程登陆 B 节点想不输入密码,那么就把 A 节点的 id_rsa.pub 里面的内容添加到 B 节点的 authorized_keys 文件中即可。但是注意,这个过程是单向的,如果在 B 节点中远程登陆 A 节点也不想输入密码的话,那么就把 B 节点的 id_rsa.pub 里面的内容添加到 A 节点的 authorized_keys 中。
[root@satori .ssh]# ssh-copy-id -i id_rsa.pub root@47.93.39.238
/usr/bin/ssh-copy-id: INFO: Source of key(s) to be installed: "id_rsa.pub"
/usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed
/usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys
root@47.93.39.238's password:
Number of key(s) added: 1
Now try logging into the machine, with: "ssh 'root@47.93.39.238'"
and check to make sure that only the key(s) you wanted were added.
[root@satori .ssh]# ssh-copy-id -i id_rsa.pub root@47.93.235.147
/usr/bin/ssh-copy-id: INFO: Source of key(s) to be installed: "id_rsa.pub"
/usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed
/usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys
root@47.93.235.147's password:
Number of key(s) added: 1
Now try logging into the machine, with: "ssh 'root@47.93.235.147'"
and check to make sure that only the key(s) you wanted were added.
[root@satori .ssh]#
此时从主机 satori 远程登陆 matsuri、aqua 就无需再输入密码了,同理我们还需要在 matsuri、aqua 上也重复相同的操作,让集群中所有节点之间的通信都畅通无阻。
然后就可以启动 HDFS 了,不过首先要格式化 NameNode,介绍伪分布式集群的时候说过的,格式化在 master 上执行。
客户端要有操作 HDFS 文件的权限。
# 这里我们配置了环境变量,如果没有配的话,那么需要进入 Hadoop 的安装目录,输入 bin/hdfs
hdfs namenode -format
如果格式化成功,那么会自动创建 /opt/hadoop-2.6.0-cdh5.8.5/data 目录:
[root@satori ~]# ls /opt/hadoop-2.6.0-cdh5.8.5/data/
dfs
[root@satori ~]#
然后启动 HDFS,只需要在 master 节点启动即可,会自动启动 worker 节点。启动之后,会发现 worker 节点的 /opt/hadoop-2.6.0-cdh5.8.5/data/ 也自动创建了。
# 如果没有配置环境变量的话,那么需要进入 Hadoop 的安装目录,输入 sbin/start-dfs.sh
# 关闭的话,输入 stop-dfs.sh
start-dfs.sh
这里要保证主机之间端口是开放的,可以互相通信,并且配置免密码登录,否则在启动 worker 时需要输入密码。然后我们输入 jps 查看相关进程:
[root@satori ~]# jps
28080 Jps
27447 SecondaryNameNode
27263 NameNode
# --------------------------
[root@matsuri ~]# jps
15626 Jps
15324 DataNode
# --------------------------
[root@aqua ~]# jps
31233 Jps
30958 DataNode
我们看到相关进程都已经启动,并且 matsuri 和 aqua 是在启动 master 节点时自动启动的。此时 HDFS 文件系统就可以正常工作了。
另外启动时会输出日志,这些日志被写在了文件中,而文件路径图中已经告诉我们了。如果输出 jps 的时候发现相关进程没有启动,可以查看日志文件,里面会记录启动失败的原因以及相关信息。只不过图中给的日志文件路径是错的,我们需要将结尾的 .out 改成 .log,这应该是 Hadoop 内部的小失误,但是高版本的 Hadoop 是否存在这个小失误,个人就不清楚了。
我们可以输入 http://47.94.174.89:50070 通过 webUI 查看 HDFS 文件系统:
显示有两个活跃的 Node 节点,显然我们 HDFS 集群是启动成功了的。但是注意:虽然成功启动了 HDFS,但 Yarn 还没有启动,所以我们接下来还要配置并启动 Yarn。
修改 yarn-env.sh:
和 hadoop-env.sh 一样,只需要指定 java 的安装路径即可。
export JAVA_HOME=/opt/jdk1.8.0_221/
其它节点同理。
修改 yarn-site.xml:
<!-- ResourceManager 对客户端暴露的地址
客户端通过该地址向 RM 提交应用程序,杀死应用程序等 -->
<property>
<name>yarn.resourcemanager.address</name>
<value>172.24.60.6:8032</value>
</property>
<!-- ResourceManager 对 ApplicationMaster 暴露的访问地址。
ApplicationMaster 通过该地址向 RM 申请资源、释放资源等 -->
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>172.24.60.6:8030</value>
</property>
<!-- ResourceManager 对 NodeManager暴露的地址
NodeManager通过该地址向 RM 汇报心跳,领取任务等 -->
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>172.24.60.6:8031</value>
</property>
<!-- ResourceManager 对管理员暴露的访问地址
管理员通过该地址向 RM 发送管理命令等。 -->
<property>
<name>yarn.resourcemanager.admin.address</name>
<value>172.24.60.6:8033</value>
</property>
<!-- ResourceManager对外 webUI 地址,用户可通过该地址在浏览器中查看集群各类信息 -->
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>172.24.60.6:8088</value>
</property>
以上监听的端口是默认的,我们没有做改动,但是这里依旧要写内网 IP,然后 worker 节点在配置的时候写 master 的公网 IP,否则 Yarn 启动不起来。
然后启动即可:
# 没有配置环境变量,则需要去 Hadoop 的安装目录,输入 sbin/start-yarn.sh
# 关闭的话是 stop-yarn.sh
start-yarn.sh
我们看到在 master 节点启动之后,worker 节点也启动了,还是输入 jps 查看一下相关进程。
[root@satori ~]# jps
29649 Jps
29377 ResourceManager
27447 SecondaryNameNode
27263 NameNode
# --------------------------
[root@matsuri ~]# jps
16218 NodeManager
15324 DataNode
16349 Jps
# --------------------------
[root@aqua ~]# jps
31988 Jps
31853 NodeManager
30958 DataNode
因为我们在 slaves 配置了 worker 节点,所以 satori 节点负责启动 ResourceManager,matsuri 节点和 aqua 节点负责启动 NodeManager。如果我们在 slaves 中将 satori 节点的 IP 也加进去了,那么 satori 节点除了会启动 NameNode、ResourceManager 之外,还会启动 DataNode、NodeManager。
然后输入 http://47.94.174.89:8088 即可通过 webUI 查看集群的资源情况:
Yarn 是管理整个集群资源的,所以可以看到整个集群的资源情况,其中内存总量是 16 GB,活跃的节点是两个。
以上就是 Hadoop 完全分布式集群的搭建,并不是很复杂。
小结
如果想走进大数据的大门,那么 Hadoop 是必须要了解的。在了解完 Hadoop 之后,下一个目标就是 Hive,因为使用 MapReduce 编程其实是很不方便的。而 Hive 则是可以让我们像写 SQL 一样来进行 MapReduce 编程,并且很多公司都在使用 Hive 这个大数据组件,我们后面再说。