Hadoop基础知识之——MapReduce

一、背景

在上一篇文章《Hadoop基础知识之——数据!数据!》一文中,已经交代了Hadoop 诞生的背景和意义,这一篇正式开始接触 Hadoop基本知识,当然从 MapReduce 开始讲起。

目前 Hadoop官网上最近版本已经到 3.x 了:

1

二、环境准备

工欲善其事,必先利其器。  在学习 Hadoop 的时候,我们必然会涉及到很多 Java 的代码和知识。  所以为了方便,这里先简单列举如何准备 Java 的语言环境和代码编辑器。

2.1、安装 eclipse 编辑器

Eclipse是由IBM开发并捐赠给开源社区的一个IDE,也是目前应用最广泛的IDE。Eclipse的特点是它本身是Java开发的,并且基于插件结构,即使是对Java开发的支持也是通过插件JDT实现的。

 

下载过程这里不再详细赘述,免费的,很简单,参考: https://www.liaoxuefeng.com/wiki/1252599548343744/1255883818398144

2.2、本地安装 Java

Java SE JDK 安装也非常简单,这里不再详细赘述,参考: https://www.liaoxuefeng.com/wiki/1252599548343744/1280507291631649

2.3、安装 Maven

Maven就是是专门为Java项目打造的管理和构建工具,它的主要功能有:

  • 提供了一套标准化的项目结构;
  • 提供了一套标准化的构建流程(编译,测试,打包,发布……);
  • 提供了一套依赖管理机制。

我们肯定会在 本地 Eclipse 中创建 Java 项目来测试,所以需要预先准备好 maven 环境。

Maven 的安装也非常简单,这里不再详细赘述,参考: https://www.liaoxuefeng.com/wiki/1252599548343744/1309301146648610

2.4、在 Eclipse 中配置使用 Maven 进行项目管理

首先,确保本地 MAC 环境已经安装好 Java 和 Maven 环境:

然后,在 Eclipse 中配置 Maven 相关环境:

打开 Eclipse–>Prefrences–>Maven–>Installations , 点击右侧的Add按钮,在弹出的窗口中,选择 2.3下载解压好的文件夹,如图:

2

然后你就把maven装在你的Eclipse中了,但是我们知道,maven是一个方便我们管理jar包的工具,我们需要用到的jar包都是从maven的中央远程仓库里下载的,但是我们不希望每次都去本地仓库里下载,当我们下载过一次之后就可以在我们的本地仓库中导入jar包,那么,怎么连接本地仓库呢?

新建一个文件夹,作为本地仓库,比如这里我新建了一个 maven_repo的文件夹:

然后找到2.3 中下载解压后的maven文件夹,里面有conf子文件夹,找到settings.xml, 编辑加入一行配置:

回到Eclipse,打开Preferences —>Maven—->User Settings:

第一个框填的是下载 settings.xml 文件路径。

3

第二个框填的是刚才的本地仓库路径。

 

三、MapReduce

3.1、什么是 MapReduce

宏观上来看:MapReduce 是一种数据处理的编程模型。 它利用分布式的文件系统,将任务在不同的机器上计算,然后统计聚合计算结果返回。 总体上采取了分而治之与迭代的思想,让计算能力在一定程度上水平无限扩展,从而应对复杂、庞大数据量的计算。

MapReduce 编程模型将任务过程分为两个处理阶段: map 阶段 和 reduce 阶段。

每阶段都以键值对作为输入和输出,类型由自己定义。

我们只需要完成两个函数: map 函数 和 reduce 函数即可,然后 MR 框架会自动帮我们去分布式的执行这两个函数。

3.2、以例说明

最典型的例子就是 wordcount 词频统计了,即读取某个文本文件并计算其中单词出现的频率。输入是文本文件,输出是文本文件。 输出的内容是每行包含一个单词和它出现的次数,用制表符分隔。

当我们安装好Hadoop 环境后,像这种 WordCount 基本测试用例,都是官方自带的:

当然这个是源码文件,代码比较简洁,但是想要直接运行还需要编译。

而Hadoop本身将这种 通用型测试 都打包在了 hadoop-examples.jar 包里面了,wordcount 就包含在里面。 我们可以直接运行来看这里面都包含哪些测试程序:

如上,这里面包含了 wordcount 程序,可以直接运行。

