一、背景
在上一篇文章《Hadoop基础知识之——数据!数据!》一文中,已经交代了Hadoop 诞生的背景和意义,这一篇正式开始接触 Hadoop基本知识,当然从 MapReduce 开始讲起。
目前 Hadoop官网上最近版本已经到 3.x 了:
二、环境准备
工欲善其事,必先利其器。 在学习 Hadoop 的时候,我们必然会涉及到很多 Java 的代码和知识。 所以为了方便,这里先简单列举如何准备 Java 的语言环境和代码编辑器。
2.1、安装 eclipse 编辑器
Eclipse是由IBM开发并捐赠给开源社区的一个IDE,也是目前应用最广泛的IDE。Eclipse的特点是它本身是Java开发的,并且基于插件结构,即使是对Java开发的支持也是通过插件JDT实现的。
下载过程这里不再详细赘述,免费的,很简单,参考: https://www.liaoxuefeng.com/wiki/1252599548343744/1255883818398144
2.2、本地安装 Java
Java SE JDK 安装也非常简单,这里不再详细赘述,参考: https://www.liaoxuefeng.com/wiki/1252599548343744/1280507291631649
2.3、安装 Maven
Maven就是是专门为Java项目打造的管理和构建工具,它的主要功能有:
- 提供了一套标准化的项目结构;
- 提供了一套标准化的构建流程(编译,测试,打包,发布……);
- 提供了一套依赖管理机制。
我们肯定会在 本地 Eclipse 中创建 Java 项目来测试,所以需要预先准备好 maven 环境。
Maven 的安装也非常简单,这里不再详细赘述,参考: https://www.liaoxuefeng.com/wiki/1252599548343744/1309301146648610
2.4、在 Eclipse 中配置使用 Maven 进行项目管理
首先,确保本地 MAC 环境已经安装好 Java 和 Maven 环境:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
java -version java version "15.0.1" 2020-10-20 Java(TM) SE Runtime Environment (build 15.0.1+9-18) Java HotSpot(TM) 64-Bit Server VM (build 15.0.1+9-18, mixed mode, sharing) mvn -v Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f) Maven home: /Users/wangxinyu/Documents/eclipse-workspace/maven_env/apache-maven-3.6.3 Java version: 15.0.1, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk-15.0.1.jdk/Contents/Home Default locale: zh_CN_#Hans, platform encoding: UTF-8 OS name: "mac os x", version: "10.14.6", arch: "x86_64", family: "mac" |
然后,在 Eclipse 中配置 Maven 相关环境:
打开 Eclipse–>Prefrences–>Maven–>Installations , 点击右侧的Add按钮,在弹出的窗口中,选择 2.3下载解压好的文件夹,如图:
然后你就把maven装在你的Eclipse中了,但是我们知道,maven是一个方便我们管理jar包的工具,我们需要用到的jar包都是从maven的中央远程仓库里下载的,但是我们不希望每次都去本地仓库里下载,当我们下载过一次之后就可以在我们的本地仓库中导入jar包,那么,怎么连接本地仓库呢?
新建一个文件夹,作为本地仓库,比如这里我新建了一个 maven_repo的文件夹:
1 2 |
wangxinyu$ pwd /Users/wangxinyu/Documents/eclipse-workspace |
然后找到2.3 中下载解压后的maven文件夹,里面有conf子文件夹,找到settings.xml, 编辑加入一行配置:
1 |
<localRepository>/Users/wangxinyu/Documents/eclipse-workspace</localRepository> |
回到Eclipse,打开Preferences —>Maven—->User Settings:
第一个框填的是下载 settings.xml 文件路径。
第二个框填的是刚才的本地仓库路径。
三、MapReduce
3.1、什么是 MapReduce
宏观上来看:MapReduce 是一种数据处理的编程模型。 它利用分布式的文件系统,将任务在不同的机器上计算,然后统计聚合计算结果返回。 总体上采取了分而治之与迭代的思想,让计算能力在一定程度上水平无限扩展,从而应对复杂、庞大数据量的计算。
MapReduce 编程模型将任务过程分为两个处理阶段: map 阶段 和 reduce 阶段。
每阶段都以键值对作为输入和输出,类型由自己定义。
我们只需要完成两个函数: map 函数 和 reduce 函数即可,然后 MR 框架会自动帮我们去分布式的执行这两个函数。
3.2、以例说明
最典型的例子就是 wordcount 词频统计了,即读取某个文本文件并计算其中单词出现的频率。输入是文本文件,输出是文本文件。 输出的内容是每行包含一个单词和它出现的次数,用制表符分隔。
当我们安装好Hadoop 环境后,像这种 WordCount 基本测试用例,都是官方自带的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
cat /software/cloudera/parcels/CDH-5.15.1-1.cdh5.15.1.p0.4/share/doc/hadoop-0.20-mapreduce/examples/src/org/apache/hadoop/examples/WordCount.java package org.apache.hadoop.examples; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } |
当然这个是源码文件,代码比较简洁,但是想要直接运行还需要编译。
而Hadoop本身将这种 通用型测试 都打包在了 hadoop-examples.jar 包里面了,wordcount 就包含在里面。 我们可以直接运行来看这里面都包含哪些测试程序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
$ hadoop jar /software/cloudera/parcels/CDH-5.15.1-1.cdh5.15.1.p0.4/jars/hadoop-examples.jar An example program must be given as the first argument. Valid program names are: aggregatewordcount: An Aggregate based map/reduce program that counts the words in the input files. aggregatewordhist: An Aggregate based map/reduce program that computes the histogram of the words in the input files. bbp: A map/reduce program that uses Bailey-Borwein-Plouffe to compute exact digits of Pi. dbcount: An example job that count the pageview counts from a database. distbbp: A map/reduce program that uses a BBP-type formula to compute exact bits of Pi. grep: A map/reduce program that counts the matches of a regex in the input. join: A job that effects a join over sorted, equally partitioned datasets multifilewc: A job that counts words from several files. pentomino: A map/reduce tile laying program to find solutions to pentomino problems. pi: A map/reduce program that estimates Pi using a quasi-Monte Carlo method. randomtextwriter: A map/reduce program that writes 10GB of random textual data per node. randomwriter: A map/reduce program that writes 10GB of random data per node. secondarysort: An example defining a secondary sort to the reduce. sort: A map/reduce program that sorts the data written by the random writer. sudoku: A sudoku solver. teragen: Generate data for the terasort terasort: Run the terasort teravalidate: Checking results of terasort wordcount: A map/reduce program that counts the words in the input files. wordmean: A map/reduce program that counts the average length of the words in the input files. wordmedian: A map/reduce program that counts the median length of the words in the input files. wordstandarddeviation: A map/reduce program that counts the standard deviation of the length of the words in the input files. |
如上,这里面包含了 wordcount 程序,可以直接运行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# 创建 测试文件 [root@aly-bigdata-hadoop-op-manager01 example]# cat word1.txt Hadoop is the Elephant King! A yellow and elegant thing. He never forgets Useful data, or lets An extraneous element cling! [root@aly-bigdata-hadoop-op-manager01 example]# [root@aly-bigdata-hadoop-op-manager01 example]# cat word2.txt Hadoop is an elegant fellow. An elephant gentle and mellow. He never gets mad, Or does anything bad, Because, at his core, he is yellow. # 将测试文件传入 hdfs hadoop fs -put word* /tmp/op_test/xinyu/wordcount/input # 测试运行 wordcount hadoop jar /software/cloudera/parcels/CDH-5.15.1-1.cdh5.15.1.p0.4/jars/hadoop-examples.jar wordcount -Dmapred.job.queue.name=root.op_center.op_manager /tmp/op_test/xinyu/wordcount/input /tmp/op_test/xinyu/wordcount/output 省略..... 21/01/01 23:32:02 INFO mapreduce.Job: Running job: job_1602495843334_9059131 21/01/01 23:32:09 INFO mapreduce.Job: Job job_1602495843334_9059131 running in uber mode : false 21/01/01 23:32:09 INFO mapreduce.Job: map 0% reduce 0% 21/01/01 23:32:17 INFO mapreduce.Job: map 50% reduce 0% 21/01/01 23:32:18 INFO mapreduce.Job: map 100% reduce 0% 21/01/01 23:32:31 INFO mapreduce.Job: map 100% reduce 9% 21/01/01 23:32:32 INFO mapreduce.Job: map 100% reduce 13% 21/01/01 23:32:36 INFO mapreduce.Job: map 100% reduce 23% 21/01/01 23:32:37 INFO mapreduce.Job: map 100% reduce 26% 21/01/01 23:32:38 INFO mapreduce.Job: map 100% reduce 27% 21/01/01 23:32:39 INFO mapreduce.Job: map 100% reduce 28% 21/01/01 23:32:40 INFO mapreduce.Job: map 100% reduce 33% 21/01/01 23:32:41 INFO mapreduce.Job: map 100% reduce 37% 21/01/01 23:32:42 INFO mapreduce.Job: map 100% reduce 40% 21/01/01 23:32:43 INFO mapreduce.Job: map 100% reduce 42% 21/01/01 23:32:44 INFO mapreduce.Job: map 100% reduce 47% 21/01/01 23:32:45 INFO mapreduce.Job: map 100% reduce 50% 21/01/01 23:32:46 INFO mapreduce.Job: map 100% reduce 53% 21/01/01 23:32:47 INFO mapreduce.Job: map 100% reduce 54% 21/01/01 23:32:48 INFO mapreduce.Job: map 100% reduce 59% 21/01/01 23:32:49 INFO mapreduce.Job: map 100% reduce 63% 21/01/01 23:32:50 INFO mapreduce.Job: map 100% reduce 65% 21/01/01 23:32:51 INFO mapreduce.Job: map 100% reduce 69% 21/01/01 23:32:52 INFO mapreduce.Job: map 100% reduce 72% 21/01/01 23:32:53 INFO mapreduce.Job: map 100% reduce 76% 21/01/01 23:32:54 INFO mapreduce.Job: map 100% reduce 79% 21/01/01 23:32:55 INFO mapreduce.Job: map 100% reduce 81% 21/01/01 23:32:56 INFO mapreduce.Job: map 100% reduce 86% 21/01/01 23:32:57 INFO mapreduce.Job: map 100% reduce 90% 21/01/01 23:32:58 INFO mapreduce.Job: map 100% reduce 92% 21/01/01 23:32:59 INFO mapreduce.Job: map 100% reduce 94% 21/01/01 23:33:00 INFO mapreduce.Job: map 100% reduce 98% 21/01/01 23:33:01 INFO mapreduce.Job: map 100% reduce 100% 21/01/01 23:33:06 INFO mapreduce.Job: Job job_1602495843334_9059131 completed successfully 21/01/01 23:33:06 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=26518 FILE: Number of bytes written=205147172 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=521 HDFS: Number of bytes written=300 HDFS: Number of read operations=3906 HDFS: Number of large read operations=0 HDFS: Number of write operations=2600 Job Counters Launched map tasks=2 Launched reduce tasks=1300 Other local map tasks=2 Total time spent by all maps in occupied slots (ms)=47216 Total time spent by all reduces in occupied slots (ms)=20267128 Total time spent by all map tasks (ms)=11804 Total time spent by all reduce tasks (ms)=5066782 Total vcore-milliseconds taken by all map tasks=11804 Total vcore-milliseconds taken by all reduce tasks=5066782 Total megabyte-milliseconds taken by all map tasks=48349184 Total megabyte-milliseconds taken by all reduce tasks=20753539072 Map-Reduce Framework Map input records=10 Map output records=46 Map output bytes=445 Map output materialized bytes=42128 Input split bytes=260 Combine input records=46 Combine output records=45 Reduce input groups=38 Reduce shuffle bytes=42128 Reduce input records=45 Reduce output records=38 Spilled Records=90 Shuffled Maps =2600 Failed Shuffles=0 Merged Map outputs=2600 GC time elapsed (ms)=138364 CPU time spent (ms)=1324540 Physical memory (bytes) snapshot=469278957568 Virtual memory (bytes) snapshot=7312008220672 Total committed heap usage (bytes)=1954921054208 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=261 File Output Format Counters Bytes Written=300 |
执行完之后,我们发现 output 如下:
1 2 3 4 5 6 7 8 |
hadoop fs -ls /tmp/op_test/xinyu/wordcount/output Found 1301 items -rw-rw----+ 3 hdfs supergroup 0 2021-01-01 23:33 /tmp/op_test/xinyu/wordcount/output/_SUCCESS -rw-rw----+ 3 hdfs supergroup 0 2021-01-01 23:32 /tmp/op_test/xinyu/wordcount/output/part-r-00000 -rw-rw----+ 3 hdfs supergroup 0 2021-01-01 23:32 /tmp/op_test/xinyu/wordcount/output/part-r-00001 -rw-rw----+ 3 hdfs supergroup 0 2021-01-01 23:32 /tmp/op_test/xinyu/wordcount/output/part-r-00002 省略 ....... |
额,这里竟然生成 1301个 文件,这不是典型的小文件问题吗 ? 这里记一笔,容后再看为啥会这样。
我们先看结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
hadoop fs -cat /tmp/op_test/xinyu/wordcount/output/* mellow. 1 A 1 an 1 at 1 forgets 1 the 1 yellow. 1 anything 1 gets 1 he 1 bad, 1 and 2 is 3 elephant 1 An 2 mad, 1 his 1 yellow 1 Hadoop 2 data, 1 or 1 King! 1 extraneous 1 elegant 2 core, 1 He 2 Useful 1 does 1 fellow. 1 Elephant 1 Because, 1 Or 1 never 2 lets 1 cling! 1 element 1 gentle 1 thing. 1 |
如上,他确实完成了词频统计,但是这个 java 代码不是很好理解,不接地气。
趁此机会,刚好要学习 java ,就摸一下从 本地 eclipse 开发环境编写wordcount 项目,并上传至 Kerberos 的 CDH 环境运行。
3.3. 实践探索,自己实现 wordcount
首先,我们来创建项目目录结构。 一个使用Maven管理的普通的Java项目,它的目录结构默认如下:
1 2 3 4 5 6 7 8 9 10 |
a-maven-project ├── pom.xml ├── src │ ├── main │ │ ├── java │ │ └── resources │ └── test │ ├── java │ └── resources └── target |
项目的根目录 a-maven-project 是项目名,它有一个项目描述文件pom.xml,存放Java源码的目录是src/main/java,存放资源文件的目录是src/main/resources,存放测试源码的目录是src/test/java,存放测试资源的目录是src/test/resources,最后,所有编译、打包生成的文件都放在target目录里。这些就是一个Maven项目的标准目录结构。
所有的目录结构都是约定好的标准结构。使用标准结构不需要做任何配置,Maven就可以正常使用。
如上,我们创建好目录排版后,创建了三个 代码文件,一个map函数类、一个reduce函数类、一个运行代码类。
WordCountMapper类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
package com.cloudera.mr; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * package: com.cloudera.mr * describe: wordcount * create_user: xinyu * email: 382271477@qq.com */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> { /* * 1、获取文件的每行内容 * 2、将这行内容切分,调用 StringUtils 的方法是 split 方法,分隔符为 “”, 切分后的字符放到字符串内 * 3、遍历输出 <word, 1> */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 获取一行内容 String line = value.toString(); // 切分内容到单词数组 String[] words = StringUtils.split(line, " "); // 遍历,输出 for (String word:words) { context.write(new Text(word), new LongWritable(1)); } } } |
如上,我们自定义了 名为 WordCountMapper 的类,继承自 Mapper 类。 这个 Mapper 类是一个抽象类(什么是抽象类?),它有四个形参类型,分别指定 map 函数的输入键、输入值、输出键、输出值的类型: <LongWritable, Text, Text, LongWritable>
Hadoop 本身提供了一套可优化网络序列化传输的基本类型,而不直接使用 Java 内嵌的类型。 这些类型都在 org.apache.hadoop.io 包中。 这里使用 LongWritable 类型(相当于java的long长整型)、Text类型(相当于java的String类型)和 IntWritable 类型(相当于 java 的Integer类型)。
map() 方法的输入是一个键一个值。 我们在代码中先将 一行输入的 Text 转换为 Java 的String 类型,然后用 split 切割放入数组。 map() 函数还提供 Context 实例用于输出内容的写入。我们将 word 和 1 搭配写入 context 。
WordCountReducer类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
package com.cloudera.mr; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * package: com.cloudera.mr * describe: wordcount * create_user: xinyu * email: 382271477@qq.com */ public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<LongWritable> values,Context context) throws IOException, InterruptedException { //定义一个累加计数器 定义为Long类型 long count = 0; for(LongWritable value:values){ //调用value的get()方法将long值取出来 count += value.get(); } //输出<单词:count>键值对 context.write(key, new LongWritable(count)); } } |
同样,我们自定义了 WordCountReducer类,继承自 Reduce类,它同样四个形参。reduce 函数的输入类型必须匹配map函数的输出类型。
看到这里可能会有疑问,代码中只是将 count 进行累加了,并没有判断 key 是否相同,不是应该将 相同key 值的count 进行累加吗 ?
其实这里我们少了一个 shuffle 环节,即清洗。 shuffle 环节处于 map 之后, reduce 之前。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# 假如我们需要统计内容如下 good better best never it rest till good is better and better is best # map 执行完后应该是这样的: good 1 better 1 best 1 never 1 it 1 rest 1 till 1 good 1 is 1 better 1 and 1 better 1 is 1 best 1 # 而 shuffle阶段进行清洗:使用类似 hash 的方式,一个key,放在hash表里面,就会产生一个code(java 里面的数据结构是 hashcode),然后再给它取余数。比如机器有四个节点,做reduce,那么就取余4,这样计算的任务就分给四台机器。这个就是shuffl机制。 (good, [1, 1]) (better, [1,1,1]) (is, [1,1]) ..... # 经过 shuffle 机制以后,到了 reduce 时,它本身需要处理的 key 值就都是一样的了,它只需要做 累加操作即可。然后输出: (good, 2) (better,3) (is, 2) ..... |
WordCount 类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
package com.cloudera.mr; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * package: com.cloudera.mr * describe: wordcount * create_user: xinyu * email: 382271477@qq.com */ public class WordCount { private static Logger logger = LoggerFactory.getLogger(WordCount.class); public static void main(String[] args) throws Exception { if (args.length < 2) { System.err.println("Usage: WordCount <input path> <output path>"); System.exit(-1); } logger.info(args[0] + "-----" + args[1]); Configuration conf = new Configuration(); conf.set("mapred.job.queue.name", "root.op_center.op_manager"); Job job = Job.getInstance(conf); job.setJarByClass(WordCount.class); job.setJobName("MyWordCount"); FileInputFormat.addInputPath(job, new Path(args[1])); FileOutputFormat.setOutputPath(job, new Path(args[2])); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } } |
如上,Job 对象指定作业执行规范。 我们可以用它来控制整个作业的运行。我们在代码中构造 Job 对象之后,需要指定输入和输出数据的路径。 调用 FileInputFormat 类的静态方法 addInputPath() 来定义输入数据的路径,这个路径可以是单个文件、一个目录(将目录下的所有文件当作输入)。
调用 FileOutputFormat 类中的静态方法,setOutputPath() 来指定输出路径。 这个方法指定的是 reduce 函数输出文件的写入目录。在运行作业前,该目录是不应该存在的。
通过 setMapperClass() 和 setReducerClass() 方法指定要用的 map 类方法类型和 reduce 类方法类型。
setOutputKeyClass() 和 setOutputvalueClass() 方法控制 reduce 函数的输出类型,并且必须和 reduce 类产生的相匹配。 map 函数的输出类型默认和 reduce 是相同的,因此没有额外专门指定。但是如果不同,则需要用 setMapOutputKeyClass() 和 setMapOutputVlaueClass() 来指定。
最后调用 job 中的 waitForCompletion() 方法来提交作业并等待执行完成。该方法返回 bool 值,表示 true 成功 或者 flase 失败。
在 Hadoop 集群上运行整个作业时,要把代码打包成一个 Jar 包。
我们知道 Maven 是通过 Pom 文件来识别并编译管理相关依赖的,我们的程序依赖 hadoop-common 和 hadoop-client 等包,可以在这里指定,我并不需要额外去下载这些包,指定了 https://repository.cloudera.com/artifactory/cloudera-repos 仓库,它会自动帮我去找相关的依赖包。
pom.xml :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.cloudera</groupId> <artifactId>WordCount</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.encoding>UTF-8</maven.compiler.encoding> <java.version>1.8</java.version> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <name>WordCount</name> <url>http://maven.apache.org</url> <repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> <name>Cloudera Repositories</name> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.0-cdh5.15.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.0-cdh5.15.1</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> </dependencies> </project> |
写好pom文件后,我们就可以开始准备生成 jar 包了: 项目右键选择 debug as,然后进入如下操作:
Goals 填写: clean compile package
完成后将自动生成 target 目录:
我们看到 target 目录下已经生成我们想要的 WordCount-1.0-SNAPSHOT.jar 包了,我们将这个包传到客户端服务器。
jar 包需要放在本地客户端服务器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
hadoop jar software/WordCount-1.0-SNAPSHOT.jar com.cloudera.mr.WordCount -Dmapred.job.queue.name=root.op_center.op_manager /tmp/op_test/xinyu/wordcount/input /tmp/op_test/xinyu/wordcount/output 省略 .... 21/01/03 11:29:16 INFO mapreduce.Job: map 100% reduce 99% 21/01/03 11:29:17 INFO mapreduce.Job: map 100% reduce 100% 在这里等待了好久,从下面也可以看到 reduce 阶段持续了好长时间。。 21/01/03 11:41:47 INFO mapreduce.Job: Job job_1602495843334_9212309 completed successfully 21/01/03 11:41:47 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=26627 FILE: Number of bytes written=204928600 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=521 HDFS: Number of bytes written=300 HDFS: Number of read operations=3906 HDFS: Number of large read operations=0 HDFS: Number of write operations=2600 Job Counters Launched map tasks=2 Launched reduce tasks=1300 Other local map tasks=2 Total time spent by all maps in occupied slots (ms)=39416 Total time spent by all reduces in occupied slots (ms)=30941896 Total time spent by all map tasks (ms)=9854 Total time spent by all reduce tasks (ms)=7735474 Total vcore-milliseconds taken by all map tasks=9854 Total vcore-milliseconds taken by all reduce tasks=7735474 Total megabyte-milliseconds taken by all map tasks=40361984 Total megabyte-milliseconds taken by all reduce tasks=31684501504 Map-Reduce Framework Map input records=10 Map output records=46 Map output bytes=629 Map output materialized bytes=42315 Input split bytes=260 Combine input records=0 Combine output records=0 Reduce input groups=38 Reduce shuffle bytes=42315 Reduce input records=46 Reduce output records=38 Spilled Records=92 Shuffled Maps =2600 Failed Shuffles=0 Merged Map outputs=2600 GC time elapsed (ms)=117136 CPU time spent (ms)=1256020 Physical memory (bytes) snapshot=474618363904 Virtual memory (bytes) snapshot=7335716843520 Total committed heap usage (bytes)=1921904017408 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=261 File Output Format Counters Bytes Written=300 |
从上面的结果信息中,我们可以看到一些有价值的东西:
这个作业的标识 是 job_1602495843334_9212309,执行了 2个 map 和 1300 个 reduce 任务。Counters 部分显示了一些统计信息,我们可以看到:
10 个 map 输入记录(两个文件一共有10行),产生 46 个 map 输出记录。随后,这 46 个记录,分成 38 组 reduce 输入记录,产生 38 个reduce 输出记录。
我们看到输出目录下1300 个 part-r-xxxx 文件大部分都是空的(1300 个reduce),大小为0,而不为空的刚好有 38-1 个:
1 2 3 |
hadoop fs -ls /tmp/op_test/xinyu/wordcount/output | awk '{if ($5 > 0 ) {print $5}}' | wc -l 37 |
当运行完成后,可以看到结果如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
[root@aly-bigdata-hadoop-op-manager01 ~]# hadoop fs -cat /tmp/op_test/xinyu/wordcount/output/* mellow. 1 A 1 an 1 at 1 forgets 1 the 1 yellow. 1 anything 1 gets 1 he 1 bad, 1 and 2 is 3 elephant 1 An 2 mad, 1 his 1 yellow 1 Hadoop 2 data, 1 or 1 King! 1 extraneous 1 elegant 2 core, 1 He 2 Useful 1 does 1 fellow. 1 Elephant 1 Because, 1 Or 1 never 2 lets 1 cling! 1 element 1 gentle 1 thing. 1 |
3.3 概念扩展 与 加深理解
通过上面的实践,我们已经和可以自己写一个最简单的 Maven 项目,并传到集群,去执行获取结果。 但是我们对集群上任务本身的运行原理还不够了解,是时候停下来扩展一下概念,加深理解。
3.3.1 MR 的任务数据流
我们提交一个 MR 作业(Job 或者 Application) 到集群上之后,Hadoop 将作业分成了 两类 任务 Map 和 Reduce。 这些任务运行在集群的节点上,通过 Yarn 去调度。 如果一个任务失败,它将在另一个节点上自动重新调度运行。
在这个过程中,Hadoop 将 MR 的输入数据划分成 等长的小数据块,称为输入分片(input split)。Hadoop 为每个分片构建一个 map 任务,并由该任务来运行用户自定义的 map 函数从而处理分片中的每条记录。
那么可想而知,理论上分片越多,map 越多,并发越高就会更快。 但是另一方面,如果分片切的越小,那么管理分片的总时间和构建map的任务的总时间将决定做作业的整个执行时间。
- 那么分片取多少最合适呢? 对于大多数作业来说,合理的分片大小趋向于 HDFS 的一个块的大小,默认为 128MB。
我们知道,Hadoop 在存储有输入数据(HDFS中的数据)的本节点上运行 map 任务,可以获得最佳性能,因为它无需使用宝贵的集群带宽资源。 这就是所谓的 “数据本地化优化”。 在实际的集群运行过程中,有时候对一个map任务的 输入分片来说,存储该分片的 HDFS Block 的所有Daatanode 节点可能刚好被正在运行的其他任务占满了。 所以这时候就要从某一个数据块所在的机架中的一个节点上寻找一个空闲的 map 槽来运行该map任务(甚至在某些非常特殊的情况下,会使用其他机架的节点来运行该map任务,这将导致机架之间的网络传输)。
假如当一个 分片跨越两个数据块,那么对于任何一个 HDFS 节点,基本上都不可能同时存储这两个数据块,因此分片中的部分数据需要通过网络传输到map任务运行的节点。 与本 节点运行相比,这种方法显然效率更低。
- map 任务的输出写入本地磁盘,而不是 HDFS,为什么呢 ?
因为map任务的输出是中间结果,该中间结果由 reduce 任务处理后才产生最终输出结果,而且一旦作业完成,map 任务的输出结果就可以删除。因此,如果把它存储在 HDFS中并且实现三副本,难免有些小题大做。 如果运行 map 任务的节点在将 map中间结果传送到 reduce 之前失败,Hadoop 将会在另一个节点上重新运行这个map任务以再次构建map 中间结果。
- reduce 任务的输出写入 HDFS,又是为什么呢 ?
reduce 任务并不具备数据本地化的优势,单个reduce 任务的输入通常来自于多个 map 任务的输出。 因此,排过序的map输出需要通过网络传输发送到运行 reduce 任务的节点。 数据在reduce 端合并,然后由用户定义的 reduce 函数处理。
对于 reduce 输出的每个 HDFS 块,第一个副本存储在本地节点上,其他副本处于可靠性考虑存储在其他机架的节点中。
因此,将reduce 的输出写入hdfs确实需要占据一定的集群带宽,但这与正常的 hdfs 管线写入的消耗一样,可以接受。
- map 任务的数量往往是由输入数据的大小 输入分片决定的,那 reduce 的数量呢 ?
reduce 任务的数量并非由输入数据的大小决定,是可以独立指定的。
如果有好多个 reduce 任务,每个map 任务就会针对输出进行分区(partition),为每个 reduce 任务建一个分区。 每个分区有许多键值,但相同键对应的键值对记录都在同一分区中。
分区可由用户定义的分区函数控制,但通常用默认的 parttitioner 通过哈希函数来分区,很高效。
多个map,一个reduce 图示(图片摘自网上):
多个map,多个reduce 图示(图片摘自网上):
如上,从这里我们可以更好的理解 shuffle 的作用,顾名思义,洗牌 就是 合并归类的作用。
当然,当数据处理可以完全并行,不需要 shuffle时,可能会出现 无 reduce 任务的情况。 比如 我们用 distcp 进行数据迁移拷贝时。
3.4 用 Python 实现 WordCount
3.4.1 Hadoop Streaming
尽管 Hadoop 框架是用Java编写的,但是跑 Hadoop 的程序不一定用Java,也可以用其他语言开发,比如 Python 或 c++等。 Hadoop 提供了 MapReduce 的 API,允许你使用 非 Java 的其他语言来写自己的map 和 reduce 函数。
Hadoop Streaming 使用 Unix 标准流作为 Hadoop 和应用程序之间的接口,所以我们可以使用任何编程语言通过标准输入/输出 来写 MapReduce 程序。
Streaming 天生适合用于文本处理。map 的输入数据通过标准输入流传递给 map 函数,并且是 一行一行的传输,最后将结果行写到标准输出。
map 输出的键值对是以制表符分割的行,reduce 函数的输入格式与之相同并通过标准输入流进行传输。 reduce 函数从标准输入流中读取输入行,该输入已由 Hadoop 框架根据键值排过序,最后将结果写入标准输出。
3.4.2 学习 官网的Python 实现 WordCount
我们先来看一下官网中的 Python跑MR案例:
如上,按照这个文档说明的话,貌似必须使用Jython将Python代码转换成Java jar文件。显然,这不是很方便,如果我们用Jython没有提供的Python特性,甚至可能会产生问题。 Hadoop 自带的Python Wordcount 目录默认在:
1 2 3 4 5 |
$HADOOP_HOME/src/examples/python/WordCount.py # CDH 默认在(路径前缀根据安装配置进行替换) /software/cloudera/parcels/CDH-5.15.1-1.cdh5.15.1.p0.4/share/doc/hadoop-0.20-mapreduce/examples/src/python/WordCount.py |
但这个代码感觉不是很友好,就是一点都不 Python:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
from org.apache.hadoop.fs import Path from org.apache.hadoop.io import * from org.apache.hadoop.mapred import * import sys import getopt class WordCountMap(Mapper, MapReduceBase): one = IntWritable(1) def map(self, key, value, output, reporter): for w in value.toString().split(): output.collect(Text(w), self.one) class Summer(Reducer, MapReduceBase): def reduce(self, key, values, output, reporter): sum = 0 while values.hasNext(): sum += values.next().get() output.collect(key, IntWritable(sum)) def printUsage(code): print "wordcount [-m <maps>] [-r <reduces>] <input> <output>" sys.exit(code) def main(args): conf = JobConf(WordCountMap); conf.setJobName("wordcount"); conf.setOutputKeyClass(Text); conf.setOutputValueClass(IntWritable); conf.setMapperClass(WordCountMap); conf.setCombinerClass(Summer); conf.setReducerClass(Summer); try: flags, other_args = getopt.getopt(args[1:], "m:r:") except getopt.GetoptError: printUsage(1) if len(other_args) != 2: printUsage(1) for f,v in flags: if f == "-m": conf.setNumMapTasks(int(v)) elif f == "-r": conf.setNumReduceTasks(int(v)) conf.setInputPath(Path(other_args[0])) conf.setOutputPath(Path(other_args[1])) JobClient.runJob(conf); if __name__ == "__main__": main(sys.argv) |
这里面采用了大量 org.apahce.hadoop 的包与类,乍一看不好理解,我们用自己的方式去实现一个:
3.4.3 自己构造 Python WordCount 程序
根据 3.4.1 我们知道 hadoop streaming 可以很好的处理并被调用,我们的程序目的与标准:
与传统的 wordcount 一样,我们为单词计数,即读取某个文本文件并计算其中单词出现的频率。输入是文本文件,输出是文本文件。 输出的内容是每行包含一个单词和它出现的次数,用制表符分隔。
我们先写一个 mapper 脚本,它将从STDIN读取数据,将其拆分为单词,并输出一个行列表,将单词与其计数映射到STDOUT。
但是,Map脚本不会计算单词出现次数的和。它将立即输出(word ,1)这样的元组。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
[root@aly-bigdata-hadoop-op-manager01 py_test]# cat mapper.py import sys for line in sys.stdin: # remove leading and trailing whitespace line = line.strip() # split the line into words words = line.split() # increase counters for word in words: # write the results to STDOUT (standard output); # what we output here will be the input for the # Reduce step, i.e. the input for reducer.py # # tab-delimited; the trivial word count is 1 print '%s\t%s' % (word, 1) |
然后,我们实现 reducer函数, 它将从STDIN读取mapper.py的结果,并将出现的每个单词和最终的计数相加,然后将其结果输出到STDOUT。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
[root@aly-bigdata-hadoop-op-manager01 py_test]# cat reducer.py #!/usr/bin/env python """reducer.py""" from operator import itemgetter import sys current_word = None current_count = 0 word = None # input comes from STDIN for line in sys.stdin: # remove leading and trailing whitespace line = line.strip() # parse the input we got from mapper.py word, count = line.split('\t', 1) # convert count (currently a string) to int try: count = int(count) except ValueError: # count was not a number, so silently # ignore/discard this line continue # 如果与上一行的 word 相同,那么count叠加,否则重新 赋予 current_word 值,所以这个前提是 相同的 word 的行是挨着来的。 if current_word == word: current_count += count else: if current_word: # write result to STDOUT print '%s\t%s' % (current_word, current_count) current_count = count current_word = word # do not forget to output the last word if needed! if current_word == word: print '%s\t%s' % (current_word, current_count) |
如上,需要注意的是,这个代码中,要求 reduce 的输入是按照 word 排序过的,即相同的 word 行是挨着进来的,否则会出错。
我们可以测试一下:
mapper 将一行输入映射成了 word 1 这样的元组。
1 2 3 4 5 6 7 8 9 10 |
[root@aly-bigdata-hadoop-op-manager01 py_test]# echo "wang wang xin yu hello tie niu wang nihao " | ./mapper.py wang 1 wang 1 xin 1 yu 1 hello 1 tie 1 niu 1 wang 1 nihao 1 |
如果我们直接接上 reduce 则会出错,如下,wang 这个单词被分开统计了:
1 2 3 4 5 6 7 8 9 |
[root@aly-bigdata-hadoop-op-manager01 py_test]# echo "wang wang xin yu hello tie niu wang nihao " | ./mapper.py | ./reducer.py wang 2 xin 1 yu 1 hello 1 tie 1 niu 1 wang 1 nihao 1 |
需要,在中间加一个 shuffle 环节,我们用 sort 来模拟:
1 2 3 4 5 6 7 8 |
[root@aly-bigdata-hadoop-op-manager01 py_test]# echo "wang wang xin yu hello tie niu wang nihao niu" | ./mapper.py | sort | ./reducer.py hello 1 nihao 1 niu 2 tie 1 wang 3 xin 1 yu 1 |
本次测试通过后,我们在 hadoop 集群上去测试运行我们的代码。
我们先搞一些测试数据来,从这三个地方分别下载三份测试数据:
- The Outline of Science, Vol. 1 (of 4) by J. Arthur Thomson
- The Notebooks of Leonardo Da Vinci
- Ulysses by James Joyce
1 2 3 |
wget http://www.gutenberg.org/ebooks/20417.txt.utf-8 wget http://www.gutenberg.org/files/5000/5000-8.txt wget http://www.gutenberg.org/files/4300/4300-0.txt |
将此三个文件移动到 hdfs 集群的目录下:
1 2 3 4 5 6 7 8 |
[root@aly-bigdata-hadoop-op-manager01 py_test]# hadoop dfs -copyFromLocal wc_input /tmp/op_test/xinyu/wordcount/py_input [root@aly-bigdata-hadoop-op-manager01 py_test]# hadoop fs -ls /tmp/op_test/xinyu/wordcount/py_input Found 3 items -rw-rw----+ 3 hdfs supergroup 674570 2021-01-03 17:45 /tmp/op_test/xinyu/wordcount/py_input/20417.txt.utf-8 -rw-rw----+ 3 hdfs supergroup 1586488 2021-01-03 17:45 /tmp/op_test/xinyu/wordcount/py_input/4300-0.txt -rw-rw----+ 3 hdfs supergroup 1428843 2021-01-03 17:45 /tmp/op_test/xinyu/wordcount/py_input/5000-8.txt |
然后我们利用 hadoop-streaming 包来执行:
1 2 3 4 5 6 |
hadoop jar /software/cloudera/parcels/CDH-5.15.1-1.cdh5.15.1.p0.4/jars/hadoop-streaming-2.6.0-cdh5.15.1.jar \ -Dmapred.job.queue.name=root.op_center.op_manager \ -file ../py_test/mapper.py -mapper ../py_test/mapper.py \ -file ../py_test/reducer.py -reducer ../py_test/reducer.py \ -input /tmp/op_test/xinyu/wordcount/py_input \ -output /tmp/op_test/xinyu/wordcount/py_output |
输出结果如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
省略部分 .... 21/01/03 17:53:43 INFO mapreduce.Job: map 100% reduce 97% 21/01/03 17:53:44 INFO mapreduce.Job: map 100% reduce 99% 21/01/03 17:53:45 INFO mapreduce.Job: map 100% reduce 100% 21/01/03 17:53:48 INFO mapreduce.Job: Job job_1602495843334_9229069 completed successfully 21/01/03 17:53:48 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=1256840 FILE: Number of bytes written=212723615 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=3690269 HDFS: Number of bytes written=887455 HDFS: Number of read operations=3909 HDFS: Number of large read operations=0 HDFS: Number of write operations=2600 Job Counters Launched map tasks=3 Launched reduce tasks=1300 Other local map tasks=3 Total time spent by all maps in occupied slots (ms)=69100 Total time spent by all reduces in occupied slots (ms)=21443664 Total time spent by all map tasks (ms)=17275 Total time spent by all reduce tasks (ms)=5360916 Total vcore-milliseconds taken by all map tasks=17275 Total vcore-milliseconds taken by all reduce tasks=5360916 Total megabyte-milliseconds taken by all map tasks=70758400 Total megabyte-milliseconds taken by all reduce tasks=21958311936 Map-Reduce Framework Map input records=78753 Map output records=630075 Map output bytes=4836706 Map output materialized bytes=1519350 Input split bytes=368 Combine input records=0 Combine output records=0 Reduce input groups=82555 Reduce shuffle bytes=1519350 Reduce input records=630075 Reduce output records=82555 Spilled Records=1260150 Shuffled Maps =3900 Failed Shuffles=0 Merged Map outputs=3900 GC time elapsed (ms)=164111 CPU time spent (ms)=1967940 Physical memory (bytes) snapshot=485633126400 Virtual memory (bytes) snapshot=7313137594368 Total committed heap usage (bytes)=1972727971840 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=3689901 File Output Format Counters Bytes Written=887455 21/01/03 17:53:48 INFO streaming.StreamJob: Output directory: /tmp/op_test/xinyu/wordcount/py_output # 我们随便查看其中的一个内容: [root@aly-bigdata-hadoop-op-manager01 py_test]# hadoop fs -cat /tmp/op_test/xinyu/wordcount/py_output/part-01299 "_Giostre 1 1551, 1 39 2 Boughs 1 Brigade 1 Cha 1 DEATH 4 Davitt, 2 Extraordinary, 1 Goes 1 I.H.S. 1 Innocent 1 Kinch? 1 Namine. 1 Napoli_. 1 O’Hara, 1 Prhklstr 1 Strike 2 Toil 1 Wednesday, 1 _(Triumphaliter.) 1 _Cresciere 1 bewildering 1 change, 5 conglomerated 2 consistence 1 dairyfed 1 diminuendo_) 1 drover’s 1 entry, 1 exorcism 1 fight. 4 fills 11 forbidding 4 gauges 1 geographer 1 gloves 15 hinc 1 is... 3 istas: 1 jellyfishes 1 kangaroo, 1 knives 4 oxengut 1 past: 1 pennies, 1 puffed, 1 pyramid 55 remark, 5 ribbon 5 sagging 1 sailorman, 1 sappers 1 scoop 1 scritto: 1 servitu 1 silken 6 simple. 3 slaving 1 sourly 1 swarthy 1 terrace, 2 touchy. 1 tramtracks. 1 transmigration 2 trust. 1 turtledove 1 vegetation 7 walnuts 3 waves, 22 —Upon 1 |
参考
Writing An Hadoop MapReduce Program In Python