Spark输出⾃定义⽂件⽬录踩坑(Java)
最近项⽬中,使⽤Spark做离线计算,结果需要输出⼀份结果到⽂件中保存,并且需要按Key来放置不同的⽬录。因为spark通过saveAsTextFile()⽅法默认输出是以part-0000的形式。
(想⾃学习编程的⼩伙伴请搜索,更多⾏业相关资讯更有⾏业相关免费视频教程。完全免费哦!)
解决⽅法
通过搜索,很轻易的就能搜索到使⽤saveAsHadoopFile()⽅法可以将⽂件输出到⾃定义⽂件⽬录。⽹上⼤部分都是scala的写法,java的具体操作如下:
//⾸先,构造出⼀个PariRDD形式的RDD
JavaPairRDD<String, JSONObject> javaPairRDD =xxx
//使⽤saveAsHadoopFile⽅法输出到⽬标⽬录下,⽅法参数分别为(⽬标⽬录,key的class类型,value的class类型,输出format类)
javaPairRDD.saveAsHadoopFile("D:\\Test",String.class,JSONObject.class,RDDMultipleTextOutputFormat2.class);
/
/⾃定义⼀个RDDMultipleTextOutputFormat2继承MultipleTextOutputFormat
public static class RDDMultipleTextOutputFormat2 extends MultipleTextOutputFormat<String, JSONObject> {
@Override
public String generateFileNameForKeyValue(String key, JSONObject value,
String name) {
String object_type = String("object_type");
String object_id = String("object_id");
return object_type + "/" + object_id+".json";
}
}
最后输出的结果就是按"D:\Test\object_type\object_id.json来区分保存。
新的问题
美滋滋的打开按我们要求输出⽬录的输出⽂件。结果却发现,输出⽂件中,并不是仅仅将value写⼊了⽂件中,同时把key也写了进去。但是我们的⽂件格式是不需要key写⼊⽂件的。
//输出⽂件内容形式如下
key  value
解决办法⼀
遇事不决百度Google,通过搜索引擎,可以到我们通过设置rdd的key为NullWritable,使得输出⽂件中不包含key,⽹上同样⼤多数是scala的,下⾯是java的具体操作:
/⾸先,构造出⼀个PariRDD形式的RDD
JavaPairRDD<String, JSONObject> javaPairRDD =xxx
//将PariRDD转为<NullWritable,T>的形式
JavaPairRDD<NullWritable, JSONObject> nullKeyJavaPairRDD = javaPairRDD.mapToPair(tuple2 ->
{
return new (),tuple2._2);
});
//接下来的操作和上⾯⼀样
nullKeyJavaPairRDD.saveAsHadoopFile("D:\\Test",NullWritable.class,JSONObject.class,RDDMultipleTextOutputFormat.class);
public static class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat<NullWritable, JSONObject> {
@Override
public String generateFileNameForKeyValue(NullWritable key, JSONObject value,
String name) {
String object_type = String("object_type");
String object_id = String("object_id");
return object_type + "/" + object_id+".json";
}
}
如上⽅法确实是可以解决问题,但是如果我们需要的输出⽬录与key有关系,想将key使⽤在⾃定义⽬录中,这就办不到了。所以这个解决⽅法还是有缺陷的,仅仅能满⾜输出⽬录只与value有关系的情况。
解决办法⼆
这⾥想到另⼀个解决思路,往⽂件写内容总是需要⼀个类的,我们到这个类,重写它,把key的输出去掉,不就可以了。
于是我们跟进我们继承的MultipleTextOutputFormat类中
public class MultipleTextOutputFormat<K, V>
extends MultipleOutputFormat<K, V> {
private TextOutputFormat<K, V> theTextOutputFormat = null;
@Override
protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job,
String name, Progressable arg3) throws IOException {
if (theTextOutputFormat == null) {
theTextOutputFormat = new TextOutputFormat<K, V>();
}
RecordWriter(fs, job, name, arg3);
}
}
并没有发现有相关的⽅法,我们继续跟进⽗类MultipleOutputFormat,在这个类中,我们发现了⼀个write⽅法:
public void write(K key, V value) throws IOException {
// get the file name based on the key
String keyBasedPath = generateFileNameForKeyValue(key, value, myName);
// get the file name based on the input file name
String finalPath = getInputFileBasedOutputFileName(myJob, keyBasedPath);
// get the actual key
K actualKey = generateActualKey(key, value);
V actualValue = generateActualValue(key, value);
RecordWriter<K, V> rw = (finalPath);
if (rw == null) {
// if we don't have the record writer yet for the final path, create
/
/ one
// and add it to the cache
rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable);
}
getsavefilename
rw.write(actualKey, actualValue);
};
感觉真相就在眼前了,我们继续跟进rw.write(actualKey, actualValue);⽅法,通过断点我们可以知道他进⼊的是TextOutPutFormat #write()⽅法:
public synchronized void write(K key, V value)
throws IOException {
boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (nullKey && nullValue) {
return;
}
if (!nullKey) {
writeObject(key);
}
if (!(nullKey || nullValue)) {
out.write(keyValueSeparator);
}
if (!nullValue) {
writeObject(value);
}
out.write(newline);
}
这段代码就很简单了,只要key不是null或者NullWritable类,他就会往⽂件⾥输出。这也解释了为什么上⾯⽅法⼀中将key转换为NullWritable类就不会输出到⽂件中了。
了解到这⾥,我们就很容易得出解决⽅案,我们只要将传⼊write()⽅法中的key传⼊null就可以了。回到MultipleOutputFormat类中,我们看到传⼊的key是由这个⽅法获取的K actualKey = generateActualKey(key, value);,继续跟进:
protected K generateActualKey(K key, V value) {
return key;
}
接下很简单了,我们在⾃定义的format类中重写这个⽅法,改为返回null即可。
public static class RDDMultipleTextOutputFormat3 extends MultipleTextOutputFormat<String, JSONObject> {
@Override
public String generateFileNameForKeyValue(String key, JSONObject value,
String name) {
String object_id = String("object_id");
return key + "/" + object_id+".json";
}
@Override
public String generateActualKey(String key, JSONObject value) {
return null;
}
}
总结
其实这次踩坑还是挺简单的,按部就班的⼀路跟进就到了,其中还有不少点是可以延伸的,⽐如saveAsHadoopFile⽅法的的底层实现,和传统的saveAsTextFile⽅法的异同,我在查资料的过程中也发现了很多延伸的知识点,不过这些应该就是另⼀篇博客了。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。