⽤java编写spark程序,简单⽰例及运⾏
最近因为⼯作需要,研究了下spark,因为scala还不熟,所以先学习了java的spark程序写法,下⾯是我的简单测试程序的代码,⼤部分函
数的⽤法已在注释⾥⾯注明。
我的环境:hadoop 2.2.0
spark-0.9.0
scala-2.10.3
jdk1.7
[java]
1. import org.apache.spark.api.java.JavaPairRDD;
2. import org.apache.spark.api.java.JavaRDD;
3. import org.apache.spark.api.java.JavaSparkContext;
4. import org.apache.spark.api.java.function.FlatMapFunction;
5. import org.apache.spark.api.java.function.Function;
6. import org.apache.spark.api.java.function.Function2;
7. import org.apache.spark.api.java.function.PairFunction;
8. import scala.Tuple2;
9.
10.
11. import java.util.Arrays;
12. import java.util.List;
13. import Pattern;
14.
15.
16. public final class mysparktest {
17.
18.
19.    public static void main(String[] args) throws Exception {
20.
21.
22.        //context ,⽤于读⽂件 ,类似于scala的sc
23.        //格式为:
24.        // JavaSparkContext(master: String, appName: String, sparkHome: String, jars: Array[String], environment: Map[String, St
25.        JavaSparkContext ctx = new JavaSparkContext("yarn-standalone", "JavaWordCount",
26.                v("SPARK_HOME"), JavaSparkContext.jarOfClass(mysparktest.class));
27.
28.
29.        //也可以使⽤ctx获取环境变量,例如下⾯的语句
30.        System.out.println("spark home:"+SparkHome());
30.        System.out.println("spark home:"+SparkHome());
31.
32.
33.
34.
35.
36.
37.          //⼀次⼀⾏,String类型    ,还有hadoopfile,sequenceFile什么的  ,可以直接⽤sc.textFile("path")
38.        JavaRDD<String> lines = File(args[1], 1);  //java.lang.String path, int minSplits
39.        lines.cache();  //cache,暂时放在缓存中,⼀般⽤于哪些可能需要多次使⽤的RDD,据说这样会减少运⾏时间
40.
41.
42.        //collect⽅法,⽤于将RDD类型转化为java基本类型,如下
43.        List<String> line = llect();
44.        for(String val:line)
45.                System.out.println(val);
46.
47.
48.        //下⾯这些也是RDD的常⽤函数
49.        // llect();  List<String>
50.        // lines.union();    javaRDD<String>
51.        // p(1);    List<String>
52.        // unt();      long
53.        // untByValue();
54.
55.
56.        /**
java中split的用法
57.          *  filter test
58.          *  定义⼀个返回bool类型的函数,spark运⾏filter的时候会过滤掉那些返回只为false的数据
59.          *  String s,中的变量s可以认为就是变量lines(lines可以理解为⼀系列的String类型数据)的每⼀条数据
60.          */
61.        JavaRDD<String> contaninsE = lines.filter(new Function<String, Boolean>() {
62.            @Override
63.            public Boolean call(String s) throws Exception {
64.
65.
66.                return (s.contains("they"));
67.            }
68.        });
69.        System.out.println("--------------next filter's  result------------------");
70.        line = llect();
71.        for(String val:line)
72.            System.out.println(val);
73.
73.
74.
75.        /**
76.          * sample test
77.          * sample函数使⽤很简单,⽤于对数据进⾏抽样
78.          * 参数为:withReplacement: Boolean, fraction: Double, seed: Int
79.          *
80.          */
81.
82.
83.        JavaRDD<String> sampletest = lines.sample(false,0.1,5);
84.        System.out.println("-------------next sample-------------------");
85.        line = llect();
86.        for(String val:line)
87.            System.out.println(val);
88.
89.
90.
91.
92.        /**
93.          *
94.          * new FlatMapFunction<String, String>两个string分别代表输⼊和输出类型
95.          * Override的call⽅法需要⾃⼰实现⼀个转换的⽅法,并返回⼀个Iterable的结构
96.          *
97.          * flatmap属于⼀类⾮常常⽤的spark函数,简单的说作⽤就是将⼀条rdd数据使⽤你定义的函数给分解成多条rdd数据
98.          * 例如,当前状态下,lines这个rdd类型的变量中,每⼀条数据都是⼀⾏String,我们现在想把他拆分成1个个的词的话,
99.          * 可以这样写 :
100.          */
101.
102.
103.        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
104.            @Override
105.            public Iterable<String> call(String s) {
106.                  String[] words=s.split(" ");
107.                  return Arrays.asList(words);
108.            }
109.        });
110.
111.
112.
113.
114.        /**
115.          * map 键值对 ,类似于MR的map⽅法
116.          * pairFunction<T,K,V>: T:输⼊类型;K,V:输出键值对
117.          * 需要重写call⽅法实现转换
118.          */
119.        JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
120.            @Override
121.            public Tuple2<String, Integer> call(String s) {
122.                return new Tuple2<String, Integer>(s, 1);
123.            }
124.        });
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.        //A two-argument function that takes arguments
138.        // of type T1 and T2 and returns an R.
139.        /**
140.          *  reduceByKey⽅法,类似于MR的reduce
141.          *  要求被操作的数据(即下⾯实例中的ones)是KV键值对形式,该⽅法会按照key相同的进⾏聚合,在两两运算 142.          */
143.        JavaPairRDD<String, Integer> counts = duceByKey(new Function2<Integer, Integer, Integer>() {  144.            @Override
145.            public Integer call(Integer i1, Integer i2) {  //reduce阶段,key相同的value怎么处理的问题
146.                return i1 + i2;
147.            }
148.        });
149.
150.
151.        //备注:spark也有reduce⽅法,输⼊数据是RDD类型就可以,不需要键值对,
152.        // reduce⽅法会对输⼊进来的所有数据进⾏两两运算
153.
154.
155.
156.
157.
158.
159.        /**
160.          * sort,顾名思义,排序
160.          * sort,顾名思义,排序
161.          */
162.        JavaPairRDD<String,Integer> sort = counts.sortByKey();
163.        System.out.println("----------next sort----------------------");
164.
165.
166.
167.
168.        /**
169.          * collect⽅法其实之前已经出现了多次,该⽅法⽤于将spark的RDD类型转化为我们熟知的java常见类型 170.          */
171.        List<Tuple2<String, Integer>> output = llect();
172.        for (Tuple2<?,?> tuple : output) {
173.            System.out.println(tuple._1 + ": " + tuple._2());
174.        }
175.
176.
177.
178.
179.        /**
180.          * 保存函数,数据输出,spark为结果输出提供了很多接⼝
181.          */
182.        sort.saveAsTextFile("/tmp/spark-tmp/test");
183.
184.
185.
186.
187.
188.
189.        // sort.saveAsNewAPIHadoopFile();
190.      //  sort.saveAsHadoopFile();
191.        it(0);
192.    }
193. }
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。