flink中级篇-DAG图的剖析
⼀.DAG图的总体架构:
Flink框架引擎把执⾏计划抽象为四个层次的数据结构,分别是API层、静态topology、JobGraph、ExecutionGraph等。
具体的结构⼊下图:
⼆. 流作业DAG图分析
API层
⽤户通过StreamApI或是BatchAPI描述Job的逻辑,并通过调⽤链把作业逻辑操作串联起来,对于⽤户⽽⾔,业务抽象的是⼀个⼀个的有特定功能的Operator(操作算⼦),通过Operator的组合来来描述⼀个作业的对数据的处理过程。
对于流作业:
⽤户程序通过调⽤ute触发StreamGraphGenerator⽣成StreamGraph。
.对于批作业:
⽤户程序通过ute触发Optimizer⽣成OptimizerPlan
静态Graph层
StreamGraph
⾸先StreamGraph的核⼼是⼀个StreamNode的拓扑图,StreamNode之间是通过StreamEdge连接。
该拓扑图是通过翻译transformations的链表⽽来,⽤户的userFunction最终被flink框架封装成Stream
Transformation的调⽤链,然后再翻译成StreamGraph。普通的Operator会通过图⽣成器( StreamGraphGenerator)的transform⽅法来⽣成对于的StreamNode,⽽实现分流功能的Select操作以及实现分区的分区选择器则被翻译成虚拟节点并加⼊到StreamGraph中。
下⾯是的transforms转换成StreamNode的核⼼代码:
Collection<Integer> transformedIds;
if (transform instanceof OneInputTransformation<?, ?>) {
transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
} else if (transform instanceof SourceTransformation<?>) {
transformedIds = transformSource((SourceTransformation<?>) transform);
} else if (transform instanceof SinkTransformation<?>) {
transformedIds = transformSink((SinkTransformation<?>) transform);
} else if (transform instanceof UnionTransformation<?>) {
transformedIds = transformUnion((UnionTransformation<?>) transform);
} else if (transform instanceof SplitTransformation<?>) {
transformedIds = transformSplit((SplitTransformation<?>) transform);
} else if (transform instanceof SelectTransformation<?>) {
transformedIds = transformSelect((SelectTransformation<?>) transform);
} else if (transform instanceof FeedbackTransformation<?>) {
transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
} else if (transform instanceof CoFeedbackTransformation<?>) {
transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
} else if (transform instanceof PartitionTransformation<?>) {
transformedIds = transformPartition((PartitionTransformation<?>) transform);
} else if (transform instanceof SideOutputTransformation<?>) {
transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
} else {
throw new IllegalStateException("Unknown transformation: " + transform);
}
从上⾯的代码中可以看到框架根据Transform的类型调⽤不同transformX函数进⾏转换。
1)其中OneInput、TwoInput、Source、sink的Transform⼤体相同就是封装StreamNode然后加⼊到Graph中,所不同的是对Edge的处理不同。
2)split和select的Transform成对使⽤,⼀起配合完成分流的功能。transformsSplit会根据OutputSelector()来addOutputSelector,⽽tarnsformsSelect则会根据selectdNames⽣产相应
的VirtualNode
3) transformPartition和transformSideOutput则是⽣成VirtualNode并分别把OutputTag、StreamPartitioner加⼊对应的列表中。
StreamNode:
StreamNode的核⼼组件是:operator、inEdges、outEdges
(1)Operator:
Operator则是StreamOperator的实例,封装了UserDefineFunction和算⼦本⾝state,以及提供管理⾃定义state 、Channel的接⼝。
(2)inEdges和outEdges:
inEdges和outEdges是StreamEdge的实列⽤于把Connector各个StreamNode,构成整个执⾏拓扑图。
(3)StreamEdge:
StreamEdge包括source和target 分别⽤于指向源和⽬标的StreamNode,此外SteamEdge还包括select的Names列表、分区选择器outputPartitioner,以及outputTag。
JobGraph层
什么是jobGraph?
jobGraph是唯⼀能被Flink引擎所识别的表述作业的数据结构,也正是这⼀共同的抽象为Flink批量统⼀奠定了基础。
相⽐于StreamGraph和OptimizedPlan这些静态的拓扑图JobGraph加⼊了中间结果集(IntermediateDataSet)使其赋予了动态的特性,就像流⽔⼀样源源不断流淌。
transform中文翻译JobGraph的组成:
JobGraph是由顶点(JobVertex)、作业边(JobEdge)以及中间结果集(IntermediateDataSet)三个基本的元素组成。其中JobEdge和IntermediateDateSet都是多个Vertex之间的连接的纽带,分别代表顶点的input和result。
(1).⼀个JobVertex关联着若⼲个JobEdge作为输⼊端以及若⼲个IntermediateDataSet作为其产⽣的结果集合。
(2) ⼀个JobEdge关联着⼀个IntermeidateDataSet作为其数据源和⼀个JobVertex作为其消费者
(3) ⼀个IntermediateDataSet关联着⼀个Vertex作为其⽣产者,和若⼲的JobEdge作为其消费者
NOTE:这就如同⼀个抽⽔系统,Vertex是抽⽔泵,JobEdge是⽔管,ItermediateDateSet就是中间的蓄⽔池。
JobVertex
JobVertex是JobGraph中最重要的元素之⼀。实际上JobGraph只是维护了⼀个JobVertex类型的Map数据结构,JobEdge和中间结果集IntermediateDataSet都Vertex的成员变量,通过各个顶点Vertex之间的关系形成⼀个完整的动态Topology图。JobVertex的组成是:id,inputList,results等。。。
流作业的JobGraph⽣成过程:
流作业中JobGraph的⽣成是由StreamingJobGraphGenerator的createJobGraph完成的,该函数的调⽤⼊⼝是StramGraph的getJobGraph。
jobGraph的⽣成过程⼤体分为三个阶段:
第⼀阶段:初始化阶段:
主要是实列化JobGraph对象并初始化⼀些实列变量。然后通过StreamGraphHasher⽣产⼀批hash值⽤于标识jobGraph中的顶点JobVertex,如果DAG图没有变化这批hash值多次⽣成的结果是不变的,这也为后续failover Recover提供了可⾏性。
Hash的⽣成是通过visitor以⼴度的⽅式遍历StreamGrah的StreamNode来实现。
Hash的计算⽅式:
1.如果⽤户指定了TransformationUID则使⽤该UID⽣产对应的hash值
2.否则根据Job中的综合属性计算。
对于第⼆种⽅式的计算因素包括下⾯三种:
(1) StreamNode中的序列化序号
(2) Chain在⼀起的输出Node的序号
(3) 输⼊Node的hash值
因此改变上⾯中任务⼀个属性都会造成JobVertexID的变化
第⼆阶段:创建并链接 JobVertex
为了更加⾼效的执⾏,Flink对DAG图进⾏在调度上做⾥⼀定的优化,即算⼦链接(operator chain),也就是把StreamGraph中的若⼲个StreamNode chain在⼀起共同组成Flink中的调度任务Task,在Job的执⾏时候,这些调度Task会按照并发度分割成多个subTask分布到执⾏线程中执⾏。在旧版本中每个subTask分配⼀个slot执⾏,在之后加⼊了sharingSlot⽅式,⼀个slot会执⾏多个subTask任务。
优化的的⽬的:
1).减少Job调度的Task数量,从⽽减少线程之间的切换的开销提⾼CPU的利⽤率。
2).减少job中不同node之间数据交换以及不同JVM之间的⽹络开销提⾼数据交换的速度
最终达到减少延时的同时提⾼了系统的吞吐。
Chain的实现:
JobVertex的chain通过setChaining的调⽤触发,遍历所有的源头StreamNode逐条Chain的调⽤createChain完成链路的设置,每个source到sink的StreamNode组成⼀条单独的chain。
对于单条chain的Connection,则是通过遍历StreamNodes图采⽤递归的调⽤的⽅式实现,最后从sink节点开始开始往前做下⾯操作: 1) 设置当前节点的ChainName,Min和preferred Resources
2) 如果是当前节点⾮Vertex的header节点则单纯的创建StreamConfig对象并设置相关的属性,其属性包括ID 、
In/OutTypeSerializer、Operator、Chain/nonChain Outputs、CheckPointConfig、StateBackend等。。。
3) 如果当前Node是Vertex的header,则在⽣产和设置StreamConfig之前会创建JobVertex对象初始化后加⼊到JobStream中,最后通过调⽤connect函数,创建作业 边(JobEdge)和中间结果集(IntermediateDataSet)把其他的JobVertex关联起来。
Chain的算法:
如果要把⼀个StreamNode的OutputEdge链接到⼀个Vertex中需要同时满⾜下⾯的条件:
1. 当前节点的OutputEdge的上、下游节点的Operator不能为空
2. 当前Node的下游节点的输⼊边除了当前的出边外没有其他⼊边(即是该节点的inEdges的长度是1)。
3. 上下游节点是在同⼀个SharingGroup组中或为空
4. 上下游节点的并发度要相同
5. 下游Operator的ChainingStrategy是ALWAYS,⽽上游(当前节点)的Operator的ChainingStrategy是HEAD或ALWAYS。 6. 本条JobEdge的Partitioner是Forward。
7. 作业的配置允许Chaining
第三个阶段:设置JobGraph中其他属性
(1).设置JobGraph的物理边
所谓的物理边其实就是真正⽤于链接不同JobVertex之间的Edge,也就是前⾯阐述的JobEdge。在这⾥调⽤setPhysicalEdges只是单纯的把第⼆阶段调Connect函数时的产⽣的JobEdge所对应的StreamEdge组织成Map数据结构并把引⽤保存到StreamingJobGraphGenerator,JobEdge的⽣成和结果集IntermediaDateSet都是在第⼆阶段完成。
(2)设置槽共享组(SlotSharingGroup)以及同位组(CoLocationGroup)。其中同位组主要⽤于迭代算⼦的执⾏。
(3)设置Job的Checkpoint配置。
ExecutionGraph层
ExecutionGraph是作业在调度阶段的数据结构,它是JobGraph并⾏化之后的视图是调度层最核⼼的数据结构。
ExecutionGraph的核⼼组:
类似的ExecutionGraph同样是由ExecutionJobVertex、IntermediateResult、ExecutionEdge等三个核⼼组件构成,分别对应着JobGraph中的JobVertex、IntermediateDataSet以及JobEdge。和JobGraph有所不同的是ExecutionGraph根据并发度进⼀步把ExecutionGraph划分为等同于并发度个数量的多个ExecutionVertex、IntermediateResultPartition及ExecutionEdge。
转换⽰例图如下:
ExecutionGraph的⽣成
ExecutionGraph的⽣成器是ExecutionGraphBuilder,在Client提交job之后,JobManager会通过⽣成JobMaster对象的同时触发ExecutionGraphBuilder的buildGraph函数进⽽⽣成ExecutionGraph。
⽣成的过程很简单就是根据遍历JobGraph中的JobVertex根据其属性设置ExecutionGraph中的属性信息,然后根据parallelism⽣成相应的ExecutionVertex。
⾄此FLink流作业的执⾏计划基本完成,最后就是执⾏计划的物理调度了这⾥就不在赘述。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论