史上最简单的spark教程第⼆章-快速开发部署你的第⼀个Java+spark程序spark的核⼼概念
史上最简单的spark教程
所有代码⽰例地址:
(提前声明:⽂章由作者:张耀峰 结合⾃⼰⽣产中的使⽤经验整理,最终形成简单易懂的⽂章,写作不易,转载请注明)
(⽂章参考:Elasticsearch权威指南,Spark快速⼤数据分析⽂档,Elasticsearch官⽅⽂档,实际项⽬中的应⽤场景)
(帮到到您请点点关注,⽂章持续更新中!)
Git主页
第⼀章的shell程序是如何运⾏的?下⾯给⼤家解释⼀波
从上层看,每个spark应⽤都由⼀个驱动器程序(driver program)发起集上的并⾏操作
驱动器程序包含了应⽤的main函数,并且定义了集上的分布式数据集,并对这些数据集进⾏相关的操
之前运⾏的那个shell栗⼦,驱动器程序就是spark.shell本⾝.所以我们只需要输⼊我们需要做的操作就可以了
驱动器程序通过SparkContext对象访问spark,sparkContext代表对计算集连接的⼀个连接,shell启动的时候会⾃动创建,就是我们操作的sc变量,在控制台输⼊sc可以查看它的类型
只要有了sparkContext我们就可以创建RDD,然后进⾏我们的操作,要执⾏这些操作,sc⼀般要管理多个执⾏器节点,如果我们刚才的count()是在集中运⾏,那么不同的节点就会计算⽂件不同的⾏数,我们刚才的运⾏模式是单机模式
集运⾏模式如下:
扩展我们的SparkShell程序
第⼀章我们运⾏的count()或者first()都是内置好的函数,⽆需我们控制任何参数,我就了些可以传递参数的Api,运⾏到我们的节点上事例:从README.md RDD中筛选特定的单词
代码⽰例
val lines = sc.textFile("你的README⽂件路径") // 创建⼀个叫lines的RDD lines:
val pythonLines = lines.filter(line => ains("Python")) //lambda语法糖
pythonLines.first() //输出第⼀⾏包含python的⽂本
其实 Spark API 最神奇的地⽅就在于像 filter 这样基于函数的操作也会在集上并⾏执⾏,也就是说,Spark 会⾃动将 函数(⽐如ains(“Python”))发到各个执⾏器节点上,这样就可以在单⼀的驱动器程序中编码,并且让代码⾃动运⾏在多个节点上
java运⾏第⼀个spark程序
之前我们⼀直都是以交互式案例了解spark的运⾏机制,除了交互式运⾏之外,spark也可以在Java,scal
a,Python的独⽴程序中被连接使⽤,独⽴程序中运⾏就是需要⾃⾏初始化SparkContext,其他的都是⼀样的概念
在Java中运⾏Spark需要添加spark-core的maven依赖. 个⼈使⽤的版本为:2.2.3
Maven就不解释了,Java开发中⽐较流⾏的包管理⼯具,直接开始创建我们的项⽬吧
开发⼯具:Intellij IDEA
Java版本:JDK1.8以上
⾸先创建Maven⼯程
创建好之后在l⽂件中添加我们需要的jar包:spark-core
<!--定义spark版本-->
<properties>
<spark.version>2.2.3</spark.version>
</properties>
<!--spark-core核⼼包-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
接着我们就可以开始编写我们的Spark代码
开始的时候我写了在java或者scala中编写spark程序需要我们⼿动创建sparkcontext
第⼀步先初始化我们的SparkContext,初始化需要两个基本参数:[集URL,应⽤名]
集URL:告诉 Spark 如何连接到集上,我们现在先使⽤local的模式,这个模式可以让Spark运⾏在单机单线程上⽽⽆需连接到集
应⽤名:我使⽤的是sparkBoot,当连接到⼀个集时.这个值可以帮助你在集管理器的⽤户界⾯中到你的应⽤
SparkConf sparkConf = new SparkConf().setAppName("sparkBoot").setMaster("local");
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
在初始化 SparkContext 之后,我们可以使⽤前⾯展⽰的所有⽅法(⽐如利⽤⽂本⽂件)来创建 RDD 并操控它们
package web;
import org.apache.spark.SparkConf;
python转java代码
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.*;
import Pattern;
/**
* Created by 張燿峰
* wordcount案例测试
* @author 孤
* @date 2019/3/11
* @Varsion 1.0
*/
public class SparkApplication {
private static final Pattern SPACE = Patternpile(" ");
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("sparkBoot").setMaster("local");
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = File("/usr/local/data").cache();
lines.map(new Function<String, Object>() {
@Override
public Object call(String s) {
return s;
}
});
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) {
return Arrays.asList(SPACE.split(s)).iterator();
}
});
JavaPairRDD<String, Integer> wordsOnes = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairRDD<String, Integer> wordsCounts = duceByKey(new Function2<Integer, Integer, Integer>() {            @Override
public Integer call(Integer value, Integer toValue) {
return value + toValue;
}
});
wordsCounts.saveAsTextFile("/usr/local/data1");
}
}

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。