执行完之后,我们发现 output 如下:

额,这里竟然生成 1301个 文件,这不是典型的小文件问题吗 ? 这里记一笔,容后再看为啥会这样。

我们先看结果:

如上,他确实完成了词频统计,但是这个 java 代码不是很好理解,不接地气。

趁此机会,刚好要学习 java ,就摸一下从 本地 eclipse 开发环境编写wordcount 项目,并上传至 Kerberos 的 CDH 环境运行。

3.3. 实践探索,自己实现 wordcount

首先,我们来创建项目目录结构。 一个使用Maven管理的普通的Java项目,它的目录结构默认如下:

项目的根目录 a-maven-project 是项目名,它有一个项目描述文件pom.xml,存放Java源码的目录是src/main/java,存放资源文件的目录是src/main/resources,存放测试源码的目录是src/test/java,存放测试资源的目录是src/test/resources,最后,所有编译、打包生成的文件都放在target目录里。这些就是一个Maven项目的标准目录结构。

所有的目录结构都是约定好的标准结构。使用标准结构不需要做任何配置,Maven就可以正常使用。

1

如上,我们创建好目录排版后,创建了三个 代码文件,一个map函数类、一个reduce函数类、一个运行代码类。

WordCountMapper类:

如上,我们自定义了 名为 WordCountMapper 的类,继承自 Mapper 类。 这个 Mapper 类是一个抽象类(什么是抽象类?),它有四个形参类型,分别指定 map 函数的输入键、输入值、输出键、输出值的类型:  <LongWritable, Text, Text, LongWritable>

Hadoop 本身提供了一套可优化网络序列化传输的基本类型,而不直接使用 Java 内嵌的类型。 这些类型都在 org.apache.hadoop.io 包中。 这里使用 LongWritable 类型(相当于java的long长整型)、Text类型(相当于java的String类型)和 IntWritable 类型(相当于 java 的Integer类型)。

map() 方法的输入是一个键一个值。 我们在代码中先将 一行输入的 Text 转换为 Java 的String 类型,然后用 split 切割放入数组。 map() 函数还提供 Context 实例用于输出内容的写入。我们将 word 和 1 搭配写入 context 。

WordCountReducer类:

同样,我们自定义了 WordCountReducer类,继承自 Reduce类,它同样四个形参。reduce 函数的输入类型必须匹配map函数的输出类型。

看到这里可能会有疑问,代码中只是将 count 进行累加了,并没有判断 key 是否相同,不是应该将 相同key 值的count 进行累加吗 ?

其实这里我们少了一个 shuffle 环节,即清洗。  shuffle 环节处于 map 之后, reduce 之前。

WordCount 类:

如上,Job 对象指定作业执行规范。 我们可以用它来控制整个作业的运行。我们在代码中构造 Job 对象之后,需要指定输入和输出数据的路径。 调用 FileInputFormat 类的静态方法 addInputPath() 来定义输入数据的路径,这个路径可以是单个文件、一个目录(将目录下的所有文件当作输入)。

调用 FileOutputFormat 类中的静态方法,setOutputPath() 来指定输出路径。 这个方法指定的是 reduce 函数输出文件的写入目录。在运行作业前,该目录是不应该存在的。

通过 setMapperClass() 和 setReducerClass() 方法指定要用的 map 类方法类型和 reduce 类方法类型。

setOutputKeyClass() 和 setOutputvalueClass() 方法控制 reduce 函数的输出类型,并且必须和 reduce 类产生的相匹配。  map 函数的输出类型默认和 reduce 是相同的,因此没有额外专门指定。但是如果不同,则需要用 setMapOutputKeyClass() 和  setMapOutputVlaueClass() 来指定。

最后调用 job 中的 waitForCompletion() 方法来提交作业并等待执行完成。该方法返回 bool 值,表示 true 成功  或者 flase 失败。

在 Hadoop 集群上运行整个作业时,要把代码打包成一个 Jar 包。

我们知道 Maven 是通过 Pom 文件来识别并编译管理相关依赖的,我们的程序依赖 hadoop-common 和 hadoop-client 等包,可以在这里指定,我并不需要额外去下载这些包,指定了  https://repository.cloudera.com/artifactory/cloudera-repos 仓库,它会自动帮我去找相关的依赖包。

