如何在Java应⽤中提交Spark任务?
最近看到有⼏个Github友关注了Streaming的监控⼯程——,所以思来想去还是优化下代码,不能让别⼈看笑话啊。于是就想改⼀下之前觉得最丑陋的⼀个地⽅——任务提交。
本博客内容基于Spark2.2版本~在阅读⽂章并想实际操作前,请确保你有:
1. ⼀台配置好Spark和yarn的服务器
2. ⽀持正常spark-submit --master yarn xxxx 的任务提交
⽼版本
⽼版本任务提交是基于 ** 启动本地进程,执⾏脚本spark-submit xxx ** 的⽅式做的。其中⼀个关键的问题就是获得提交Spark任务的Application-id,因为这个id是跟任务状态的跟踪有关系的。如果你的资源管理框架⽤的是yarn,应该知道每个运⾏的任务都有⼀个applicaiton_id,这个id的⽣成规则是:
appplication_时间戳_数字
⽼版本的spark通过修改SparkConf参数spark.app.id就可以⼿动指定id,新版本的代码是直接读取的taskBackend中的applicationId()⽅法,这个⽅法具体的实现是根据实现类来定的。在yarn中,是通过Yarn的实现的,具体的实现逻辑可以参考对应的链接。
感兴趣的同学可以看⼀下,⽣成applicaiton_id的逻辑在hadoop-yarn⼯程的中定义。
总结⼀句话就是,想要⾃定义id,甭想了
于是当时脑袋⽠不灵光的我,就想到那就等应⽤创建好了之后,直接写到数据库⾥⾯呗。怎么写呢?
1. 我事先⽣成⼀个⾃定义的id,当做参数传递到spark应⽤⾥⾯;
2. 等spark初始化后,就可以通过sparkContext取得对应的application_id以及url
3. 然后再driver连接数据库,插⼊⼀条关联关系
新版本
还是归结于互联⽹时代的信息⼤爆炸,我看到友的聊天,知道了SparkLauncer这个东西,调查后发
现他可以基于Java代码⾃动提交Spark 任务。SparkLauncher⽀持两种模式:
1. new SparkLauncher().launch() 直接启动⼀个Process,效果跟以前⼀样
2. new SparkLauncher().startApplicaiton() 返回⼀个SparkAppHandler,并(可选)传⼊⼀个
当然是更倾向于第⼆种啦,因为好处很多:
1. ⾃带输出重定向(Output,Error都有,⽀持写到⽂件⾥⾯),超级爽的功能
2. 可以⾃定义,当信息或者状态变更时,都能进⾏操作(对我没啥⽤)
3. 返回的SparkAppHandler⽀持暂停、停⽌、断连、获得AppId、获得State等多种功能,我就想要这个
⾸先创建⼀个最基本的Spark程序:
import org.apache.spark.sql.SparkSession;
import java.util.ArrayList;
import java.util.List;
public class HelloWorld {
public static void main(String[] args) throws InterruptedException {
SparkSession spark = SparkSession
.builder()
//.master("yarn")
//.appName("hello-wrold")
//.config("fig.option", "some-value")
.getOrCreate();
List<Person> persons = new ArrayList<>();
persons.add(new Person("zhangsan", 22, "male"));
persons.add(new Person("lisi", 25, "male"));
persons.add(new Person("wangwu", 23, "female"));
spark.close();
}
}
然后创建SparkLauncher类:
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
import java.io.IOException;
public class Launcher {
public static void main(String[] args) throws IOException {
SparkAppHandle handler = new SparkLauncher()
.setAppName("hello-world")
.setSparkHome(args[0])
.setMaster(args[1])
.setConf("", "2g")
.setConf("", "1g")
.setConf("s", "3")
.setAppResource("/home/xinghailong/launcher/launcher_test.jar")
.setMainClass("HelloWorld")
.
addAppArgs("I come from Launcher")
.setDeployMode("cluster")
.startApplication(new SparkAppHandle.Listener(){
@Override
public void stateChanged(SparkAppHandle handle) {
System.out.println("**********  state  changed  **********");
}
@Override
public void infoChanged(SparkAppHandle handle) {session如何设置和读取
System.out.println("**********  info  changed  **********");
}
});
while(!"FINISHED".State().toString()) && !"FAILED".State().toString())){
System.out.println("id    "+AppId());
System.out.println("state "+State());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
然后打包⼯程,打包过程可以参考之前的博客:
打包完成后上传到部署Spark的服务器上。由于SparkLauncher所在的类引⽤了SparkLauncher,所以还需要把这个jar也上传到服务器上。[xinghailong@hnode10 launcher]$ ls
launcher_test.jar  spark-launcher_2.11-2.2.0.jar
[xinghailong@hnode10 launcher]$ pwd
/home/xinghailong/launcher
由于SparkLauncher需要指定SPARK_HOME,因此如果你的机器可以执⾏spark-submit,那么就看⼀下spark-submit⾥
⾯,SPARK_HOME是在哪
[xinghailong@hnode10 launcher]$ which spark2-submit
/var/lib/hadoop-hdfs/bin/spark2-submit
最后⼏⾏就能看到:
export SPARK2_HOME=/var/lib/hadoop-hdfs/app/spark
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
exec "${SPARK2_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
综上,我们需要的是:
1. ⼀个⾃定义的Jar,⾥⾯包含spark应⽤和SparkLauncher类
2. ⼀个SparkLauncher的jar,spark-launcher_2.11-2.2.0.jar 版本根据你⾃⼰的来就⾏
3. ⼀个当前⽬录的路径
4. ⼀个SARK_HOME环境变量指定的⽬录
然后执⾏命令启动测试:
java -dirs=/home/xinghailong/launcher -cp launcher_test.jar Launcher /var/lib/hadoop-hdfs/app/spark yarn
说明:
1. -dirs设置当前⽬录为java类加载的⽬录
2. 传⼊两个参数,⼀个是SPARK_HOME;⼀个是启动模式
观察删除发现成功启动运⾏了:
id    null
state UNKNOWN
Mar 10, 2018 12:00:52 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:00:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for using builtin-java classes where applicable **********  state  changed  **********
...省略⼀⼤堆拷贝jar的⽇志
**********  info  changed  **********
**********  state  changed  **********
Mar 10, 2018 12:00:55 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:00:55 INFO yarn.Client: Application report for application_1518263195995_37615 (state: ACCEPTED)
... 省略⼀堆重定向的⽇志
application_1518263195995_37615 (state: ACCEPTED)
id    application_1518263195995_37615
state SUBMITTED
Mar 10, 2018 12:01:00 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:01:00 INFO yarn.Client: Application report for application_1518263195995_37615 (state: RUNNING)
**********  state  changed  **********
... 省略⼀堆重定向的⽇志
INFO:  user: hdfs
**********  state  changed  **********
Mar 10, 2018 12:01:08 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:01:08 INFO util.ShutdownHookManager: Shutdown hook called
Mar 10, 2018 12:01:08 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:01:08 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-f07e0213-61fa-4710-90f5-2fd2030e0701
总结
这样就实现了基于Java应⽤提交Spark任务,并获得其Appliation_id和状态进⾏定位跟踪的需求了。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。