⽤java编写spark程序,简单⽰例及运⾏
最近因为⼯作需要,研究了下spark,因为scala还不熟,所以先学习了java的spark程序写法,下⾯是我的简单测试程序的代码,⼤部分函
数的⽤法已在注释⾥⾯注明。
我的环境:hadoop 2.2.0
spark-0.9.0
scala-2.10.3
jdk1.7
[java]
1. import org.apache.spark.api.java.JavaPairRDD;
2. import org.apache.spark.api.java.JavaRDD;
3. import org.apache.spark.api.java.JavaSparkContext;
4. import org.apache.spark.api.java.function.FlatMapFunction;
5. import org.apache.spark.api.java.function.Function;
6. import org.apache.spark.api.java.function.Function2;
7. import org.apache.spark.api.java.function.PairFunction;
8. import scala.Tuple2;
9.
10.
11. import java.util.Arrays;
12. import java.util.List;
13. import Pattern;
14.
15.
16. public final class mysparktest {
17.
18.
19. public static void main(String[] args) throws Exception {
20.
21.
22. //context ,⽤于读⽂件 ,类似于scala的sc
23. //格式为:
24. // JavaSparkContext(master: String, appName: String, sparkHome: String, jars: Array[String], environment: Map[String, St
25. JavaSparkContext ctx = new JavaSparkContext("yarn-standalone", "JavaWordCount",
26. v("SPARK_HOME"), JavaSparkContext.jarOfClass(mysparktest.class));
27.
28.
29. //也可以使⽤ctx获取环境变量,例如下⾯的语句
30. System.out.println("spark home:"+SparkHome());
30. System.out.println("spark home:"+SparkHome());
31.
32.
33.
34.
35.
36.
37. //⼀次⼀⾏,String类型 ,还有hadoopfile,sequenceFile什么的 ,可以直接⽤sc.textFile("path")
38. JavaRDD<String> lines = File(args[1], 1); //java.lang.String path, int minSplits
39. lines.cache(); //cache,暂时放在缓存中,⼀般⽤于哪些可能需要多次使⽤的RDD,据说这样会减少运⾏时间
40.
41.
42. //collect⽅法,⽤于将RDD类型转化为java基本类型,如下
43. List<String> line = llect();
44. for(String val:line)
45. System.out.println(val);
46.
47.
48. //下⾯这些也是RDD的常⽤函数
49. // llect(); List<String>
50. // lines.union(); javaRDD<String>
51. // p(1); List<String>
52. // unt(); long
53. // untByValue();
54.
55.
56. /**
java中split的用法
57. * filter test
58. * 定义⼀个返回bool类型的函数,spark运⾏filter的时候会过滤掉那些返回只为false的数据
59. * String s,中的变量s可以认为就是变量lines(lines可以理解为⼀系列的String类型数据)的每⼀条数据
60. */
61. JavaRDD<String> contaninsE = lines.filter(new Function<String, Boolean>() {
62. @Override
63. public Boolean call(String s) throws Exception {
64.
65.
66. return (s.contains("they"));
67. }
68. });
69. System.out.println("--------------next filter's result------------------");
70. line = llect();
71. for(String val:line)
72. System.out.println(val);
73.
73.
74.
75. /**
76. * sample test
77. * sample函数使⽤很简单,⽤于对数据进⾏抽样
78. * 参数为:withReplacement: Boolean, fraction: Double, seed: Int
79. *
80. */
81.
82.
83. JavaRDD<String> sampletest = lines.sample(false,0.1,5);
84. System.out.println("-------------next sample-------------------");
85. line = llect();
86. for(String val:line)
87. System.out.println(val);
88.
89.
90.
91.
92. /**
93. *
94. * new FlatMapFunction<String, String>两个string分别代表输⼊和输出类型
95. * Override的call⽅法需要⾃⼰实现⼀个转换的⽅法,并返回⼀个Iterable的结构
96. *
97. * flatmap属于⼀类⾮常常⽤的spark函数,简单的说作⽤就是将⼀条rdd数据使⽤你定义的函数给分解成多条rdd数据
98. * 例如,当前状态下,lines这个rdd类型的变量中,每⼀条数据都是⼀⾏String,我们现在想把他拆分成1个个的词的话,
99. * 可以这样写 :
100. */
101.
102.
103. JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
104. @Override
105. public Iterable<String> call(String s) {
106. String[] words=s.split(" ");
107. return Arrays.asList(words);
108. }
109. });
110.
111.
112.
113.
114. /**
115. * map 键值对 ,类似于MR的map⽅法
116. * pairFunction<T,K,V>: T:输⼊类型;K,V:输出键值对
117. * 需要重写call⽅法实现转换
118. */
119. JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
120. @Override
121. public Tuple2<String, Integer> call(String s) {
122. return new Tuple2<String, Integer>(s, 1);
123. }
124. });
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137. //A two-argument function that takes arguments
138. // of type T1 and T2 and returns an R.
139. /**
140. * reduceByKey⽅法,类似于MR的reduce
141. * 要求被操作的数据(即下⾯实例中的ones)是KV键值对形式,该⽅法会按照key相同的进⾏聚合,在两两运算 142. */
143. JavaPairRDD<String, Integer> counts = duceByKey(new Function2<Integer, Integer, Integer>() { 144. @Override
145. public Integer call(Integer i1, Integer i2) { //reduce阶段,key相同的value怎么处理的问题
146. return i1 + i2;
147. }
148. });
149.
150.
151. //备注:spark也有reduce⽅法,输⼊数据是RDD类型就可以,不需要键值对,
152. // reduce⽅法会对输⼊进来的所有数据进⾏两两运算
153.
154.
155.
156.
157.
158.
159. /**
160. * sort,顾名思义,排序
160. * sort,顾名思义,排序
161. */
162. JavaPairRDD<String,Integer> sort = counts.sortByKey();
163. System.out.println("----------next sort----------------------");
164.
165.
166.
167.
168. /**
169. * collect⽅法其实之前已经出现了多次,该⽅法⽤于将spark的RDD类型转化为我们熟知的java常见类型 170. */
171. List<Tuple2<String, Integer>> output = llect();
172. for (Tuple2<?,?> tuple : output) {
173. System.out.println(tuple._1 + ": " + tuple._2());
174. }
175.
176.
177.
178.
179. /**
180. * 保存函数,数据输出,spark为结果输出提供了很多接⼝
181. */
182. sort.saveAsTextFile("/tmp/spark-tmp/test");
183.
184.
185.
186.
187.
188.
189. // sort.saveAsNewAPIHadoopFile();
190. // sort.saveAsHadoopFile();
191. it(0);
192. }
193. }
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论