pom.xml :

写好pom文件后,我们就可以开始准备生成 jar 包了:  项目右键选择 debug as,然后进入如下操作:

Goals 填写: clean compile package

5

完成后将自动生成 target 目录:

7

我们看到 target 目录下已经生成我们想要的 WordCount-1.0-SNAPSHOT.jar 包了,我们将这个包传到客户端服务器。

jar 包需要放在本地客户端服务器:

从上面的结果信息中,我们可以看到一些有价值的东西:

这个作业的标识 是 job_1602495843334_9212309,执行了 2个 map 和 1300 个 reduce 任务。Counters 部分显示了一些统计信息,我们可以看到:

10 个 map 输入记录(两个文件一共有10行),产生 46 个 map 输出记录。随后,这 46 个记录,分成 38 组 reduce 输入记录,产生 38 个reduce 输出记录。

我们看到输出目录下1300 个 part-r-xxxx 文件大部分都是空的(1300 个reduce),大小为0,而不为空的刚好有 38-1 个:


当运行完成后,可以看到结果如下:

3.3 概念扩展 与 加深理解

通过上面的实践,我们已经和可以自己写一个最简单的 Maven 项目,并传到集群,去执行获取结果。  但是我们对集群上任务本身的运行原理还不够了解,是时候停下来扩展一下概念,加深理解。

3.3.1 MR 的任务数据流

我们提交一个 MR 作业(Job 或者 Application) 到集群上之后,Hadoop 将作业分成了 两类 任务 Map 和 Reduce。 这些任务运行在集群的节点上,通过 Yarn 去调度。 如果一个任务失败,它将在另一个节点上自动重新调度运行。

在这个过程中,Hadoop 将 MR 的输入数据划分成 等长的小数据块,称为输入分片(input split)。Hadoop 为每个分片构建一个 map 任务,并由该任务来运行用户自定义的 map 函数从而处理分片中的每条记录。

那么可想而知,理论上分片越多,map 越多,并发越高就会更快。 但是另一方面,如果分片切的越小,那么管理分片的总时间和构建map的任务的总时间将决定做作业的整个执行时间。

  • 那么分片取多少最合适呢? 对于大多数作业来说,合理的分片大小趋向于 HDFS 的一个块的大小,默认为 128MB。

我们知道,Hadoop 在存储有输入数据(HDFS中的数据)的本节点上运行 map 任务,可以获得最佳性能,因为它无需使用宝贵的集群带宽资源。 这就是所谓的 “数据本地化优化”。   在实际的集群运行过程中,有时候对一个map任务的 输入分片来说,存储该分片的 HDFS Block 的所有Daatanode 节点可能刚好被正在运行的其他任务占满了。  所以这时候就要从某一个数据块所在的机架中的一个节点上寻找一个空闲的 map 槽来运行该map任务(甚至在某些非常特殊的情况下,会使用其他机架的节点来运行该map任务,这将导致机架之间的网络传输)。

假如当一个 分片跨越两个数据块,那么对于任何一个 HDFS 节点,基本上都不可能同时存储这两个数据块,因此分片中的部分数据需要通过网络传输到map任务运行的节点。 与本 节点运行相比,这种方法显然效率更低。

  • map 任务的输出写入本地磁盘,而不是 HDFS,为什么呢 ? 

因为map任务的输出是中间结果,该中间结果由 reduce 任务处理后才产生最终输出结果,而且一旦作业完成,map 任务的输出结果就可以删除。因此,如果把它存储在 HDFS中并且实现三副本,难免有些小题大做。 如果运行 map 任务的节点在将 map中间结果传送到 reduce 之前失败,Hadoop 将会在另一个节点上重新运行这个map任务以再次构建map 中间结果。

  • reduce 任务的输出写入 HDFS,又是为什么呢 ? 

reduce 任务并不具备数据本地化的优势,单个reduce 任务的输入通常来自于多个 map 任务的输出。 因此,排过序的map输出需要通过网络传输发送到运行 reduce 任务的节点。 数据在reduce 端合并,然后由用户定义的 reduce 函数处理。

对于 reduce 输出的每个 HDFS 块,第一个副本存储在本地节点上,其他副本处于可靠性考虑存储在其他机架的节点中。

