【Hadoop】Small File Merging, Approach 1
The HDFS block size is typically 64 MB or 128 MB, meaning storage is allocated one block at a time. A "small file" is any file smaller than the file system's block size. Small files drive up NameNode memory consumption (each file, directory, and block occupies an entry in NameNode memory) and slow down Hadoop reads, hurting overall performance, so when they accumulate we need to clean them up or merge them periodically.
There are several ways to merge small files. Here I copy the data down to the local file system, merge it there, and upload the result back to HDFS. This approach costs local disk space; the better option is to use MapReduce to pack the data into a SequenceFile, since MapReduce parallelizes the work and makes the merge more efficient. A minimal sketch of that idea appears below, followed by the full download-merge-upload program.
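As a rough illustration of the SequenceFile alternative (not the program used in this post; the class name and the single-process SequenceFile.Writer usage are assumptions for the sketch, while a real job would do the appends inside MapReduce tasks):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFilePacker {
    // Pack every file directly under inputDir into one SequenceFile,
    // one (file name, file bytes) record per small file. Assumes Hadoop 2.x+.
    public static void pack(Path inputDir, Path seqFile) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(conf,
                    SequenceFile.Writer.file(seqFile),
                    SequenceFile.Writer.keyClass(Text.class),
                    SequenceFile.Writer.valueClass(BytesWritable.class));
            for (FileStatus st : fs.listStatus(inputDir)) {
                if (!st.isFile()) {
                    continue; // skip sub-directories
                }
                byte[] buf = new byte[(int) st.getLen()];
                FSDataInputStream in = fs.open(st.getPath());
                try {
                    in.readFully(buf); // files are small, so read each one whole
                } finally {
                    in.close();
                }
                writer.append(new Text(st.getPath().getName()), new BytesWritable(buf));
            }
        } finally {
            IOUtils.closeStream(writer); // null-safe close
        }
    }
}

Each small file becomes one (file name, file bytes) record, so the NameNode tracks a single large file instead of thousands of small ones. The download-merge-upload program used in this post follows.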
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
// args: source HDFS directory, target HDFS directory, local staging directory, merged file name, file-name regex
public class FileMerge {
    public static void fileMergeFunc(String srcDir, String targetDir, String localDir, String targetName, String regStr) throws IOException
    {
        Configuration conf = new Configuration();
        Path localPath = new Path(localDir);
        Path srcPath = new Path(srcDir);
        // Path of the merged output file on HDFS
        Path targetPathFile = new Path(targetDir + "/" + targetName);
        // hadoopFs is the HDFS file system
        FileSystem hadoopFs = FileSystem.get(conf);
        // localFs is the local file system
        FileSystem localFs = FileSystem.getLocal(conf);
        // Start from a clean local staging directory
        if (localFs.exists(localPath)) {
            System.out.println("Deleting " + localPath);
            localFs.delete(localPath, true);
        }
        localFs.mkdirs(localPath);
        FileStatus[] hdfsStatus = hadoopFs.listStatus(srcPath);
        for (FileStatus st : hdfsStatus)
        {
            Path tmpPath = st.getPath();
            if (Pattern.matches(srcDir + regStr, tmpPath.toString())) {
                System.out.println("Copying hadoop file " + tmpPath + " to " + localPath);
                // Pull the matching small file down to the staging directory, then remove it from HDFS
                hadoopFs.copyToLocalFile(tmpPath, localPath);
                hadoopFs.delete(tmpPath, false);
            }
        }
        FileStatus[] status = localFs.listStatus(localPath); // files in the local staging directory
        FSDataOutputStream out = hadoopFs.create(targetPathFile); // create the merged output file on HDFS
        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, "UTF-8")); // writer for the merged content
        StringBuffer sb = new StringBuffer();
        String line;
        for (FileStatus st : status)
        {
            Path temp = st.getPath();
            FSDataInputStream in = localFs.open(temp);
            BufferedReader bf = new BufferedReader(new InputStreamReader(in, "UTF-8"));
            while ((line = bf.readLine()) != null) {
                if (!line.isEmpty()) { // readLine() strips the terminator, so compare content, not "\n"
                    sb.append(line).append("\n");
                }
            }
            // IOUtils.copyBytes(in, out, 4096, false); // alternative: raw byte copy from in to out
            bf.close();
            in.close(); // close the current input stream before moving to the next file
        }
        bw.write(sb.toString());
        bw.close(); // flushes the buffer and closes the underlying HDFS output stream
    }
    // args: srcDir, targetDir, localDir, targetName, regStr, hourOffset
    public static void main(String[] args) throws IOException
    {
        if (args.length != 6) {
            System.out.println("Wrong number of arguments, please check the usage...");
        } else {
            SimpleDateFormat sdf1 = new SimpleDateFormat("yyyyMMdd");
            SimpleDateFormat sdf2 = new SimpleDateFormat("yyyyMMddHH");
            // Shift both calendars by the hour offset in args[5] (e.g. -1 for the previous hour)
            Calendar cal1 = Calendar.getInstance();
            cal1.add(Calendar.HOUR, Integer.parseInt(args[5]));
            Calendar cal2 = Calendar.getInstance();
            cal2.add(Calendar.HOUR, Integer.parseInt(args[5]));
            String dt1 = sdf1.format(cal1.getTime());
            String dt2 = sdf2.format(cal2.getTime());
            String srcDir = args[0];
            String targetDir = args[1] + dt1;
            String localDir = args[2];
            String targetName = args[3] + "_" + dt2;
            String regStr = args[4].replace("{DATE_TIME}", dt2);
            System.out.println(srcDir);
            System.out.println(targetDir);
            System.out.println(localDir);
            System.out.println(targetName);
            System.out.println(regStr);
            fileMergeFunc(srcDir, targetDir, localDir, targetName, regStr);
        }
    }
}
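Assuming the class is packed into a jar (the jar name and paths below are made-up examples), a run that merges the previous hour's files might look like:

hadoop jar filemerge.jar FileMerge \
    hdfs://nn:8020/data/logs/in \
    /data/logs/merged/ \
    /tmp/filemerge_staging \
    merged_log \
    "/part-.*{DATE_TIME}.*" \
    -1

Here args[4] contains the {DATE_TIME} placeholder, which is replaced by the shifted yyyyMMddHH timestamp before matching, and args[5] is the hour offset applied to the current time. Note that srcDir + regStr is matched against each file's full path string as returned by Path.toString(), so the regex must account for everything after srcDir.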
