Flink学习笔记(三):任务提交详细流程
⽂章⽬录
1.Flink多种提交⽅式对⽐
常⽤提交⽅式分为local,standalone,yarn三种。
local:本地提交项⽬,可纯粹的在本地单节点运⾏,也可以将本地代码提交到远端flink集运⾏。
standalone:flink集⾃⼰完成资源调度,不依赖于其他资源调度器,需要⼿动启动flink集。
yarn:依赖于hadoop yarn资源调度器,由yarn负责资源调度,不需要⼿动启动flink集。需要先启动yarn和hdfs。⼜分为yarn-session和yarn-cluster两种⽅式。提交Flink任务时,所在机器必须要⾄少设置环境变
量YARN_CONF_DIR、HADOOP_CONF_DIR、HADOOP_CONF_PATH中的⼀个,才能读取YARN和HDFS的配置信息(会按三者从左到右的顺序读取,只要发现⼀个就开始读取。如果没有正确设置,会尝试使⽤HADOOP_HOME/etc/hadoop),否则提交任务会失败。
1.1 local模式
local即本地模式,可以不依赖hadoop,可以不搭建flink集。⼀般在开发时调试时使⽤。
1.1.1 纯粹的local模式运⾏
就是直接运⾏项⽬中的代码的⽅式,例如直接在idea中运⾏。创建ExecutionEnvironment的⽅式如下:
// getExecutionEnvironment()⽅法可以根据flink运⽤程序如何提交判断出是那种模式提交
//ExecutionEnvironment env = ExecutionEnvironment();
ExecutionEnvironment env = ateLocalEnvironment();
1.1.2 local使⽤remote的⽅式运⾏
这种⽅式可以将本地代码提交给远端flink集运⾏,需要指定集的master地址。在flink集的web ui会存在Running
Job/Compaleted Jobs的记录。
public class TestLocal {
public static void main(String[] args)throws Exception {
ExecutionEnvironment env = ateRemoteEnvironment("remote_ip",8090,"D:\\code\\flink-local.jar");
System.out.Parallelism());
.print();
}
}
1.1.3 本地提交到remote集
例如有如下项⽬代码:
public class TestLocal {
public static void main(String[] args)throws Exception {
ExecutionEnvironment env = ExecutionEnvironment();
.print();
}
}
将项⽬打成jar包,在本机上使⽤flink run 指定集的模式提交(这个机器可以不是flink集的节点,但需要在local机器上有flink提交命令,)。如:
./flink run -m remote_ip:8090-p st.TestLocal /home/hdp/flink-local.jar
# -m flink 集地址
# -p 配置job的并⾏度
# -c Job执⾏的main class
也会在flink web ui界⾯显⽰结果。
1.2 standalone模式
上⾯讲了flink在local机器上进⾏提交,需要指定flink的master信息。
standalone模式提交也是类似,不过可以不⽤指定master节点,还有个区别就是,提交是在flink集的机器节点上。
两个提交命令⽰例:
# 前台提交
./flink run -p st.TestLocal /home/hdp/flink-local.jar
# 通过-d后台提交
./flink run -p st.TestLocal -d /home/hdp/flink-local.jar
1.3 yarn模式
yarn模式必须确保本机配置了HADOOP_HOME环境变量,flink会尝试通过HADOOP_HOME/etc/hadoop⽬录下的配置⽂件连接yarn。
flink on yarn 有两种提交⽅式:
1. yarn-session:启动⼀个YARN session(Start a long-running Flink cluster on YARN)
2. yarn-cluster:直接在YARN上提交运⾏Flink作业(Run a Flink job on YARN)
两者区别
⼀种是yarn-session,就是把⾸先启动⼀个yarn-session当成了⼀个flink容器,官⽅说法是flink服务,然后我们提交到yarn上⾯的全部flink任务全部都是提交到这个服务,也就是容器⾥⾯进⾏运⾏的。flink任务之间也是独⽴的,但是都存在于flink服务即容器⾥⾯,yarn上只能监测到⼀个flink服务即容器,⽆法监测到flink单个任务,需要进⼊flink服务即容器内部,才可以看到。
另⼀种是yarn-cluster,就是每个把flink任务当成了⼀个application,就是⼀个job,在yarn上可以管理,flink任务之间互相是独⽴的。
1.3.1 yarn-session
这种模式下会启动yarn session,并且会启动Flink的两个必要服务:JobManager和TaskManagers,然后你可以向集提交作业。同⼀个Session中可以提交多个Flink作业。需要注意的是,这种模式下H
adoop的版本⾄少是2.2,⽽且必须安装了HDFS(因为启动YARN session的时候会向HDFS上提交相关的jar⽂件和配置⽂件)。
yarn-session模式提交任务的步骤:
1.启动yarn-session,我们可以通过./bin/yarn-session.sh脚本启动,参数如下:
Usage:
Required
-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)
Optional
-D <property=value> use value for given property
-d,--detached If present, runs the job in detached mode
-h,--help Help for the Yarn session CLI.
-id,--applicationId <arg> Attach to running YARN session
-j,--jar <arg> Path to Flink jar file
-jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB)
-m,--jobmanager <arg> Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)
-nl,--nodeLabel <arg> Specify YARN node label for the YARN application
-nm,--name <arg> Set a custom name for the application on YARN
-q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
session和application的区别-sae,--shutdownOnAttachedExit If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e .g., in response to a user interrupt, such
as typing Ctrl + C.
-st,--streaming Start Flink in streaming mode
-t,--ship <arg> Ship files in the specified directory (t for transfer)
-tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)
-yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode
例如:
# ⽅式⼀:启动⼀个新的flink集
./bin/yarn-session.sh -n 4-tm 8192-s 8
# ⽅式⼆:附着到⼀个已存在的flink yarn session
附着到⼀个已存在的flink yarn session
⽅式⼀启动了4个TaskManager,每个TaskManager内存为8G且占⽤了8个核(是每个TaskManager,默认是1个核)。在启动YARN session的时候会加载conf/flink-config.yaml配置⽂件,我们可以根据⾃⼰的需求去修改⾥⾯的相关参数。启动后可以在flink web ui查看运⾏情况。
2.启动了yarn session之后,就可以通过flink run命令提交作业了
例如:
./bin/flink run ./examples/batch/WordCount.jar --input hdfs:///user/zhang/LICENSE
# --input是程序的⼊参,不是flink内部参数
1.3.2 yarn-cluster
上⾯的YARN session是在Hadoop YARN环境下启动⼀个Flink cluster集,⾥⾯的资源是可以共享给其他的Flink作业。我们还可以在YARN上启动⼀个Flink作业。这⾥我们还是使⽤./bin/flink,但是不需要事先启动YARN session:
./bin/flink run -m yarn-cluster -yn 2./examples/batch/WordCount.jar \
--input hdfs:///user/zhang/LICENSE \
--output hdfs:///user/
上⾯的命令同样会启动⼀个类似于YARN session启动的页⾯。其中的-yn是指TaskManager的个数,必须指定。
2.flink命令参数详解
1. flink run参数:flink run命令执⾏模板:
flink run [option] <jar-file> <arguments>
-c,--class<classname>:需要指定的main⽅法的类
-C,--classpath <url>:向每个⽤户代码添加url,他是通过UrlClassLoader加载。url需要指定⽂件的schema如(file://)
-d,--detached :在后台运⾏
-p,--parallelism <parallelism>: job需要指定env的并⾏度,这个⼀般都需要设置。
-q,--sysoutLogging :禁⽌logging输出作为标准输出。
-s,--fromSavepoint <savepointPath>:基于savepoint保存下来的路径,进⾏恢复。
-sae,--shutdownOnAttachedExit :如果是前台的⽅式提交,当客户端中断,集执⾏的job任务也会shutdown。
提交项⽬时如何指定外部依赖jar包呢?
通过-C命令添加外部jar包的路径,但是有两点需要注意:
a、需要指定⽂件路径的协议:例如本地⽂件⽤file://,多个jar包⽤*表⽰,这⾥不⽀持hdfs://
b、指定的路径要flink的所有节点都能访问
例如我在做flink与kafka和hbase整合测试时,需要依赖kafka和hbase的jar包,我的做法是,将依赖的jar包传到每个flink节点的flink_home/lib⽬录下,然后编写如下的启动脚本:
2. flink run -m yarn-cluster参数
-m,--jobmanager : yarn-cluster集
-yd,--yarndetached :后台
-yjm,--yarnjobManager : jobmanager的内存
-ytm,--yarntaskManager : taskmanager的内存
-yn,--yarncontainer : TaskManager的个数
-yid,--yarnapplicationId : job依附的applicationId
-ynm,--yarnname : application的名称
-ys,--yarnslots :分配的slots个数
flink run -m yarn-cluster -yd -yjm 1024m -ytm 1024m -ynm <name>-ys 1<jar><arguments>
3. flink list
flink list:列出flink的job列表。
flink list -r/--runing :列出正在运⾏的job
flink list -s/--scheduled :列出已调度完成的job
4. flink cancel
flink cancel [options]<job_id>:取消正在运⾏的job id
flink cancel -s/--withSavepoint <path><job_id>:取消正在运⾏的job,并保存到相应的保存点
5. flink stop:仅仅针对Streaming job
flink stop [options]<job_id>
flink stop <job_id>:停⽌对应的job
6. flink modify
flink modify <job_id>[options]
flink modify <job_id>-p/--parallelism p :修改job的并⾏度
8. flink savepoint(重要)
flink savepoint [options]<job_id><target directory>
eg:
# 触发保存点
flink savepoint <job_id><hdfs://xxxx/xx/x>:将flink的快照保存到hdfs⽬录
# 使⽤yarn触发保存点
flink savepoint <job_id><target_directory>-yid <application_id>
# 使⽤savepoint取消作业
flink cancel -s <tar_directory><job_id>
# 从保存点恢复
flink run -s <target_directoey>[:runArgs]
# 如果复原的程序,对逻辑做了修改,⽐如删除了算⼦可以指定allowNonRestoredState参数复原。
flink run -s <target_directory>-n/--allowNonRestoredState [:runArgs]
3.flink on yarn作业提交详细流程
⼏个⾓⾊的作⽤:
JobManager :接受application,包含StreamGraph(DAG)、JobGraph(logical dataflow graph,已经进过优化,如task chain)和JAR,将JobGraph转化为ExecutionGraph(physical dataflow graph,并⾏化),包含可以并发执⾏的tasks。其他⼯作类似Spark driver,如向RM申请资源、schedule tasks、保存作业的元数据,如checkpoints。如今JM可分为JobMaster和ResourceManager(和下⾯的不同),分别负责任务和资源,在Session模式下启动多个job就会有多个JobMaster。
ResourceManager:⼀般是Yarn,当TM有空闲的slot就会告诉JM,没有⾜够的slot也会启动新的TM。kill掉长时间空闲的TM。
TaskManager类似Spark的executor,会跑多个线程的task、数据缓存与交换。
Dispatcher(Application Master)提供REST接⼝来接收client的application提交,它负责启动JM和提交application,同时运⾏Web UI。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论