因此,将reduce 的输出写入hdfs确实需要占据一定的集群带宽,但这与正常的 hdfs 管线写入的消耗一样,可以接受。

  • map 任务的数量往往是由输入数据的大小 输入分片决定的,那 reduce 的数量呢 ? 

reduce 任务的数量并非由输入数据的大小决定,是可以独立指定的。

如果有好多个 reduce 任务,每个map 任务就会针对输出进行分区(partition),为每个 reduce 任务建一个分区。 每个分区有许多键值,但相同键对应的键值对记录都在同一分区中。

分区可由用户定义的分区函数控制,但通常用默认的 parttitioner 通过哈希函数来分区,很高效。

多个map,一个reduce 图示(图片摘自网上):

1

多个map,多个reduce 图示(图片摘自网上):

2

如上,从这里我们可以更好的理解 shuffle 的作用,顾名思义,洗牌 就是 合并归类的作用。

当然,当数据处理可以完全并行,不需要 shuffle时,可能会出现 无 reduce 任务的情况。 比如 我们用 distcp 进行数据迁移拷贝时。

 

3.4 用 Python 实现 WordCount
3.4.1 Hadoop Streaming

尽管 Hadoop 框架是用Java编写的,但是跑 Hadoop 的程序不一定用Java,也可以用其他语言开发,比如 Python 或 c++等。 Hadoop 提供了 MapReduce 的 API,允许你使用 非 Java 的其他语言来写自己的map 和 reduce 函数。

Hadoop Streaming 使用 Unix 标准流作为 Hadoop 和应用程序之间的接口,所以我们可以使用任何编程语言通过标准输入/输出 来写 MapReduce 程序。

Streaming 天生适合用于文本处理。map 的输入数据通过标准输入流传递给 map 函数,并且是 一行一行的传输,最后将结果行写到标准输出。

map 输出的键值对是以制表符分割的行,reduce 函数的输入格式与之相同并通过标准输入流进行传输。 reduce 函数从标准输入流中读取输入行,该输入已由 Hadoop 框架根据键值排过序,最后将结果写入标准输出。

3.4.2 学习 官网的Python 实现 WordCount

我们先来看一下官网中的 Python跑MR案例

2

如上,按照这个文档说明的话,貌似必须使用Jython将Python代码转换成Java jar文件。显然,这不是很方便,如果我们用Jython没有提供的Python特性,甚至可能会产生问题。 Hadoop 自带的Python Wordcount 目录默认在:

但这个代码感觉不是很友好,就是一点都不 Python:

这里面采用了大量 org.apahce.hadoop 的包与类,乍一看不好理解,我们用自己的方式去实现一个:

3.4.3 自己构造 Python WordCount 程序

根据 3.4.1 我们知道 hadoop streaming 可以很好的处理并被调用,我们的程序目的与标准:

与传统的 wordcount 一样,我们为单词计数,即读取某个文本文件并计算其中单词出现的频率。输入是文本文件,输出是文本文件。 输出的内容是每行包含一个单词和它出现的次数,用制表符分隔。

我们先写一个 mapper 脚本,它将从STDIN读取数据,将其拆分为单词,并输出一个行列表,将单词与其计数映射到STDOUT。

但是,Map脚本不会计算单词出现次数的和。它将立即输出(word ,1)这样的元组。

然后,我们实现 reducer函数, 它将从STDIN读取mapper.py的结果,并将出现的每个单词和最终的计数相加,然后将其结果输出到STDOUT。

如上,需要注意的是,这个代码中,要求 reduce 的输入是按照 word 排序过的,即相同的 word 行是挨着进来的,否则会出错。

我们可以测试一下:

mapper 将一行输入映射成了 word  1 这样的元组。

如果我们直接接上 reduce 则会出错,如下,wang 这个单词被分开统计了:

需要,在中间加一个 shuffle 环节,我们用 sort 来模拟:

本次测试通过后,我们在 hadoop 集群上去测试运行我们的代码。

我们先搞一些测试数据来,从这三个地方分别下载三份测试数据:

将此三个文件移动到 hdfs 集群的目录下:

然后我们利用 hadoop-streaming 包来执行:

输出结果如下:

 

 

 

参考

Writing An Hadoop MapReduce Program In Python

 

 

 

 

 

发表评论