Word Frequency Counting in Hadoop (sorted by descending frequency; words with equal frequency sorted alphabetically)
I. Environment
An Ubuntu virtual machine running a pseudo-distributed Hadoop cluster (pseudo-distributed mode is more convenient for experiments); the code is submitted through Eclipse.
II. Implementation
Two MapReduce jobs are used in total: the first performs the word-frequency count, and the second performs the sorting.
1. Data
2. Main function
The path of the stop-word list is passed into the first job's configuration as a global parameter. A word-frequency threshold is also set; it is passed to main through args and specified when running the program.
The two jobs run in sequence: the output of the first is the input of the second, so a dependency must be declared, because the second job depends on the first.
public static void main(String[] args) throws Exception
{
    Configuration conf1 = new Configuration(true);
    // Path of the stop-word list
    conf1.setStrings("stopwords", "hdfs://localhost:9000/");
    // Frequency threshold: words whose frequency does not exceed it are not output
    conf1.set("num", args[0]);
    // Input and output paths
    String[] ars = new String[]{"hdfs://localhost:9000/stopword/data",
            "hdfs://localhost:9000/stopword/output/output1",
            "hdfs://localhost:9000/stopword/output/output2"};
    String[] otherArgs = new GenericOptionsParser(conf1, ars).getRemainingArgs();
    // Job 1: word-frequency count
    Job job1 = Job.getInstance(conf1, "word count");
    job1.setJarByClass(WordSort.class);
    job1.setMapperClass(Map.class);
    job1.setReducerClass(Reduce.class);
    job1.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job1, new Path(otherArgs[0]));
    // job1.setOutputFormatClass(TextOutputFormat.class);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(IntWritable.class);
    FileOutputFormat.setOutputPath(job1, new Path(otherArgs[1]));
    // Wrap job1 in a controlled job
    ControlledJob ctrlJob1 = new ControlledJob(conf1);
    ctrlJob1.setJob(job1);
    // Job 2: sort by descending frequency, ties broken alphabetically
    Configuration conf2 = new Configuration(true);
    Job job2 = Job.getInstance(conf2, "sort");
    job2.setJarByClass(WordSort.class);
    job2.setMapperClass(Map2.class);
    job2.setReducerClass(Reduce2.class);
    job2.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job2, new Path(otherArgs[1]));
    // Map output: frequency -> word; final (reduce) output: word -> frequency
    job2.setMapOutputKeyClass(IntWritable.class);
    job2.setMapOutputValueClass(Text.class);
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(IntWritable.class);
    // Custom comparator that sorts the map output keys in descending order
    job2.setSortComparatorClass(Sort.class);
    FileOutputFormat.setOutputPath(job2, new Path(otherArgs[2]));
    // Wrap job2 in a controlled job
    ControlledJob ctrlJob2 = new ControlledJob(conf2);
    ctrlJob2.setJob(job2);
    // Declare the dependency: job2's input depends on job1's output
    ctrlJob2.addDependingJob(ctrlJob1);
    // Master controller that drives job1 and job2
    JobControl jobCtrl = new JobControl("myCtrl");
    // Register both controlled jobs with the JobControl
    jobCtrl.addJob(ctrlJob1);
    jobCtrl.addJob(ctrlJob2);
    System.out.println("Job Start!");
    // JobControl must be run in its own thread -- do not forget this
    Thread thread = new Thread(jobCtrl);
    thread.start();
    while (true) {
        if (jobCtrl.allFinished()) {
            System.out.println(jobCtrl.getSuccessfulJobList());
            jobCtrl.stop();
            break;
        }
        Thread.sleep(500); // avoid busy-waiting while the jobs run
    }
}
3. First MapReduce
Map
First, the stop-word list is read in setup (the benefit is that it is read only once; reading it in map would read it repeatedly and waste resources). Then, in map, a regular expression strips punctuation; because that expression still leaves both words and digits, digit-only tokens are removed as well, and the remaining words are emitted as the map output (the reduce input).
// First map
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable>
{
    private Set<String> stopwords;
    private String localFiles;
    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        stopwords = new TreeSet<String>();
        // Get the configuration set in the main function
        Configuration conf = context.getConfiguration();
        // HDFS path of the stop-word list
        localFiles = conf.getStrings("stopwords")[0];
        FileSystem fs = FileSystem.get(URI.create(localFiles), conf);
        FSDataInputStream hdfsInStream = fs.open(new Path(localFiles));
        // Read the list from HDFS
        InputStreamReader isr = new InputStreamReader(hdfsInStream, "utf-8");
        String line;
        BufferedReader br = new BufferedReader(isr);
        while ((line = br.readLine()) != null) {
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreTokens()) {
                // Build the stop-word set
                stopwords.add(itr.nextToken());
            }
        }
    }
    // Used to check whether a string is a number
    Pattern pattern = Pattern.compile("^[-\\+]?[\\d]*$");
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String temp;
        final IntWritable one = new IntWritable(1);
        // Strip punctuation with a regular expression
        StringTokenizer itr = new StringTokenizer(value.toString().toLowerCase().replaceAll("\\pP|\\pS", ""));
        // String[] itr = value.toString().toLowerCase().split("[^a-zA-Z']+");
        while (itr.hasMoreTokens()) {
            temp = itr.nextToken();
            // Skip numbers
            if (pattern.matcher(temp).matches()) {
                continue;
            }
            // Emit the word only if it is not in the stop-word list
            if (!stopwords.contains(temp)) {
                Text word = new Text();
                word.set(temp);
                context.write(word, one);
            }
        }
    }
}
Reduce
First, the threshold set in main is read in setup. Because all occurrences of the same word are sent to one reducer, summing their counts gives the total frequency; this is compared with the threshold, and only words above it are written out.
// First reduce
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>
{
    String num;
    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        // Get the frequency threshold
        num = conf.get("num");
    }
    IntWritable result = new IntWritable();
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        int sum = 0;
        // Sum the occurrences
        for (IntWritable val : values) {
            sum += val.get();
        }
        if (sum > Integer.parseInt(num)) {
            result.set(sum);
            context.write(key, result);
        }
    }
}
4. Second MapReduce
Map
The output of the first MapReduce is read to obtain each word and its frequency. The goal is to sort by frequency, and since map output passes through a sort phase on its way to reduce, that phase can be used to do the sorting: the map simply emits the frequency as the key and the word as the value.
// Second map
public static class Map2 extends Mapper<LongWritable, Text, IntWritable, Text>
{
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        // Read the first job's result; key and value are separated by a tab
        String[] data = value.toString().split("\t");
        // Emit the frequency as the key and the word as the value
        context.write(new IntWritable(Integer.parseInt(data[1])), new Text(data[0]));
    }
}
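The custom Sort comparator registered with job2.setSortComparatorClass in the main function is not listed in the post. Below is a minimal sketch (written as a top-level class for readability; in the original code it would presumably be a nested static class of WordSort, like Map and Reduce): descending order is obtained simply by negating the comparator's default ascending result.

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sorts the IntWritable keys of job2's map output in descending order,
// so that higher word frequencies come first in the reduce input.
public class Sort extends WritableComparator {
    public Sort() {
        // true: deserialize keys into objects so the object-based compare() is used
        super(IntWritable.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Negate the natural (ascending) order to get a descending sort
        return -super.compare(a, b);
    }
}
```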
Reduce
The map stage has already sorted the frequencies; what remains is to order words with the same frequency alphabetically. Since words with the same frequency are sent to the same reduce call, sorting the reducer's input values alphabetically and writing them out in that order (word as key, frequency as value) produces the alphabetical ordering within each frequency.
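The second reducer itself is not listed in the post. A sketch consistent with the description (collect the words sharing a frequency, sort them alphabetically, then write the word as key and the frequency as value) might look like the following, again written as a top-level class for readability; it assumes the job's final output types are Text/IntWritable.

```java
import java.io.IOException;
import java.util.TreeSet;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Second reducer: all words with a given frequency arrive in a single call.
// A TreeSet keeps them in alphabetical order; they are then emitted in that
// order with the word as the key and the frequency as the value.
public class Reduce2 extends Reducer<IntWritable, Text, Text, IntWritable> {
    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        TreeSet<String> sorted = new TreeSet<String>();
        for (Text val : values) {
            // Copy to a String: Hadoop reuses the Text instance between iterations
            sorted.add(val.toString());
        }
        for (String word : sorted) {
            context.write(new Text(word), key);
        }
    }
}
```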
