Reading the Flink Source Code (1): A Brief Analysis of Flink on YARN Per-job Submission
1. Preface
  This is my first attempt at writing a source-code walkthrough, and I will try to tie the underlying principles to the actual code paths. A few points are still unclear to me; covering them all in one post would stretch it over too long a period, but I also do not want to forget what I have read, so I am writing it down now and will fill in the gaps after further reading. If you happen to land on this not-yet-complete version: my apologies!
  If anything here is inaccurate, please point it out in the comments.
  This series is based on Flink 1.9.
2. How Per-job Submission Works
  The overall flow of submitting a job in Flink on YARN mode is shown in the figure below.
Figure 1: Flink runtime architecture
  2.1 Overview of the runtime architecture
  Flink follows the classic master-slave pattern: in the figure, the AM (ApplicationMaster) is the master and the TaskManagers are the slaves.
  Inside the AM, the Dispatcher receives jobs submitted by the client and starts a JobManager for each of them; the JobManager accepts the job, schedules its tasks and manages the TaskManagers; the ResourceManager is in charge of requesting and allocating resources.
  One thing to note: Flink itself also has a ResourceManager and TaskManagers. Even in on-YARN mode, Flink keeps its own resource-management layer; the component names clash, but here YARN is merely the resource provider. In standalone mode the provider would be physical or virtual machines instead.
  2.2 End-to-end flow of per-job submission on YARN:
  1) Running a Flink program acts as the client: it mainly optimizes the user code into a JobGraph and asks the ApplicationsManager of YARN's ResourceManager for resources to start the AM (ApplicationMaster); the AM runs on one of YARN's NodeManager nodes;
  2) once the AM is up, it starts the Dispatcher and the ResourceManager; the Dispatcher starts the JobManager, and the ResourceManager starts a SlotManager for managing and allocating slots;
  3) the JobManager asks the ResourceManager (RM) for resources to execute the job. Initially no TaskManager is running, so the RM requests containers from YARN; once they are granted, TaskManagers are started inside them, the new slots are registered with the SlotManager, and the SlotManager assigns the slots to the tasks that requested resources, i.e. registers them with the JobManager, which then deploys the tasks into the assigned slots (a purely illustrative sketch of this call chain follows at the end of this section). In fact, the biggest difference between the session and per-job modes of Flink on YARN is that in session mode a fixed amount of resources was already requested from YARN before job submission, so the TaskManagers are already running.
  The detailed resource-allocation process is shown in the figure below:
Figure 2: Slot management, from Ref [1]
  For a much more detailed analysis I strongly recommend the write-up by one of Alibaba's Flink experts (Ref [1]); the source analysis later in this post also largely follows it.
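  To make step 3 more concrete before diving into the code, here is a purely illustrative sketch of who calls whom during slot allocation. The interfaces and method names below are invented for clarity and do not exist in Flink; the real RPC gateways (e.g. ResourceManagerGateway, TaskExecutorGateway) are far more involved.

// Purely illustrative pseudo-interfaces for the per-job slot request flow;
// all method names here are invented and do not exist in Flink.
public class SlotFlowSketch {

    interface FlinkResourceManager {
        void requestSlot(String jobId);                          // JobManager -> Flink RM
    }

    interface YarnCluster {
        void startTaskManagerContainer(int memoryMB);            // Flink RM -> YARN (no free slot yet)
    }

    interface SlotManager {
        void registerSlots(String taskManagerId, int numSlots);  // new TaskManager -> SlotManager
    }

    interface JobManager {
        void offerSlot(String taskManagerId, int slotIndex);     // slot offered back for task deployment
    }

    public static void main(String[] args) {
        System.out.println("JobManager -> Flink RM -> YARN -> new TaskManager -> SlotManager -> JobManager");
    }
}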
3. Source Walkthrough
  The job is submitted with:
./flink run -m yarn-cluster ./flinkExample.jar
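  A fuller submission command usually also fixes the detached mode and resource sizes. The flags below all exist in the Flink 1.9 CLI; the values are only examples:

# -d: detached mode (the branch taken in runProgram() below)
# -yjm/-ytm: JobManager/TaskManager memory; -ys: slots per TaskManager; -p: parallelism
./flink run -m yarn-cluster -d -p 4 -yjm 1024m -ytm 4096m -ys 2 ./flinkExample.jar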
  1. Client-side submission phase
  The entry class of the flink script is org.apache.flink.client.cli.CliFrontend.
  1) CliFrontend.main() loads Flink and some global configuration options and then, for the command-line action run, goes through the call chain run() -> runProgram() -> deployJobCluster(). The code:
private <T> void runProgram(
        CustomCommandLine<T> customCommandLine,
        CommandLine commandLine,
        RunOptions runOptions,
        PackagedProgram program) throws ProgramInvocationException, FlinkException {
    final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
    try {
        final T clusterId = customCommandLine.getClusterId(commandLine);
        final ClusterClient<T> client;
        // directly deploy the job if the cluster is started in job mode and detached
        if (clusterId == null && runOptions.getDetachedMode()) {
            int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();
            // build the JobGraph
            final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
            final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
            // submit the job to YARN
            client = clusterDescriptor.deployJobCluster(
                clusterSpecification,
                jobGraph,
                runOptions.getDetachedMode());
            logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());
            // ......................
        } else { /* ........ */ }
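  As an aside, the JobGraph construction above can be reproduced outside the CLI to inspect what actually gets submitted. A minimal sketch, assuming flinkExample.jar is a local path and the Flink 1.9 client libraries are on the classpath:

import java.io.File;

import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class JobGraphInspect {
    public static void main(String[] args) throws Exception {
        // build the JobGraph the same way runProgram() does above
        PackagedProgram program = new PackagedProgram(new File("flinkExample.jar"));
        JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, new Configuration(), 4);
        System.out.println("JobID: " + jobGraph.getJobID()
                + ", vertices: " + jobGraph.getNumberOfVertices());
    }
}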
  2) Submitting the job goes through deployJobCluster() in YarnClusterDescriptor and then deployInternal() in AbstractYarnClusterDescriptor, which blocks until the ApplicationMaster/JobManager has been successfully deployed on YARN. The key call is startAppMaster():
protected ClusterClient<ApplicationId> deployInternal(
        ClusterSpecification clusterSpecification,
        String applicationName,
        String yarnClusterEntrypoint,
        @Nullable JobGraph jobGraph,
        boolean detached) throws Exception {

    // 1. verify that the cluster is reachable
    // 2. if security is enabled, check the user group
    // 3. check the configuration and whether the vcores satisfy what the Flink cluster requests
    // 4. check that the specified queue exists
    // 5. check that the available memory satisfies what the Flink JobManager / NodeManager need
    // ....................................

    // Entry
    ApplicationReport report = startAppMaster(
            flinkConfiguration,
            applicationName,
            yarnClusterEntrypoint,
            jobGraph,
            yarnClient,
            yarnApplication,
            validClusterSpecification);

    // 6. obtain the port and address information of the Flink cluster
    // ..........................................
}
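  Checks 3 and 5 essentially compare the requested ClusterSpecification against the maximum container allocation YARN reports (the real logic lives in validateClusterResources() and handles more cases). A simplified, hypothetical sketch of that kind of check:

// Simplified sketch (hypothetical, not Flink's exact code) of the memory
// validation performed in deployInternal()/validateClusterResources().
public class ResourceCheckSketch {

    static void check(int jobManagerMemoryMB, int taskManagerMemoryMB, int yarnMaxAllocationMB) {
        if (jobManagerMemoryMB > yarnMaxAllocationMB) {
            throw new IllegalArgumentException("JobManager memory " + jobManagerMemoryMB
                    + " MB exceeds YARN's maximum container size " + yarnMaxAllocationMB + " MB");
        }
        if (taskManagerMemoryMB > yarnMaxAllocationMB) {
            throw new IllegalArgumentException("TaskManager memory " + taskManagerMemoryMB
                    + " MB exceeds YARN's maximum container size " + yarnMaxAllocationMB + " MB");
        }
    }

    public static void main(String[] args) {
        check(1024, 4096, 8192); // passes; would throw if either request exceeded the max
    }
}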
  3) AbstractYarnClusterDescriptor.startAppMaster() mainly uploads the configuration and related files to distributed storage such as HDFS and submits the application to YARN:
public ApplicationReport startAppMaster(
        Configuration configuration,
        String applicationName,
        String yarnClusterEntrypoint,
        JobGraph jobGraph,
        YarnClient yarnClient,
        YarnClientApplication yarnApplication,
        ClusterSpecification clusterSpecification) throws Exception {

    // .......................

    // 1. upload logback.xml / log4j.properties from the conf directory

    // 2. upload the jars contained in FLINK_PLUGINS_DIR and FLINK_LIB_DIR
    addEnvironmentFoldersToShipFiles(systemShipFiles);
    // ...........
    // 3. configure application high availability by setting the number of AM restarts (default 1)
    // 4. upload ship files and user jars
    // 5. set slots and heap memory for the TaskManagers
    // 6. upload flink-conf.yaml
    // 7. serialize the JobGraph and upload it
    // 8. login / security checks

    // .................

    // obtain the Java command that launches the AM container
    final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
            yarnClusterEntrypoint,
            hasLogback,
            hasLog4j,
            hasKrb5,
            clusterSpecification.getMasterMemoryMB());

    // 9. bind the launch parameters, classpath and environment variables for the AM

    // ..........................

    final String customApplicationName = customName != null ? customName : applicationName;
    // 10. application name, application type and the user-submitted ContainerLaunchContext
    appContext.setApplicationName(customApplicationName);
    appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
    appContext.setAMContainerSpec(amContainer);
    appContext.setResource(capability);

    if (yarnQueue != null) {
        appContext.setQueue(yarnQueue);
    }

    setApplicationNodeLabel(appContext);

    setApplicationTags(appContext);

    // 11. delete yarnFilesDir if the deployment fails
    // add a hook to clean up in case deployment fails
    Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication, yarnFilesDir);
    Runtime.getRuntime().addShutdownHook(deploymentFailureHook);

    LOG.info("Submitting application master " + appId);

    // Entry
    yarnClient.submitApplication(appContext);

    LOG.info("Waiting for the cluster to be allocated");
    final long startTime = System.currentTimeMillis();
    ApplicationReport report;
    YarnApplicationState lastAppState = YarnApplicationState.NEW;
    // 12. block until the application is RUNNING
    loop: while (true) {
        // ...................
        // fetch an application report via the YarnClient every 250 ms
        Thread.sleep(250);
    }
    // ...........................
    // 13. deployment succeeded, remove the shutdown hook
    // since deployment was successful, remove the hook
    ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
    return report;
}
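  Step 7 is worth a closer look: the JobGraph is shipped to the AM via plain Java serialization into a temporary file, which is then registered as a YARN local resource so the entrypoint can re-create it later. A minimal sketch of only the serialization step (the upload and resource registration in the real startAppMaster() are omitted):

import java.io.File;
import java.io.FileOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class JobGraphSerializeSketch {

    // serialize any Serializable (the real code writes the JobGraph) into a temp file
    static File writeToTempFile(Serializable graph) throws Exception {
        File fp = File.createTempFile("job", ".graph");
        fp.deleteOnExit();
        try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(fp))) {
            out.writeObject(graph);
        }
        return fp;
    }

    public static void main(String[] args) throws Exception {
        File f = writeToTempFile("stand-in for the JobGraph");
        System.out.println("serialized to " + f.getAbsolutePath());
    }
}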
  4) The submission entry is YarnClientImpl.submitApplication(), whose essence is the call to ApplicationClientProtocolPBClientImpl.submitApplication():
public SubmitApplicationResponse submitApplication(SubmitApplicationRequest request) throws YarnException, IOException {
    // extract the protobuf message
    SubmitApplicationRequestProto requestProto = ((SubmitApplicationRequestPBImpl) request).getProto();

    try {
        // send the message to the server and wrap the result into a response
        return new SubmitApplicationResponsePBImpl(this.proxy.submitApplication((RpcController) null, requestProto));
    } catch (ServiceException var4) {
        RPCUtil.unwrapAndThrowException(var4);
        return null;
    }
}
  The message then travels over RPC to the server side, where it is handled by ApplicationClientProtocolPBServiceImpl.submitApplication(). That method rebuilds the request and hands it via ClientRMService.submitApplication() to the RMAppManager on YARN, which performs the actual submission (YARN's allocation process will be covered in a dedicated later post).
  At this point the client-side flow is complete: the application request has been submitted to YARN's ResourceManager. Next we focus on how the Flink cluster is started.
  2. Flink cluster startup
  1) ClientRMService.submitApplication() first checks whether the application has already been submitted (via the applicationId), whether the YARN queue is null, etc., and then forwards the request to the RMAppManager (the component inside the YARN RM that manages application lifecycles). On success it logs "Application with id {applicationId.getId()} submitted by user {user}":
public SubmitApplicationResponse submitApplication(
        SubmitApplicationRequest request) throws YarnException {
    ApplicationSubmissionContext submissionContext = request
            .getApplicationSubmissionContext();
    ApplicationId applicationId = submissionContext.getApplicationId();

    // ApplicationSubmissionContext needs to be validated for safety - only
    // those fields that are independent of the RM's configuration will be
    // checked here, those that are dependent on RM configuration are validated
    // in RMAppManager.
    // i.e. only settings independent of the RM are validated here; RM-dependent ones are validated in RMAppManager

    // 1. check whether the application has already been submitted
    // 2. check whether the submitted queue is null; if so, use the default queue ("default")
    // 3. check whether an application name is set; if not, use the default ("N/A")
    // 4. check whether an application type is set; if not, use the default ("YARN");
    //    if set and longer than the limit (20 characters), it is truncated
    // .............................

    try {
        // call RMAppManager to submit application directly
        rmAppManager.submitApplication(submissionContext,
                System.currentTimeMillis(), user);

        // submission succeeded
        LOG.info("Application with id " + applicationId.getId() +
                " submitted by user " + user);
        RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
                "ClientRMService", applicationId);
    } catch (YarnException e) {
        // on failure an exception is thrown
    }
    // ..................
}
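  Check 4 is a nice example of the defensive normalization done here. A sketch of the equivalent logic (illustrative, not the verbatim YARN code; the 20-character limit matches YARN's constant):

public class AppTypeNormalizeSketch {
    static final int APPLICATION_TYPE_LENGTH = 20; // YARN's limit for the application type

    static String normalize(String applicationType) {
        if (applicationType == null) {
            return "YARN"; // default type
        }
        return applicationType.length() > APPLICATION_TYPE_LENGTH
                ? applicationType.substring(0, APPLICATION_TYPE_LENGTH)
                : applicationType;
    }

    public static void main(String[] args) {
        System.out.println(normalize(null));                           // YARN
        System.out.println(normalize("Apache Flink"));                 // unchanged
        System.out.println(normalize("a-very-long-application-type")); // truncated to 20 chars
    }
}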
  2) RMAppManager.submitApplication() mainly creates the RMApp and requests the AM container from the ResourceScheduler. Everything from here until the AM container is started on a NodeManager is done by YARN itself; the detailed process is not analyzed here (it will be in a later post), so only the entry point is given:
protected void submitApplication(
        ApplicationSubmissionContext submissionContext, long submitTime,
        String user) throws YarnException {
    ApplicationId applicationId = submissionContext.getApplicationId();

    // 1. create the RMApp; an exception is thrown if an app with the same applicationId already exists
    RMAppImpl application =
            createAndPopulateNewRMApp(submissionContext, submitTime, user);
    ApplicationId appId = submissionContext.getApplicationId();

    // the security mode (simple or kerberos) is set in the configuration
    // kerberos
    if (UserGroupInformation.isSecurityEnabled()) {
        // ..................
    } else {
        // simple mode
        // Dispatcher is not yet started at this time, so these START events
        // enqueued should be guaranteed to be first processed when dispatcher
        // gets started.
        // 2. submit the application to the ResourceScheduler (the pluggable resource scheduler)
        this.rmContext.getDispatcher().getEventHandler()
                .handle(new RMAppEvent(applicationId, RMAppEventType.START));
    }
}
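  Note that the handle(new RMAppEvent(...)) call at the end does not process anything synchronously: YARN components communicate through an asynchronous event dispatcher. The toy model below (not YARN code) shows the pattern, a queue plus a single dispatch thread:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class MiniAsyncDispatcher {

    private final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();

    // producers (e.g. submitApplication above) only enqueue and return immediately
    public void handle(String event) {
        eventQueue.offer(event);
    }

    // a single daemon thread drains the queue and invokes the registered handlers
    public void start() {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    System.out.println("dispatching " + eventQueue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.setDaemon(true);
        t.start();
    }

    public static void main(String[] args) throws InterruptedException {
        MiniAsyncDispatcher dispatcher = new MiniAsyncDispatcher();
        dispatcher.start();
        dispatcher.handle("RMAppEventType.START");
        Thread.sleep(200); // give the dispatch thread time to run
    }
}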
  3) In per-job mode, the entry point loaded and run in the AM container is the main() method of YarnJobClusterEntrypoint:
public static void main(String[] args) {
    // startup checks and logging
    // 1. log environment information such as the user, environment variables, Java version and JVM parameters
    EnvironmentInformation.logEnvironmentInfo(LOG, YarnJobClusterEntrypoint.class.getSimpleName(), args);
    // 2. register handlers that log the various SIGNALs
    SignalHandler.register(LOG);
    // 3. install the JVM shutdown-safeguard hook so that JVM exit is not blocked by other shutdown hooks
    JvmShutdownSafeguard.installAsShutdownHook(LOG);

    Map<String, String> env = System.getenv();

    final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
    Preconditions.checkArgument(
            workingDirectory != null,
            "Working directory variable (%s) not set",
            ApplicationConstants.Environment.PWD.key());

    try {
        // 4. log the YARN user information
        YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
    } catch (IOException e) {
        LOG.warn("Could not log YARN environment information.", e);
    }
    // 5. load the Flink configuration
    Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env, LOG);

    YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(
            configuration,
            workingDirectory);
    // 6. Entry: create and start the various internal services
    ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
}
  4) The subsequent call chain is runClusterEntrypoint() -> startCluster() -> runCluster() inside ClusterEntrypoint. The process is fairly simple; here we focus on the runCluster() method:
// ClusterEntrypoint.java
private void runCluster(Configuration configuration) throws Exception {
    synchronized (lock) {
        initializeServices(configuration);

        // write host information into configuration
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

        // 1. create the factory for the Dispatcher and ResourceManager components;
        //    this includes re-creating the JobGraph from the local file
        final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

        // 2. Entry: start RpcService, HAService, BlobServer, HeartbeatServices,
        //    MetricRegistry, ExecutionGraphStore, etc.
        clusterComponent = dispatcherResourceManagerComponentFactory.create(
                configuration,
                commonRpcService,
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                archivedExecutionGraphStore,
                new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
                this);

        // ............
    }
}
  5) In create(), many of Flink's components are started; the ones most relevant to job submission are the Dispatcher and the ResourceManager:
public DispatcherResourceManagerComponent<T> create(
        Configuration configuration,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        BlobServer blobServer,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        ArchivedExecutionGraphStore archivedExecutionGraphStore,
        MetricQueryServiceRetriever metricQueryServiceRetriever,
        FatalErrorHandler fatalErrorHandler) throws Exception {

    LeaderRetrievalService dispatcherLeaderRetrievalService = null;
    LeaderRetrievalService resourceManagerRetrievalService = null;
    WebMonitorEndpoint<U> webMonitorEndpoint = null;
    ResourceManager<?> resourceManager = null;
    JobManagerMetricGroup jobManagerMetricGroup = null;
    T dispatcher = null;
    // ..........
