• Hadoop基础知识之——MapReduce

    一、背景

    在上一篇文章《Hadoop基础知识之——数据!数据!》一文中,已经交代了Hadoop 诞生的背景和意义,这一篇正式开始接触 Hadoop基本知识,当然从 MapReduce 开始讲起。

    目前 Hadoop官网上最近版本已经到 3.x 了:

    1

    二、环境准备

    工欲善其事,必先利其器。  在学习 Hadoop 的时候,我们必然会涉及到很多 Java 的代码和知识。  所以为了方便,这里先简单列举如何准备 Java 的语言环境和代码编辑器。

    2.1、安装 eclipse 编辑器

    Eclipse是由IBM开发并捐赠给开源社区的一个IDE,也是目前应用最广泛的IDE。Eclipse的特点是它本身是Java开发的,并且基于插件结构,即使是对Java开发的支持也是通过插件JDT实现的。

     

    下载过程这里不再详细赘述,免费的,很简单,参考: https://www.liaoxuefeng.com/wiki/1252599548343744/1255883818398144

    2.2、本地安装 Java

    Java SE JDK 安装也非常简单,这里不再详细赘述,参考: https://www.liaoxuefeng.com/wiki/1252599548343744/1280507291631649

    2.3、安装 Maven

    Maven就是是专门为Java项目打造的管理和构建工具,它的主要功能有:

    • 提供了一套标准化的项目结构;
    • 提供了一套标准化的构建流程(编译,测试,打包,发布……);
    • 提供了一套依赖管理机制。

    我们肯定会在 本地 Eclipse 中创建 Java 项目来测试,所以需要预先准备好 maven 环境。

    Maven 的安装也非常简单,这里不再详细赘述,参考: https://www.liaoxuefeng.com/wiki/1252599548343744/1309301146648610

    2.4、在 Eclipse 中配置使用 Maven 进行项目管理

    首先,确保本地 MAC 环境已经安装好 Java 和 Maven 环境:

    然后,在 Eclipse 中配置 Maven 相关环境:

    打开 Eclipse–>Prefrences–>Maven–>Installations , 点击右侧的Add按钮,在弹出的窗口中,选择 2.3下载解压好的文件夹,如图:

    2

    然后你就把maven装在你的Eclipse中了,但是我们知道,maven是一个方便我们管理jar包的工具,我们需要用到的jar包都是从maven的中央远程仓库里下载的,但是我们不希望每次都去本地仓库里下载,当我们下载过一次之后就可以在我们的本地仓库中导入jar包,那么,怎么连接本地仓库呢?

    新建一个文件夹,作为本地仓库,比如这里我新建了一个 maven_repo的文件夹:

    然后找到2.3 中下载解压后的maven文件夹,里面有conf子文件夹,找到settings.xml, 编辑加入一行配置:

    回到Eclipse,打开Preferences —>Maven—->User Settings:

    第一个框填的是下载 settings.xml 文件路径。

    3

    第二个框填的是刚才的本地仓库路径。

     

    三、MapReduce

    3.1、什么是 MapReduce

    宏观上来看:MapReduce 是一种数据处理的编程模型。 它利用分布式的文件系统,将任务在不同的机器上计算,然后统计聚合计算结果返回。 总体上采取了分而治之与迭代的思想,让计算能力在一定程度上水平无限扩展,从而应对复杂、庞大数据量的计算。

    MapReduce 编程模型将任务过程分为两个处理阶段: map 阶段 和 reduce 阶段。

    每阶段都以键值对作为输入和输出,类型由自己定义。

    我们只需要完成两个函数: map 函数 和 reduce 函数即可,然后 MR 框架会自动帮我们去分布式的执行这两个函数。

    3.2、以例说明

    最典型的例子就是 wordcount 词频统计了,即读取某个文本文件并计算其中单词出现的频率。输入是文本文件,输出是文本文件。 输出的内容是每行包含一个单词和它出现的次数,用制表符分隔。

    当我们安装好Hadoop 环境后,像这种 WordCount 基本测试用例,都是官方自带的:

    当然这个是源码文件,代码比较简洁,但是想要直接运行还需要编译。

    而Hadoop本身将这种 通用型测试 都打包在了 hadoop-examples.jar 包里面了,wordcount 就包含在里面。 我们可以直接运行来看这里面都包含哪些测试程序:

    如上,这里面包含了 wordcount 程序,可以直接运行。

    执行完之后,我们发现 output 如下:

    额,这里竟然生成 1301个 文件,这不是典型的小文件问题吗 ? 这里记一笔,容后再看为啥会这样。

    我们先看结果:

    如上,他确实完成了词频统计,但是这个 java 代码不是很好理解,不接地气。

    趁此机会,刚好要学习 java ,就摸一下从 本地 eclipse 开发环境编写wordcount 项目,并上传至 Kerberos 的 CDH 环境运行。

    3.3. 实践探索,自己实现 wordcount

    首先,我们来创建项目目录结构。 一个使用Maven管理的普通的Java项目,它的目录结构默认如下:

    项目的根目录 a-maven-project 是项目名,它有一个项目描述文件pom.xml,存放Java源码的目录是src/main/java,存放资源文件的目录是src/main/resources,存放测试源码的目录是src/test/java,存放测试资源的目录是src/test/resources,最后,所有编译、打包生成的文件都放在target目录里。这些就是一个Maven项目的标准目录结构。

    所有的目录结构都是约定好的标准结构。使用标准结构不需要做任何配置,Maven就可以正常使用。

    1

    如上,我们创建好目录排版后,创建了三个 代码文件,一个map函数类、一个reduce函数类、一个运行代码类。

    WordCountMapper类:

    如上,我们自定义了 名为 WordCountMapper 的类,继承自 Mapper 类。 这个 Mapper 类是一个抽象类(什么是抽象类?),它有四个形参类型,分别指定 map 函数的输入键、输入值、输出键、输出值的类型:  <LongWritable, Text, Text, LongWritable>

    Hadoop 本身提供了一套可优化网络序列化传输的基本类型,而不直接使用 Java 内嵌的类型。 这些类型都在 org.apache.hadoop.io 包中。 这里使用 LongWritable 类型(相当于java的long长整型)、Text类型(相当于java的String类型)和 IntWritable 类型(相当于 java 的Integer类型)。

    map() 方法的输入是一个键一个值。 我们在代码中先将 一行输入的 Text 转换为 Java 的String 类型,然后用 split 切割放入数组。 map() 函数还提供 Context 实例用于输出内容的写入。我们将 word 和 1 搭配写入 context 。

    WordCountReducer类:

    同样,我们自定义了 WordCountReducer类,继承自 Reduce类,它同样四个形参。reduce 函数的输入类型必须匹配map函数的输出类型。

    看到这里可能会有疑问,代码中只是将 count 进行累加了,并没有判断 key 是否相同,不是应该将 相同key 值的count 进行累加吗 ?

    其实这里我们少了一个 shuffle 环节,即清洗。  shuffle 环节处于 map 之后, reduce 之前。

    WordCount 类:

    如上,Job 对象指定作业执行规范。 我们可以用它来控制整个作业的运行。我们在代码中构造 Job 对象之后,需要指定输入和输出数据的路径。 调用 FileInputFormat 类的静态方法 addInputPath() 来定义输入数据的路径,这个路径可以是单个文件、一个目录(将目录下的所有文件当作输入)。

    调用 FileOutputFormat 类中的静态方法,setOutputPath() 来指定输出路径。 这个方法指定的是 reduce 函数输出文件的写入目录。在运行作业前,该目录是不应该存在的。

    通过 setMapperClass() 和 setReducerClass() 方法指定要用的 map 类方法类型和 reduce 类方法类型。

    setOutputKeyClass() 和 setOutputvalueClass() 方法控制 reduce 函数的输出类型,并且必须和 reduce 类产生的相匹配。  map 函数的输出类型默认和 reduce 是相同的,因此没有额外专门指定。但是如果不同,则需要用 setMapOutputKeyClass() 和  setMapOutputVlaueClass() 来指定。

    最后调用 job 中的 waitForCompletion() 方法来提交作业并等待执行完成。该方法返回 bool 值,表示 true 成功  或者 flase 失败。

    在 Hadoop 集群上运行整个作业时,要把代码打包成一个 Jar 包。

    我们知道 Maven 是通过 Pom 文件来识别并编译管理相关依赖的,我们的程序依赖 hadoop-common 和 hadoop-client 等包,可以在这里指定,我并不需要额外去下载这些包,指定了  https://repository.cloudera.com/artifactory/cloudera-repos 仓库,它会自动帮我去找相关的依赖包。

    pom.xml :

    写好pom文件后,我们就可以开始准备生成 jar 包了:  项目右键选择 debug as,然后进入如下操作:

    Goals 填写: clean compile package

    5

    完成后将自动生成 target 目录:

    7

    我们看到 target 目录下已经生成我们想要的 WordCount-1.0-SNAPSHOT.jar 包了,我们将这个包传到客户端服务器。

    jar 包需要放在本地客户端服务器:

    从上面的结果信息中,我们可以看到一些有价值的东西:

    这个作业的标识 是 job_1602495843334_9212309,执行了 2个 map 和 1300 个 reduce 任务。Counters 部分显示了一些统计信息,我们可以看到:

    10 个 map 输入记录(两个文件一共有10行),产生 46 个 map 输出记录。随后,这 46 个记录,分成 38 组 reduce 输入记录,产生 38 个reduce 输出记录。

    我们看到输出目录下1300 个 part-r-xxxx 文件大部分都是空的(1300 个reduce),大小为0,而不为空的刚好有 38-1 个:


    当运行完成后,可以看到结果如下:

    3.3 概念扩展 与 加深理解

    通过上面的实践,我们已经和可以自己写一个最简单的 Maven 项目,并传到集群,去执行获取结果。  但是我们对集群上任务本身的运行原理还不够了解,是时候停下来扩展一下概念,加深理解。

    3.3.1 MR 的任务数据流

    我们提交一个 MR 作业(Job 或者 Application) 到集群上之后,Hadoop 将作业分成了 两类 任务 Map 和 Reduce。 这些任务运行在集群的节点上,通过 Yarn 去调度。 如果一个任务失败,它将在另一个节点上自动重新调度运行。

    在这个过程中,Hadoop 将 MR 的输入数据划分成 等长的小数据块,称为输入分片(input split)。Hadoop 为每个分片构建一个 map 任务,并由该任务来运行用户自定义的 map 函数从而处理分片中的每条记录。

    那么可想而知,理论上分片越多,map 越多,并发越高就会更快。 但是另一方面,如果分片切的越小,那么管理分片的总时间和构建map的任务的总时间将决定做作业的整个执行时间。

    • 那么分片取多少最合适呢? 对于大多数作业来说,合理的分片大小趋向于 HDFS 的一个块的大小,默认为 128MB。

    我们知道,Hadoop 在存储有输入数据(HDFS中的数据)的本节点上运行 map 任务,可以获得最佳性能,因为它无需使用宝贵的集群带宽资源。 这就是所谓的 “数据本地化优化”。   在实际的集群运行过程中,有时候对一个map任务的 输入分片来说,存储该分片的 HDFS Block 的所有Daatanode 节点可能刚好被正在运行的其他任务占满了。  所以这时候就要从某一个数据块所在的机架中的一个节点上寻找一个空闲的 map 槽来运行该map任务(甚至在某些非常特殊的情况下,会使用其他机架的节点来运行该map任务,这将导致机架之间的网络传输)。

    假如当一个 分片跨越两个数据块,那么对于任何一个 HDFS 节点,基本上都不可能同时存储这两个数据块,因此分片中的部分数据需要通过网络传输到map任务运行的节点。 与本 节点运行相比,这种方法显然效率更低。

    • map 任务的输出写入本地磁盘,而不是 HDFS,为什么呢 ? 

    因为map任务的输出是中间结果,该中间结果由 reduce 任务处理后才产生最终输出结果,而且一旦作业完成,map 任务的输出结果就可以删除。因此,如果把它存储在 HDFS中并且实现三副本,难免有些小题大做。 如果运行 map 任务的节点在将 map中间结果传送到 reduce 之前失败,Hadoop 将会在另一个节点上重新运行这个map任务以再次构建map 中间结果。

    • reduce 任务的输出写入 HDFS,又是为什么呢 ? 

    reduce 任务并不具备数据本地化的优势,单个reduce 任务的输入通常来自于多个 map 任务的输出。 因此,排过序的map输出需要通过网络传输发送到运行 reduce 任务的节点。 数据在reduce 端合并,然后由用户定义的 reduce 函数处理。

    对于 reduce 输出的每个 HDFS 块,第一个副本存储在本地节点上,其他副本处于可靠性考虑存储在其他机架的节点中。

    因此,将reduce 的输出写入hdfs确实需要占据一定的集群带宽,但这与正常的 hdfs 管线写入的消耗一样,可以接受。

    • map 任务的数量往往是由输入数据的大小 输入分片决定的,那 reduce 的数量呢 ? 

    reduce 任务的数量并非由输入数据的大小决定,是可以独立指定的。

    如果有好多个 reduce 任务,每个map 任务就会针对输出进行分区(partition),为每个 reduce 任务建一个分区。 每个分区有许多键值,但相同键对应的键值对记录都在同一分区中。

    分区可由用户定义的分区函数控制,但通常用默认的 parttitioner 通过哈希函数来分区,很高效。

    多个map,一个reduce 图示(图片摘自网上):

    1

    多个map,多个reduce 图示(图片摘自网上):

    2

    如上,从这里我们可以更好的理解 shuffle 的作用,顾名思义,洗牌 就是 合并归类的作用。

    当然,当数据处理可以完全并行,不需要 shuffle时,可能会出现 无 reduce 任务的情况。 比如 我们用 distcp 进行数据迁移拷贝时。

     

    3.4 用 Python 实现 WordCount
    3.4.1 Hadoop Streaming

    尽管 Hadoop 框架是用Java编写的,但是跑 Hadoop 的程序不一定用Java,也可以用其他语言开发,比如 Python 或 c++等。 Hadoop 提供了 MapReduce 的 API,允许你使用 非 Java 的其他语言来写自己的map 和 reduce 函数。

    Hadoop Streaming 使用 Unix 标准流作为 Hadoop 和应用程序之间的接口,所以我们可以使用任何编程语言通过标准输入/输出 来写 MapReduce 程序。

    Streaming 天生适合用于文本处理。map 的输入数据通过标准输入流传递给 map 函数,并且是 一行一行的传输,最后将结果行写到标准输出。

    map 输出的键值对是以制表符分割的行,reduce 函数的输入格式与之相同并通过标准输入流进行传输。 reduce 函数从标准输入流中读取输入行,该输入已由 Hadoop 框架根据键值排过序,最后将结果写入标准输出。

    3.4.2 学习 官网的Python 实现 WordCount

    我们先来看一下官网中的 Python跑MR案例

    2

    如上,按照这个文档说明的话,貌似必须使用Jython将Python代码转换成Java jar文件。显然,这不是很方便,如果我们用Jython没有提供的Python特性,甚至可能会产生问题。 Hadoop 自带的Python Wordcount 目录默认在:

    但这个代码感觉不是很友好,就是一点都不 Python:

    这里面采用了大量 org.apahce.hadoop 的包与类,乍一看不好理解,我们用自己的方式去实现一个:

    3.4.3 自己构造 Python WordCount 程序

    根据 3.4.1 我们知道 hadoop streaming 可以很好的处理并被调用,我们的程序目的与标准:

    与传统的 wordcount 一样,我们为单词计数,即读取某个文本文件并计算其中单词出现的频率。输入是文本文件,输出是文本文件。 输出的内容是每行包含一个单词和它出现的次数,用制表符分隔。

    我们先写一个 mapper 脚本,它将从STDIN读取数据,将其拆分为单词,并输出一个行列表,将单词与其计数映射到STDOUT。

    但是,Map脚本不会计算单词出现次数的和。它将立即输出(word ,1)这样的元组。

    然后,我们实现 reducer函数, 它将从STDIN读取mapper.py的结果,并将出现的每个单词和最终的计数相加,然后将其结果输出到STDOUT。

    如上,需要注意的是,这个代码中,要求 reduce 的输入是按照 word 排序过的,即相同的 word 行是挨着进来的,否则会出错。

    我们可以测试一下:

    mapper 将一行输入映射成了 word  1 这样的元组。

    如果我们直接接上 reduce 则会出错,如下,wang 这个单词被分开统计了:

    需要,在中间加一个 shuffle 环节,我们用 sort 来模拟:

    本次测试通过后,我们在 hadoop 集群上去测试运行我们的代码。

    我们先搞一些测试数据来,从这三个地方分别下载三份测试数据:

    将此三个文件移动到 hdfs 集群的目录下:

    然后我们利用 hadoop-streaming 包来执行:

    输出结果如下:

     

     

     

    参考

    Writing An Hadoop MapReduce Program In Python

     

     

     

     

     

  • CDH 资源配置与隔离(一)

    背景

    当数据集群到达一定体量和规模的时候,做好资源的共享和隔离是很有必要的。 这样可以有利于我们把有限的节点尽量合理的利用起来,同时分享给多组用户使用而且将相互影响做到最低,在CDH 里面主要是 yarn多队列做资源隔离。所以想要在此方面有所了解之前,就必须先对Yarn 有一个比较清晰的认知和了解。

    Yarn 的改进与优势

    我们知道 Hadoop2.0 之前任务调度和分配主要是基于Map Reduce 框架的,这里简单叙述一下:MapReduce框架中主要涉及到两个组件:JobTracker 和 TaskTracker(分别对应HDFS中的组件是NameNode和DataNode)。

    JobTracker 进程的作用是运行和监控 MapReduce 的 Job,TaskTracker一个hadoop计算进程,运行在 hadoop 集群的datanode节点上,主要任务是运行 JobTracker 分配给它的实际计算任务,如运行Map、Reduce函数,当然也包括Shuffle过程。当一个客户端向 JobTracker 提交任务时,大致过程如下(图片摘自网上):

    yy

    如果从架构的角度看,map-reduce 架构是比较明晰的,但随着分布式系统集群的规模扩大和增长,一些问题逐渐浮出水面,主要的问题集中如下(本段经验值摘自网上:Hadoop 新 MapReduce 框架 Yarn 详解):

    1. JobTracker 是 Map-reduce 的集中处理点,存在单点故障。
    2. JobTracker 完成了太多的任务,造成了过多的资源消耗,当 map-reduce job 非常多的时候,会造成很大的内存开销,潜在来说,也增加了 JobTracker fail 的风险,这也是业界普遍总结出老 Hadoop 的 Map-Reduce 只能支持 4000 节点主机的上限。
    3. 在 TaskTracker 端,以 map/reduce task 的数目作为资源的使用量度过于简单,主要是依赖于槽(slot)的CPU量度,没有结合 cpu 和内存等 资源占用综合情况,如果两个大内存消耗的 task 被调度到了一块,很容易出现 OOM。
    4. 在 TaskTracker 端,把资源强制划分为 map task slot 和 reduce task slot, 如果当系统中只有 map task 或者只有 reduce task 的时候,会造成资源的浪费,也就是前面提过的集群资源利用的问题。
    5. 源代码层面分析的时候,会发现代码非常的难读,常常因为一个 class 做了太多的事情,代码量达 3000 多行,,造成 class 的任务不清晰,增加 bug 修复和版本维护的难度。
    6. 从操作的角度来看,现在的 Hadoop MapReduce 框架在有任何重要的或者不重要的变化 ( 例如 bug 修复,性能提升和特性化 ) 时,都会强制进行系统级别的升级更新。更糟的是,它不管用户的喜好,强制让分布式集群系统的每一个用户端同时更新。这些更新会让用户为了验证他们之前的应用程序是不是适用新的 Hadoop 版本而浪费大量时间。

    从 0.23.0 版本开始,Hadoop 的 MapReduce 框架完全重构,发生了根本的变化。新的 Hadoop MapReduce 框架命名为 MapReduceV2 或者叫 Yarn,其架构图如下图所示(图片摘自网上):

    yy

    如上,YARN主要由ResourceManager、NodeManager、ApplicationMaster和Container等几个组件构成。

    总体上它仍然是Master/Slave结构,在整个资源管理框架中,ResourceManager为Master,NodeManager为Slave,ResourceManager 负责对各个NodeManager上的资源进行统一管理和调度。当用户提交一个应用程序时,需要提供一个用以跟踪和管理这个程序的ApplicationMaster,它负责向ResourceManager申请资源,并要求NodeManger 启动可以占用一定资源的任务。我们来具体看下:

    ResourceManager

    NodeManager

    需要注意的是,在Yarn中我们把 job的概念换成了 application ,因为在新的Hadoop2.x中,运行的应用不只是MapReduce了。

    ApplicationMaster

    Container

    当一个客户端向Yarn提交任务时,大致流程如下:

    1. 用户向 YARN 中提交应用程序,其中包括 ApplicationMaster 程序、启动 ApplicationMaster 的命令、用户程序等。
    2. ResourceManager 为该应用程序分配第一个 Container,并与对应的 Node-Manager 通信,要求它在这个Container中启动应用程序的ApplicationMaster。
    3. ApplicationMaster 首先向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态,然后它将为各个任务申请资源,并监控它的运行状态,直到运行结束,即重复步骤4~7。
    4. ApplicationMaster采用轮询的方式通过 RPC 协议向 ResourceManager 申请和领取资源。
    5. 一旦 ApplicationMaster 申请到资源后,便与对应的 NodeManager 通信,要求它启动任务 Container 。
    6. NodeManager 为任务设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务。
    7. 各个任务通过RPC协议向 ApplicationMaster 汇报自己的状态和进度,以便让 ApplicationMaster 随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。在应用程序运行过程中,用户可随时通过RPC向 ApplicationMaster查询应用程序的当前运行状态。
    8. 应用程序运行完成后,ApplicationMaster 向 ResourceManager 注销并关闭自己。
    Yarn VS MR1

    Yarn 这种模型和架构相比之前MR1的好处是什么 ?

    其实我们从架构的变化就可以看到重构优化根本的思想是将 JobTracker 两个主要的功能分离成单独的组件,这两个功能是资源管理和任务调度 / 监控。新的资源管理器全局管理所有应用程序计算资源的分配,每一个应用的 ApplicationMaster 负责相应的调度和协调。

    1. 这个设计大大减小了 JobTracker(也就是现在的 ResourceManager)的资源消耗,并且让监测每一个 Job 子任务 (tasks) 状态的程序分布式化了,更安全、更优美。
    2. 在新的 Yarn 中,ApplicationMaster 是一个可变更的部分,用户可以对不同的编程模型写自己的 AppMst,让更多类型的编程模型能够跑在 Hadoop 集群中,可以参考 hadoop Yarn 官方配置模板中的 mapred-site.xml 配置。
    3. 对于资源的表示以内存为单位,比之前以剩余 slot 数目更合理。
    4. 老的框架中,JobTracker 一个很大的负担就是监控 job 下的 tasks 的运行状况,现在,这个部分就扔给 ApplicationMaster 做了,而 ResourceManager 中有一个模块叫做 ApplicationsManager 它是监测 ApplicationMaster 的运行状况,如果出问题,会将其在其他机器上重启。
    5. Container 是 Yarn 为了将来作资源隔离而提出的一个框架。这一点应该借鉴了 Mesos 的工作,目前是一个框架,仅仅提供 java 虚拟机内存的隔离 ,hadoop 团队的设计思路应该后续能支持更多的资源调度和控制 , 既然资源表示成内存量,那就没有了之前的 map slot/reduce slot 分开造成集群资源闲置的尴尬情况。

    总体来说,Yarn的优势体现在 :

    可扩展性增加了,可以在更大规模的集群上运行。 《Hadoop权威指南》一书中有提到MapReduce 1在集群节点数达到 4000,任务数达到 40000时,会遇到可扩展性的瓶颈,而Yarn 可以扩展到 10000个节点和 100000 个任务。

    可用性增强了,当服务守护进程失败时,通过为另一个守护进程复制接管工作所需的状态以便继续提供服务,但是jobtracker内存中大量高速变化的复杂状态使得这一操作很困难,相比之下Yarn更方便容易。

    利用率提高了,MapReduce 1中,每个 tasktracker 都配置若干长度的槽slot,这些slot是静态分配的,在配置的时候就被划分为 map slot 和 reduce slot,而一个map slot 只能仅能用于运行一个map任务,同样一个reduce 只能运行一个 reduce 任务。 这样就可能出现集群中只有map slot可用,而导致reduce 任务必须等待的情况。 而Yarn中一个节点管理器管理一个资源池,则不会有这样的问题。

    多租户环境完善,Yarn 可以运行各种各样的分布式服务,而不只是MapReduce程序。它支持内存计算框架,流式计算框架,迭代式计算框架,甚至不同版本的MR。

    类似的资源管理领域产品

    这里提及这个是想开拓一下这类组件的视野,不要提起大数据hadoop集群资源管理,就只知道Yarn。比较知名的有:

    • Google 的开源产品 Kubernetes,容器集群管理项目。它的设计目标是在主机集群之间提供一个能够自动化部署、可拓展、应用容器可运营的平台。Kubernetes 通常结合docker容器工具工作,并且整合多个运行着docker容器的主机集群。
    • Twitter开源了Mesos,也是基于资源分配和任务调度的分离设计的。

    这两个组件就不在这里详细讲述了,后续有机会再深入吧。

    Yarn 中的调度

    理想情况下我们当然希望Yarn应用发出的资源请求能够立刻被给予满足,然而现实中资源是有限的,Yarn 需要采取一系列措施尽可能的满足每一个应用的资源需求。

    Yarn 中主要有三种调度器可用:FIFO调度器、容量调度器(Capacity Scheduler)、公平调度器(Fair Scheduler)。

    FIFO Scheduler

    顾名思义,传统的先进先出队列性质。 将应用放置在一个队列中,然后按照提交的顺序先后运行。 简单易懂,但是不适合在集群中使用。这里不过多描述了。

    Capacity Scheduler

    以资源容量使用作为限制基础,允许集群划分N个队列,并为其配置对应的资源。每个队列可以进一步按层次划分,这样每个队列中的不同用户可以共享该队列中的资源。在一个队列内,使用FIFO调度策略对应用尽心调度。

    yy

    如上图所示,单个作业使用的资源不会超过其队列容量。然而,如果一个队列中有多个作业在运行,而且这个队列的资源不够用了。这时如果仍有可用的空闲资源,那么容量调度器可能会将空余的资源分配给队列,哪怕这会超出队列容量。这被知之为“弹性队列”。  (当然这个值是可以设置的,我们可以通过 yarn.scheduler.capacity.<queue_path>.user-limit-factor参数来设置,大于1就代表可以超资源使用)

    总结来说,容量调度器的特性如下:

    Fair Scheduler

    顾名思义,默认情况下,各个队列之间的资源共享是平均分配的。

    举个例子说的更清楚些:比如 A、B 两个用户,分别拥有自己的队列。 A启动一个作业,在B没有需求时A会分配到全部可用资源; 当A的作业仍然在运行时B启动了一个作业,一段时间后(不是立刻,需要等A腾出一些资源),A 和 B将各用到集群一半的资源。 这时,如果 B 启动第二个作业且其他作业仍然在运行,那么B的两个资源将共享B的所有资源,B的每个作业将占用集群 1/4 的资源。

    yy

    关于三种策略的配置这里就不做详细的介绍了。

    了解了三个队列的调度策略之后,我们需要结合实际来看下如何在 CDH 中配置和管理 Yarn 的资源分配。

    默认情况下,CDH的 Yarn 采用的 Fair Scheduler 调度策略,我们可以在CM的管理页面对其进行配置,很方便:

    yy

    如上,默认情况下,我们在资源池可以看到集群当前有多少个 NodeManager,以及多少内存和CPU资源。这里你可能会对Vcore这个概念产生疑问。

    在Yarn中 cpu 默认被划分为虚拟cpu(也就是我们说的 Vcore),这里的虚拟cpu是yarn自己引入的概念,初衷是考虑到不同节点cpu性能可能不同,每个cpu具有计算能力也是不一样的,比如,某个节点的物理 cpu 计算能力可能是另外一个节点的物理 cpu 的2倍,这时候,你可以通过为第一个节点的物理cpu多配置几个虚拟cpu弥补这种差异。用户提交作业时,可以指定每个任务需要的虚拟cpu个数。

    我们可以在Yarn配置项里搜索一下相关的对应配置:

    yy

    如上,可以看到我们可以为很多Item来指定所谓的虚拟内核数,比如一个map任务是1个Vcore,一个Reduce 也是一个Vcore,最下面的那个参数代表单节点上可以使用的Vcore数量,默认是16个,这个和我的CPU核数有关,我的Datanode节点的CPU配置如下:

    如上,开了超线程,我的CPU线程数是刚好是16个。 此参数默认与我们的线程数保持一致。

    然后我们接着看Yarn资源队列的配置图,默认情况下,CDH中一个根队列为root,以及一个名为users 和 default 的子队列,各站50%的资源权重。它还提供了方便的按钮供我们创建新的资源池。

    我们看到它提供了四个选项,分别是资源池、计划模式、放置规则、用户限制。我们一个一个来探究一下:

    资源池

    当我点击创建资源池时,可以指定如下选项:

    yy

    权重即与其他资源迟所占的资源比例,如果我这里设为1,则表示和其他资源池各占1/3,因为其他资源池默认这个值也是1 ,所以 1 /(1+1+1)。额,好吧,有点啰嗦了。也就是说,如果我把这个值设置为2,则我应该拥有50%的集群资源。我创建了个名为test的资源池,默认参数,如下:

    yy

    我们看到,虽然是Fair Scheduler,但是我们仍然可以为每个资源池指定最小和最大资源,并且这个值是优先于基于权重的份额,也就是说它会覆盖权重比例。 你可能会问这样做的意义是什么,既然使用了公平策略,制定了权重比例,那限制这个还有什么意义呢,就目前看来我认为最大的可能性就是避免某个队列在资源空闲的时候占用了集群过多资源,导致其他队列开始任务时,等待时间过长,所以如果指定最大资源限制,就可以起到一些限制作用。

    我们看到资源池还有其他几个辅助功能,如下:

    yy

    这几个功能设置都很简单,点进去就有很好的图形化和文字说明,配置使用即可,这里不再多做说明。

    计划模式

    计划模式可以根据需要配置在不同时间段选择不同的资源调度规则。说的简单点就是类似于 Crontab 计划任务一样,可以以时间为粒度设置某个队列的权重,系统默认就有一个计划模式为default,我现在再加两个,比如:

    yy

    如上,计划模式 happyday表示周末两天,sorryday表示周内五天,它的评估顺序是自上而下的,当然也可以调整。 这时候,我们就有三个计划模式了,系统会默认的从上到下去匹配,比如今天是星期6,它先去匹配计划模式1,没有匹配到,然后去匹配计划模式2就匹配到了。

    我们可以为每个计划模式指定不同的资源权重比例,我们切回到资源池就可以看到:

    yy

    放置规则

    在讲述放置规则之前,我们需要先了解 Yarn 的几个参数设置:

    下面这个参数如果开启,则表示当一个用户或者应用去运行时,如果它没有指定要用哪个资源队列,则默认的为他创建一个 root.users.name(name为用户名), 这个参数设置如果设置为false后,没有创建资源队列的用户,提交任务时,任务最终提交到 default 队列(如果勾了这个,且没有创建对应用户名的资源池,任务直接提交失败) 资源池:

    yy

    比如,我这边新建了一个work用户,然后去hue执行了一个任务,就可以在Yarn的资源池看到多了一个work子队列:

    yy

    接下来这个参数如果设置为ture,应用程序在执行时会创建权重为1的对应用户名的资源池,这样起不到资源管控的效果。

    yy

    如上,通过这两个参数我们了解到默认情况下队列是可以创建的,也就是说其实默认情况下我们的资源队列是没有什么作用的,所有用户运行程序,最不济都会生成一个自己用户名的队列。而且它可以制定并且直接创建资源队列,这个放置规则讲的就是用户在执行时资源队列的选择:

    yy

    如上,从上到下三条规则匹配了所有可能。

    用户限制

    顾名思义,这个就是限制某个用户可以运行的最大的应用程序数,如下,我点击创建一个用户限制:

    yy

    四个选项讲完之后,还有一个比较重要的概念,就是计划策略,我们可以编辑资源池来选择计划策略类型:

    yy

    写到这里其实我有点迷糊,上面这个计划策略的三种算法感觉和Yarn的几种 Schedule 调度器很类似啊,我们在Yarn的配置里可以选择Yarn的调度器模式:

    jj8

    如上,默认的是Fair公平调度器。  那么这里的Fair公平调度器和编辑资源池里面的FAIR计划策略是否一样呢 ? 还有那些FIFO啥的。

    其实,他们两者不是同一维度的概念,不具有可比性,编辑资源池里面的三种计划策略可以看做是FAIR调度策略之下的三种算法模型,比如如果我在上图中选择 CapacityScheduler 调度器之后会发现编辑资源池无法操作:

    jj9

    所以总结来说,他们的关系是这样:

    jj10

    嗯,所以其实大多数时候我们的集群是工作在Fair Schedule 调度模式之下的,我们有必要知道它的三种计划策略的资源分配算法:

    Max-min fairness 算法

    此算法的核心意义是:公平分享分配给每个用户想要的可以满足的最小需求,然后将没有使用的资源均匀的分配给需要‘大资源’的用户。

    最大最小公平分配算法的形式化定义如下:

    • 资源按照需求递增的顺序进行分配
    • 不存在用户得到的资源超过自己的需求
    • 未得到满足的用户等价的分享资源

    与之对应的可执行定义:

    用户集合1, …, n分别有资源需求x1, x2, …, xn.不失一般性,令资源需求满足x1 <= x2 <= … <= xn.令服务器具有能力C. 那么,我们初始把C/n资源给需求最小的用户.这可能会超过用户1的需求,继续处理.该过程结束时,每个用户得到的没有比自己要求更多,而且,如果其需求得不到满足,得到的资源也不会比其他用户得到的最多的资源还少.我们之所以称之为最大最小公平分配是因为我们最大化了资源得不到满足的用户最小分配的资源.

    听不懂是吧,听不懂就对了,我也听不懂,上面都是从网上粘贴的定义,我们来举个实际的例子就明白了:

    示例1:

    问题:有一四个用户(A、B、C、D)的集合,资源需求分别是 4,2.6,2,5,其资源总能力为 10,为其计算最大最小公平分配。

    解决方法:我们通过几轮的计算来计算最大最小公平分配。

    首先,我们将四个用户按照资源需求从小到大排序,依次是 C(2)、B(2.6)、A(4)、D(5),然后根据此顺序为四个用户依次划分资源:

    第一轮,我们暂时将资源划分成 4 个大小为 2.5 大小。由于这超过了用户C的资源需求,这使得剩了 0.5个 均匀的分配给剩下的3个人资源,所以现在给予后面的他们三个是每个人 2.66。而这个值又超过了用户B的需求,所以我们拥有额外的 0.066… 来分配给剩下的两个用户。 但是对于剩下的两个用户来说这点资源都无法满足需求,所以将这0.066平均分给他们俩,即用户A、D 每个人得到 2.66 + 0.033 = 2.7 。

    所以最后的结果是:用户C、B 得到了他们的想要的最小资源,而用户A、D分别得到 2.7 资源。

    示例2:

    在上面的例子中,所有的用户拥有相同的权利来获取资源。但是有时候我们需要给予一些用户更大的配额。比如,我们可能会给不同的用户 1, …, 关联权重 w1, w2, …, wn,这反映了他们间的资源配额权重。

    我们通过定义带权的最大最小公平分配来扩展最大最小公平分配的概念以使其包含这样的权重:

    • 资源按照需求递增的顺序进行分配,通过权重来标准化
    • 不存在用户得到的资源超过自己的需求
    • 未得到满足的用户按照权重分享资源

    有一四个用户(A、B、C、D)的集合,资源需求分别是 4,2,10,4,权重分别是 2.5,4,0.5,1,资源总能力是16,为其计算最大最小公平分配。

    解决方法:第一步是标准化权重,将最小的权重单位设置为1(其实就相当于给每个权重乘以某个系数,让他变成整数方便计算)。这样权重集合更新为 5,8,1,2。这样我们就假装需要的资源是 5+8+1+2=16份.

    和之前不同的是,在资源分配的每一轮,我们按照权重的比例来划分资源,因此,在第一轮,我们计算 C/n 为16/16=1(基本分配单位)。在这一轮,四个用户分别获得 5×1=5,8×1=8,1×1=1,2×2=4 单元的资源,用户 A 得到了 5 个资源,但是A只需要4,所以多了1个资源,同样的,用户 B 多了 6 个资源。用户C和D资源不够。现在我们有 7 个单元的资源可以分配给用户C 和用户 D。他们的权重分别是 1 和 2 ,所以按照权重比例给予用户C和D,即额外的给用户C 7 × 1/(1+2) 单元资源和用户 D 额外的 7 × 2/(1+2)单元。这会导致用户D的配额达到了2 + 7 × 2/3 = 6.666,超过了需求,所以我们将额外的 2.666 单元给用户C,最终获得 1 + 7/3 + 2.666 = 6 单元。最终的分配是 4,2,6,4 ,这就是带权的最大最小公平分配。

    DRF 算法

    Max-min fairness 只是解决了单一资源下,多用户的公平分配。但在现代的资源管理系统中,往往不只有一种资源。比如YARN,包含CPU和内存两种资源。多种资源的情况下,如何公平分配?我们可以选用DRF算法。

    DRF 的核心概念在于让所有 application 的“主要资源占比”尽量均等。这句话怎么理解呢 ? 首先我们说什么是主要资源占比。举个例子,比如集群总共 100 CPUS,10000 GB Memorys。

    App1的每个 Container 容器需要 2 CPUs, 300 GB Memory。

    App2的每个 Container 容器需要 6 CPUs, 100 GB Memory。

    对于App1来说,他的每个 Container 需要的内存占总内存的 3%,CPU占总CPU的2%,所以它的主要资源是内存,而主要资源占比是3%。

    同理,对于App2来说,他的每个 Container 需要的内存占总内存的 1%,CPU占总CPU的6%,所以它的主要资源是CPU,主要资源占比是 6%。

    上面说的让所有 application 的“主要资源占比”尽量均等可以理解为,当我们给App1分配 X 个Container,给App2分配 Y 个Container时,需要满足:

    解第三个等式可以得到  X = 2Y,然后我们将这个结果带入上面的两个就可以得出 X=20,Y=10.  所以最后的结果是:

    App1 总共启动20个Containers,资源是 40CPUs,6000GB Memory。

    App2 总共启动10个Containers,资源是 60CPUs,1000GB Memory。

    其实到这里我们应该理解这个算法的原理了,但其实有个问题,App1 是如何确定它的每个Container需要的是 2 CPUs, 300 GB Memory 呢 ?  这个计划是谁给出的呢 ?

    这个问题先放在这里,后续深入了解了再做补充。

    如果考虑权重的话,算法会更复杂一点。另外在单一资源的情况下,DRF会退化为max-min fairness。关于此算法的进一步具体可以参考此文档:https://people.eecs.berkeley.edu/~alig/papers/drf.pdf

    Yarn 资源池管理与查看

    在了解Yarn对于资源队列的配置后,我仍然存在以下几点疑惑,而且是管理好集群必须要搞明白的几点:

    • 当一个任务启动时,Yarn 是如何预判它需要多少资源,从而分配对应的 Container 呢 ?
    • 如果某个任务在执行过程中资源不够用了怎么办 ? 会发生什么 ?
    • 如果某个 Container 在执行过程中使用超出了限制,会怎么办 ?
    • 如何查看某个已结束或者正在运行的任务的资源消耗情况 ?

    由于篇幅限制,这些问题我们在下一篇文章中继续讲述。

     

    参考

    https://stackoverflow.com/questions/39347670/explanation-of-yarns-drf#

    http://blog.sina.com.cn/s/blog_9d6887fe0102wibo.html

     

     

     

     

     

     

     

  • CDH 集群节点配置不均衡下的集群性能优化

    背景

    在实际维护Hadoop集群的工作中,我们会碰到各种问题,本次记录一个CDH集群下 DataNode 之间配置不一样时的优化方案(不要问我为什么datanode之间的配置会不一样,接手的时候就是这样,你懂的……)。

    基础信息:

    CDH版本:5.11.1

    DataNode节点配置:

    dt02 – dt05 四个节点是: 16核,32G,4T硬盘

    dt06 – dt10 五个节点是:32核64G,8T硬盘

    优化目标一: 大任务产生大的临时文件导致前四个低配磁盘节点告警问题

    我们在背景中已经知道,9个 datanode 的中的配置不一样,前四个节点的磁盘只有4T。 而我们集群的中有大任务,会产生大量的临时文件,如下:

    这时候尴尬的是临时文件导致前四个低配节点的磁盘分区不够了:

    如上,我们的前四台低配节点的 /data 分区剩余磁盘不够了(暂时忽略节点内部分区之间的数据不平衡,因为/data1分区是后加的,不要问我为什么没有rebalance,接手的时候就是这样),导致整个节点都不可用,但是剩下的5个低配节点还是正常工作的。  那么如何解决这个问题呢 ? 理想情况下我希望可以配置一个阈值,让每个分区的数据到达一定程度后就不再写了,自动写到其他分区,Hadoop集群中支持这样的配置:

    如下图,是综合调整后的结果,下面来整体解释一下:

    jj3

    首先我来解释一下 dfs.datanode.du.reserved 参数的含义,此参数表示 :适用于非分布式文件系统 (DFS) 使用的保留空间(字节/卷),通俗的说就是对于每个分区要剩余多少给非HDFS用途保留下来

    这个参数设置了以后,对于每个分区来说,比如cdh02的 /data分区,他会保留这么多磁盘空间给非hdfs用,当数据写入到达这一阈值后,自动写入其他分区。

    在上图中我们看到,我给cdh02、cdh03、cdh04、cdh05 单独设置了400G的阈值,其他节点默认为 100G,这样设置的目的是在上面四个节点中,当 /data 分区剩余小于400G空间的时候,HDFS数据不会继续写入此分区,而是写入另一个还没有到达这一阈值的 /data1 分区(这样就不会导致因为磁盘空间不够20%或者10%而导致集群剔除该节点现象)。

    也就是说这个参数可以从两个维度去设置,一个是从集群层面设置一个默认值,另一个是从每个节点的层面设置一个单独值,这样我们就可以做到为部分节点针对他的配置设置不同的阈值。

    全局维度的设置路径是(在CM界面): hdfs服务 —— 配置 —— datanode 中可以找到。

    单独节点维度的设置路径是(在CM界面):hdfs服务 ——实例 —— 选择对应主机的角色类型 —— 配置 中找到。

    这样设置了以后,当节点的 /data 分区剩余到 400G的时候,它就自动写到 /data1 分区了(此集群节点完全是可用的,CM面板既不会变黄也不会变红):

    同样的,为了消除各个节点之间的磁盘大小不一致问题,我为前四个节点新加了磁盘分区,并设置了 dfs.data.dir ,如上,我的集群一共有三种 dfs.data.dir

    不过这里有一个坑得提醒一下

    虽然我们可以通过 dfs.datanode.du.reserved 参数设置,但是仍然要保证此参数设置的阈值空间,大于分区磁盘大小的10%,这是什么原因呢 ?  是因为下面这个值:

    jj4

    没错,问题出在Yarn上。这个参数的含义是检查每个分区磁盘利用率,当磁盘空间超过90% ,nodemanager可能会被强行杀死。

    我最开始的时候,单独设置 cdh03 的 dfs.datanode.du.reserved 参数为 120G,但是发现在CM面板上此节点的状态变为红色了,一看是Yarn nodemanager报错了,具体日志如下:

    jj

    也就是说,因为我设置的120G太小,当还没有达到 120G的时候,只要达到90%的时候,nodemanager就被干掉了,此节点也就不可用了。

    所以我要么调大 dfs.datanode.du.reserved 这个值,使他大于分区的 10% 空间,要么调大yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage 这个值,使它无法触发nodemanager 自杀。

    总之这两个值要配合来调整。

    优化目标二:没有充分利用那些高配置DataNode节点的资源

    在CDH中,如果DataNode节点之间的资源不一致,那么它默认会选择较小资源节点作为统一资源配置的标准。比如在我们上面的那个例子中,Yarn 资源配置的CPU和内存是16核32G为标准的,也就是后几台 高配机器也只能被利用这么多资源:

    jj5

    如上,如果我不为高配的 CDH06 — CDH10 高配节点做单独配置,那么所有节点默认的容器虚拟CPU和内存就是16核32G。经过我这样配置之后,在Yarn的资源池可以看到可用的资源果然变大了:

    jj6

    这样整个Yarn可用的资源就提升了。

     

     

  • CDH Datanode 块数太多处理

    背景

    线上CDH 集群所属 HDFS 服务报出如下错误:

    yy

    看了一下,我们集群默认给每个DT节点的阈值是 500000:

    yy

    而我们集群目前的块使用量如下:

    yy

    如上,很明显是突增起来的,然后我们从官网上看一下关于这个监控Item的解释:

    参考地址:https://www.cloudera.com/documentation/enterprise/latest/topics/cm_ht_datanode.html

    在DataNode上有太多的块可能会影响DataNode的性能。具有大量块的DataNode需要更大的java堆,并且可能经历更长的垃圾收集暂停。此外,大量的块可能是许多小文件存在的指示器。HDFS不是为处理许多小文件而优化的,在操作许多小文件时,处理时间可能会受到影响。如果只有一部分DT节点有大量的块,那么可以通过运行HDFS再平衡命令移动数据来解决这个问题。如果HDFS balanced命令报告集群是平衡的,而不修复块不平衡,那么问题与许多小文件的存在有关。如果你不关心许多小文件,可以考虑禁用此健康测试。如果所有的DT都有大量的块,并且问题与小文件无关,那么应该添加额外的DT节点扩容。

    监控阈值表

    Property Name Description Template Name Default Value Unit
    DataNode Block Count Thresholds The health test thresholds of the number of blocks on a DataNode datanode_block_count_thresholds CDH=[[CDH 4.0.0‥CDH 4.5.0)=critical:never, warning:200000.0, [CDH 4.5.0‥CDH 6.0.0)=critical:never, warning:500000.0, [CDH 6.0.0‥CDH 7.0.0)=critical:never, warning:1000000.0] no unit

    问题处理

    从我们的现象来看,特别怀疑是有任务产生了大量的小文件造成的,于是我们进行如下查看:

    如上,我们最后定位到是其中的 rtestdb 这个库产生了大量的小文件,和业务确认是ETL任务产生的大量小文件,没有及时合并删除。

    结论是业务那边将这些数据文件删除,但是删除后发现集群还是没有恢复,原因是hdfs的垃圾回收机制:

    如上处理后,集群恢复。

    HDFS的回收站

    1. 回收站机制原理简述

    上面的处理涉及到 Hdfs 的回收站问题,既然碰到了怎么能放过,HDFS 的回收站就像 Windows 操作系统中的回收站一样。它的目的是防止你无意中删除某些东西。你可以通过设置如下属性来启用此功能(CDH中默认就是开启的):

    jj2

    属性 说明
    fs.trash.interval 分钟数,当超过这个分钟数后检查点会被删除。如果为零,回收站功能将被禁用。
    fs.trash.checkpoint.interval 检查点创建的时间间隔(单位为分钟)。其值应该小于或等于fs.trash.interval。如果为零,则将该值设置为fs.trash.interval的值。

    HDFS 会为每一个用户创建一个回收站目录:/user/用户名/.Trash/,每一个被用户通过Hdfs Shell 删除的文件或者目录,并不会真的删除(除非你加了 -skipTrash 参数),而是移动到了系统回收站里(如 /user/hdfs/.Trash/Current, 其中hdfs是操作的用户名),如果检查点已经启用,会定期使用时间戳重命名Current目录。而系统回收站都有一个周期,也就是当系统回收站中的文件或者目录在一段时间之后没有被用户用来恢复的话,HDFS就会自动的把这个文件或者目录彻底删除,之后,用户就永远也找不回这个文件或者目录了。

    在HDFS内部的具体实现就是在NameNode中开启了一个后台线程 Emptier,这个线程专门管理和监控系统回收站下面的所有文件和目录,对于已经超过生命周期(interval)的文件文件目录,这个线程就会自动的删除它们,不过这个管理的粒度很大。另外,用户也可以手动清空回收站,清空回收站的操作和删除普通的文件目录是一样的,只不过HDFS会自动检测这个文件目录是不是回收站,如果是,HDFS当然不会再把它放入用户的回收站中了。

    注1:如果用户的回收站中已经存在了用户当前删除的文件/目录,则HDFS会将这个当前被删除的文件/目录重命名,命名规则很简单就是在这个被删除的文件/目录名后面紧跟一个编号(从1开始知道没有重名为止)。

    注2:当用户写程序调用HDFS的API时,NameNode并不会把删除的文件或目录放入回收站Trash中,而是需要自己实现相关的回收站逻辑。

    2. 回收站基础操作

    2.1 清空回收站

    该命令使 NameNode 永久删除回收站中比阈值更早的文件,而不是等待下一个 empteir 窗口。它立即从文件系统中删除过期的检查点。

    2.2 回收站文件恢复

    直接使用 hadoop fs -mv 将删除文件恢复即可。

    有时你可能想要在删除文件时临时禁用回收站,也就是删除的文件或目录不用放在回收站而直接删除,在这种情况下,可以使用 -skipTrash 选项运行 rm 命令。

     

     

     

     

  • Namenode Ha切换后导致的hive不可用问题解决

    背景

    线上管理的CDH集群中,配置了 namenode HA 高可用,如下,平时一个是备用,一个是活动状态:

    yy

    今天中午,那台处于活动状态的namenode 出问题了宕掉了,导致处于备用状态的NameNode临时顶了上去(自动的,HA成功)。 虽然Ha成功了,但是有部分用户反馈Hive无法查询,报如下错误:

    Error while compiling statement: FAILED: SemanticException Unable to determine if hdfs://xxxxcdh01xxxxx:8020/user/hive/warehouse/ods.db/test_tb is encrypted: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error at org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:88) at org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1835) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:1450) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getEZForPath(FSNamesystem.java:9278) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getEZForPath(NameNodeRpcServer.java:1635) at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getEZForPath(AuthorizationProviderProxyClientProtocol.java:928) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getEZForPath(ClientNamenodeProtocolServerSideTranslatorPB.java:1360) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2220) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2216) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2214)

    经过网上搜索,发现是Hive元数据的问题,在Hive数据库中有两张表:

    • DBS  : Hive数据仓库的总路径
    • SDS  : Hive每张表对应的路径

    在元数据库中保存了原来的hdfs的路径,修改成HA对应的别名 nameservice1 即可:

    问题解决。

     

     

  • CDH Hive 权限管理基础

    背景

    维护 Hive 过程中,遇到不少关于权限管理的问题,尤其是 在添加 Sentry 和 Kerberos 之后。 所以趁此机会,简单梳理一番,作为记录,方便后续操作和定位问题。

    在开始之前,我们应该大概了解两个概念:

    • 认证(authentication):验证用户所用的身份是否是对的
    • 授权(authorization):验证用户所用身份操作是否有权限

    顾名思义,认证就是识别你这个人是否真的,主要防止比如类似黑客入侵这种。 授权就是我们普通意义上的权限管理,检查某个既定的用户是否拥有某些权限。比如Sentry的主要作用是授权管控,而Kerberos的主要作用是认证概念,这个心理还是要有点数的。

    既然要研究Hive的权限管理,我们首先要对Hive的权限模式有个大概的了解,看下对应的官网概述,我这里直接挑重点翻译:

    可以使用三种Hive授权模式来满足不同的用例。

    首先来看两个主要用例:了解 Hive 的两个主要用例对我们解读它的授权是很有用的。
    • Hive是作为一个表存储层。主要是针对Hadoop内部服务的,比如Apache Pig、MapReduce和一些大型并行处理数据库(Cloudera Impala、Facebook Presto、Spark SQL等)。在这种情况下,Hive可以作为中间层为给这些组件提供底层HDFS的表抽象和元数据。这些组件本身可以直接访问HDFS和 metastore 服务器(它为元数据访问提供了一个API)。访问HDFS的话是通过使用HDFS权限授权控制访问,元数据的访问需要使用Hive配置进行授权。
    • Hive 作为SQL查询引擎给用户提供访问,这是Hive最常见的使用情况之一,这个用例有以下两个子类别:

    a. Hive 命令行用户,这些用户可以直接访问Hive的数据及其元数据(不过后续Beeline很快就会取代hive命令行)

    b. ODBC/JDBC和其他HiveServer2 API用户(Beeline CLI就是一个例子)。这些用户通过HiveServer2进行所有数据/元数据访问。

    Hive 授权模式:针对上述不听的用例,可以选择不同的授权模式

    1. 元数据服务器中基于存储的授权(Storage Based Authorization in the Metastore Server)

        主要针对的是Case1 和 Case2a 用例,Hive 对应的HDFS权限作为表存储访问的一个主要判断依据凭证。我的理解是:其实就是根据hive对应的底层的hdfs文件和目录的权限管理来控制其他用户的访问权限,当然可以使用HDFS ACL 实行更丰富饱满的权限控制。

    官网地址:可以在这个地址配置开启这一授权模式: Storage Based Authorization in the Metastore Server

    官网地址:关于HDFS ACL 相关可以参考: HDFS ACL

    2. 在HiveServer2中基于SQL标准的授权(SQL Standards Based Authorization in HiveServer2)

    虽然基于存储的授权可以在数据库、表和分区级别上提供访问控制,但它不能在更细的级别(如列和视图)上控制授权,因为文件系统提供的访问控制位于目录和文件级别。细粒度访问控制的先决条件是数据服务器能够提供用户需要(或拥有)访问的列和行。HiveServer2满足这个条件,因为它有一个理解行和列(通过使用SQL)的API,并且能够只提供SQL查询要求的列和行。    基于SQL标准的授权(在Hive 0.13.0, Hive -5837中引入)可以用来支持细粒度的访问控制。它基于授权的SQL标准,并使用熟悉的grant/revoke语句控制访问。它需要通过HiveServer2配置启用。

    官网地址:可以参考这个地址配置此模式:SQL Standards Based Authorization

    3. 用Ranger或者Sentry授权模块( Authorization using Apache Ranger & Sentry)

    就是用第三个授权模块来进行授权管理。

    4. 老的默认的Hive授权模式(在Hive2.0.0之前

    这种模式没有完整的访问控制模型,因此有许多安全漏洞没有得到解决。例如,没有定义为用户授予特权所需的权限,即任何用户都可以授予自己对表或数据库的访问权限。该模型类似于基于SQL标准的授权模式,因为它提供基于grant/revoke 基于状态的访问控制。但是,访问控制策略不同于基于SQL标准的授权,它们不兼容。对于Hive命令行用户,也支持使用这种模式。但是,由于在基于SQL标准的授权(上面)讨论中提到的原因,它不是Hive命令行的安全授权模式。

    官网地址:可以参考这个地址配置此模式:Hive Old Default Authorization

    说实话Hive权限方面的官方文档看起来有点吃力,感觉文档本身就不是很明晰,有点参差不齐的感觉。 所以上面梳理的不一定都对,只是建立在个人的梳理之上的,如有错误,欢迎指正。

    上面的内容我们知道Hive大概有四种授权模式,且主要对应于两种典型场景,接下来就对此进行一番实践和探索。

    我们首先从一个简单且空白的CDH Hive入手,如下:

    原生的 CDH Hive 权限管控

    当我们最开始的时候从CM端安装好一个Hive之后,已经可以初步使用它了。 只不过默认情况下它并没有开启权限管理,这样所有的用户都具有相同的权限,同时也是超级管理员,也就对hive中的所有表都有查看和改动的权利,但是这样是不符合一般数据仓库的安全原则的,官网是这样说的:

    yy

    它建议我们将 hive 的 warehouse 目录设置为 1777 的属性,这样可以允许所有用户创建和访问,但是不允许他们删除别人的数据。

    我们来验证一下:

    如上,我们看到hive默认的文件目录 warehouse 是 drwxrwxrwt 的,这个意思是所有人都对这个目录有写权限,但是不能删除别人的目录或文件。

    然后我们新建一个账户,不做任何权限分配就去访问hive :

    1. 在集群的hive metastore 角色的机器上创建一个名为james的用户:

    2. 通过hue界面新建一个用户,注意勾选下面的选项,这样就会自动在hdfs的 /user/ 下给这个用户新建一个家目录:

    yy

    如上,然后我在 gateway 客户端服务器上也新建一个james用户,并切换到这个用户下,进行hive的直接访问:

    如上是可以成功的,接下来我们来配置权限管理来看看。

    这里我们先采用背景中介绍的Hive 2.0之前默认的权限管理方式(模式4),以官网为准,看看:

    yy

    需要在 hive-site.xml 下添加参数配置,我们来照做一下,在CDH的hive配置里面将下面三个的hive-site文件都配置:

    • Hive 客户端高级配置代码段(安全阀)的 hive-site.xml
    • HiveServer2 高级配置代码段(安全阀)的hive-site.xml
    • hive metastore 高级配置代码段(安全阀)的hive-site.xml

    进行配置:

    yy

    我把里面的配置贴出来:

    重启hive,然后再来看看:

    发现我不管用哪个账户登录hive都没法随意的访问表了,而且比较坑的是即使这个表之前是我创建的也不行了

    我尝试着去创建角色授权,发现也不行:

    网上搜了一下,报这个错的话是因为我们还需要配置下面这个:

    按照上面的在三个site文件里面加上这个配置后重启hive,重新执行发现还是报那个错,额,最后在hive的命令行执行:

    发现果然可以授权了:

    但是,和官网中说的一样,这种授权模式的问题就在于,这时候随便一个用户都可以授权,如下:

    网上搜了一下,说这个问题可以解决,即通过代码限定一个超级管理员。 规定只有管理员才可以进行授权操作。但受限制于本人捉急的Java水平这里就不再尝试此操作了,感兴趣的可以随便在网上搜一下这个话题。我这里随便挑两个链接供大家参考:

    https://www.jianshu.com/p/e5b80c3e7269

    https://blog.csdn.net/zhaomeng1123/article/details/49024929

    嗯,既然这种授权模式不太好用的话,我们是不是可以换种模式再试试呢 ?

    我们来试试背景中的模式2,即使用基于Hiveserver2的SQL标准授权,我们参考对应的官网来配置一下:因为这块的英文很直白,都是既定的操作步骤,所以我直接粘贴过来:

    For Hive 0.14 and Newer

    Set the following in hive-site.xml:

    • hive.server2.enable.doAs to false.
    • hive.users.in.admin.role to the list of comma-separated users who need to be added to admin role. Note that a user who belongs to the admin role needs to run the “set role” command before getting the privileges of the admin role, as this role is not in current roles by default.
    • Add org.apache.hadoop.hive.ql.security.authorization.MetaStoreAuthzAPIAuthorizerEmbedOnly to hive.security.metastore.authorization.manager. (It takes a comma separated list, so you can add it along with StorageBasedAuthorization parameter, if you want to enable that as well).
      This setting disallows any of the authorization api calls to be invoked in a remote metastore. HiveServer2 can be configured to use embedded metastore, and that will allow it to invoke metastore authorization api. Hive cli and any other remote metastore users would be denied authorization when they try to make authorization api calls. This restricts the authorization api to privileged HiveServer2 process. You should also ensure that the metastore rdbms access is restricted to the metastore server and hiverserver2.
    • hive.security.authorization.manager to org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory. This will ensure that any table or views created by hive-cli have default privileges granted for the owner.

    Set the following in hiveserver2-site.xml:

    • hive.security.authorization.manager=org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory
    • hive.security.authorization.enabled=true
    • hive.security.authenticator.manager=org.apache.hadoop.hive.ql.security.SessionStateUserAuthenticator
    • hive.metastore.uris=’ ‘

    在配置的时候发现一个问题,hive-site.xml 可以轻松在CM管理界面就配置完毕,但是上述的 hiveserver2-site.xml 这个文件本身不存在,CDH的CM控制页面里面也没有这一可配置项,于是我只能尝试着加在hiveserver2的配置目录底下新建该文件,然后重启hive。 但是发现这样的话,beeline 无法连接hiveserver2,最后经过几番测试,发现在 hive-site.xml 这个文件下加一条 那个在 hiveserver2-site.xml里面的一个配置项(至于这个hiveserver2-site.xml有没有起作用我感觉并不是很确定):

    执行完之后,我们来测试一下:

    但是,比较坑的是,我这样配置重启hive以后,发现所有的用户对所有库表都具有读写权限,那这样我的权限分配还有什么意义 ?

    开始我怀疑是public角色默认拥有了所有已存在的库权限,但是测试用admin角色新建库之后,其他所有的用户(默认的public角色)还是都可以访问。 我尝试删除public重建,但提示我 admin和 public角色不能被删除。

    然后我怀疑是不是因为目录是1777 的问题,于是把某个 db对应的目录以及其下面的表对应的文件权限都改为 0751,然后用一个public角色的用户去访问该表,结果仍然还是可以通过。 这就很挫了,到这里有点束手无策的意思。

    我仔细想了下,觉得还是跟我的配置有关系,最后无意中试了一下将 hive-site.xml 里面的:

    hive.security.authorization.manager 参数改为 文档里面说的本来属于 hiveserver2-site.xml 的 org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory 这个值,重启后发现有效果了。 结果和期待中的一样了。

    到目前为止,我通过CM界面添加的 hive-site.xml 配置项如下:

    Gateway 和 Hivemeta store Server对应的 hive-site.xml 文件的内容如下:

    HiveServer2 对应的 hive-site.xml文件在上面的基础上再多添加一项:

    测试结果如下:

    权限管理终于起作用了。紧接着我们试着用admin 用户分配权限给这个james 用户,让他可以读这个表试试: