wordcount代码实现详解
阅读⽬录
Hadoop的框架最核⼼的设计就是:HDFS和MapReduce。HDFS为海量的数据提供了存储,MapReduce则为海量的数据提供了计算。 HDFS是Google File System(GFS)的开源实现,MapReduce是Google MapReduce的开源实现。
HDFS和MapReduce实现是完全分离的,并不是没有HDFS就不能MapReduce运算。
本⽂主要参考了以下三篇博客学习整理⽽成。
1、
2、
3、
1、MapReduce整体流程
最简单的MapReduce应⽤程序⾄少包含 3 个部分:⼀个 Map 函数、⼀个 Reduce 函数和⼀个 main 函数。
在运⾏⼀个mapreduce计算任务时候,任务过程被分为两个阶段:map阶段和reduce阶段,每个阶段都是⽤键值对(key/value)作为输⼊(input)和输出(output)。main 函数将作业控制和⽂件输⼊/输出结合起来。
并⾏读取⽂本中的内容,然后进⾏MapReduce操作。
Map过程:并⾏读取⽂本,对读取的单词进⾏map操作,每个词都以<key,value>形式⽣成。
我的理解:
⼀个有三⾏⽂本的⽂件进⾏MapReduce操作。
读取第⼀⾏Hello World Bye World ,分割单词形成Map。
<Hello,1> <World,1> <Bye,1> <World,1>
读取第⼆⾏Hello Hadoop Bye Hadoop ,分割单词形成Map。
<Hello,1> <Hadoop,1> <Bye,1> <Hadoop,1>
读取第三⾏Bye Hadoop Hello Hadoop,分割单词形成Map。
<Bye,1> <Hadoop,1> <Hello,1> <Hadoop,1>
Reduce操作是对map的结果进⾏排序,合并,最后得出词频。
我的理解:
经过进⼀步处理(combiner),将形成的Map根据相同的key组合成value数组。
<Bye,1,1,1> <Hadoop,1,1,1,1> <Hello,1,1,1> <World,1,1>
循环执⾏Reduce(K,V[]),分别统计每个单词出现的次数。
<Bye,3> <Hadoop,4> <Hello,3> <World,2>
2、WordCount源码
package org.amples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.f.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;字符串函数应用详解
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
*
* 描述:WordCount explains by York
* @author Hadoop Dev Group
*/
publicclass WordCount {
/**
* 建⽴Mapper类TokenizerMapper继承⾃泛型类Mapper
* Mapper类:实现了Map功能基类
* Mapper接⼝:
* WritableComparable接⼝:实现WritableComparable的类可以相互⽐较。所有被⽤作key的类应该实现此接⼝。
* Reporter 则可⽤于报告整个应⽤的运⾏进度,本例中未使⽤。
*
*/
publicstaticclass TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
/**
* IntWritable, Text 均是 Hadoop 中实现的⽤于封装 Java 数据类型的类,这些类实现了WritableComparable接⼝,
* 都能够被串⾏化从⽽便于在分布式环境中进⾏数据交换,你可以将它们分别视为int,String 的替代品。
* 声明one常量和word⽤于存放单词的变量
*/
privatefinalstatic IntWritable one =new IntWritable(1);
private Text word =new Text();
/**
* Mapper中的map⽅法:
* void map(K1 key, V1 value, Context context)
* 映射⼀个单个的输⼊k/v对到⼀个中间的k/v对
* 输出对不需要和输⼊对是相同的类型,输⼊对可以映射到0个或多个输出对。
* Context:收集Mapper输出的<k,v>对。
* Context的write(k, v)⽅法:增加⼀个(k,v)对到context
* 程序员主要编写Map和Reduce函数.这个Map函数使⽤StringTokenizer函数对字符串进⾏分隔,通过write⽅法把单词存⼊word中 * write⽅法存⼊(单词,1)这样的⼆元组到context中
*/
publicvoid map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr =new String());
while (itr.hasMoreTokens()) {
word.Token());
context.write(word, one);
}
}
}
publicstaticclass IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result =new IntWritable();
/**
* Reducer类中的reduce⽅法:
* void reduce(Text key, Iterable<IntWritable> values, Context context)
* 中k/v来⾃于map函数中的context,可能经过了进⼀步处理(combiner),同样通过context输出
*/
publicvoid reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum =0;
for (IntWritable val : values) {
sum += ();
}
result.set(sum);
context.write(key, result);
}
}
publicstaticvoid main(String[] args) throws Exception {
/**
* Configuration:map/reduce的j配置类,向hadoop框架描述map-reduce执⾏的⼯作
*/
Configuration conf =new Configuration();
String[] otherArgs =new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length !=2) {
}
Job job =new Job(conf, "word count"); //设置⼀个⽤户定义的job名称
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class); //为job设置Mapper类
job.setCombinerClass(IntSumReducer.class); //为job设置Combiner类
job.setReducerClass(IntSumReducer.class); //为job设置Reducer类
job.setOutputKeyClass(Text.class); //为job的输出数据设置Key类
job.setOutputValueClass(IntWritable.class); //为job输出设置value类
FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //为job设置输⼊路径
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//为job设置输出路径
}
}
3、WordCount逐⾏解析
对于map函数的⽅法。
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {…}
这⾥有三个参数,前⾯两个Object key, Text value就是输⼊的key和value,第三个参数Context context这是可以记录输⼊的key和value,例如:context.write(word, one);此外context还会记录map运算的状态。
对于reduce函数的⽅法。
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {…}
reduce函数的输⼊也是⼀个key/value的形式,不过它的value是⼀个迭代器的形式Iterable<IntWritable> values,也就是说reduce的输⼊是⼀个key对应⼀组的值的value,reduce也有context和map的context作⽤⼀致。
⾄于计算的逻辑则需要程序员编码实现。
对于main函数的调⽤。
⾸先是:
Configuration conf = new Configuration();
运⾏MapReduce程序前都要初始化Configuration,该类主要是读取MapReduce系统配置信息,这些信息包括hdfs还有MapReduce,也就是安装hadoop时候的配置⽂件例如:l、l和l等等⽂件⾥的信息,有些童鞋不理解为啥要这么做,这个是没有深⼊思考MapReduce计算框架造成,我们程序员开发MapReduce时候只是在填空,在map函数和reduce函数⾥编写实际进⾏的业务逻辑,其它的⼯作都是交给MapReduce框架⾃⼰操作的,但是⾄少我们要告诉它怎么操作啊,⽐
如hdfs在哪⾥,MapReduce的jobstracker在哪⾥,⽽这些信息就在conf包下的配置⽂件⾥。
接下来的代码是:
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
}
If的语句好理解,就是运⾏WordCount程序时候⼀定是两个参数,如果不是就会报错退出。⾄于第⼀句⾥的GenericOptionsParser类,它是⽤来解释常⽤hadoop命令,并根据需要为Configuration对象设置相应的值,其实平时开发⾥我们不太常⽤它,⽽是让类实现Tool接⼝,然后再main函数⾥使⽤ToolRunner运⾏程序,⽽ToolRunner内部会调⽤GenericOptionsParser。
接下来的代码是:
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
第⼀⾏就是在构建⼀个job,在mapreduce框架⾥⼀个mapreduce任务也叫mapreduce作业也叫做⼀个mapreduce的job,⽽具体的map 和reduce运算就是task了,这⾥我们构建⼀个job,构建时候有两个参数,⼀个是conf这个就不累述了,⼀个是这个job的名称。
第⼆⾏就是装载程序员编写好的计算程序,例如我们的程序类名就是WordCount了。这⾥我要做下纠正,虽然我们编写mapreduce程序只需要实现map函数和reduce函数,但是实际开发我们要实现三个类,第三个类是为了配置mapreduce如何运⾏map和reduce函数,准确的说就是构建⼀个mapreduce能执⾏的job了,例如WordCount类。
第三⾏和第五⾏就是装载map函数和reduce函数实现类了,这⾥多了个第四⾏,这个是装载Combiner
类,这个类和mapreduce运⾏机制有关,其实本例去掉第四⾏也没有关系,但是使⽤了第四⾏理论上运⾏效率会更好。
接下来的代码:
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
这个是定义输出的key/value的类型,也就是最终存储在hdfs上结果⽂件的key/value的类型。
最后的代码是:
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
第⼀⾏就是构建输⼊的数据⽂件,第⼆⾏是构建输出的数据⽂件,最后⼀⾏如果job运⾏成功了,我们的程序就会正常退出。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论