Flink核⼼技术浅析(整理版)
1. Flink简介
Apache Flink是⼀个⾯向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同⼀个Flink流执⾏引擎(streaming dataflow engine),提供⽀持流处理和批处理两种类型应⽤的功能。batch dataSet可以视作data Streaming的⼀种特例。基于流执⾏引擎,Flink 提供了诸多更⾼抽象层的API以便⽤户编写分布式任务:
DataSet API,对静态数据进⾏批处理操作,将静态数据抽象成分布式的数据集,⽤户可以⽅便地使⽤Flink提供的各种操作符对分布式数据集进⾏处理,⽀持Java、Scala和Python。
DataStream API,对数据流进⾏流处理操作,将流式的数据抽象成分布式的数据流,⽤户可以⽅便地对分布式数据流进⾏各种操作,⽀持Java和Scala。
Table API,对结构化数据进⾏查询操作,对结构化数据抽象成关系表,并通过类SQL的DSL对关系表进⾏各种查询操作,⽀持Java和Scala。
Flink ML,Flink的机器学习库,提供了机器学习Pipelines API并实现了多种机器学习算法。
Gelly,Flink的图计算库,提供了图计算的相关API以及多种图计算算法。
Flink的技术栈如图所⽰:
此外,Flink也可以⽅便和Hadoop⽣态圈中其他项⽬集成,例如Flink可以读取存储在HDFS或HBase中的静态数据,以Kafka作为流式的数据源,直接重⽤MapReduce或Store代码,或是通过YARN申请集资源等。
2. Flink核⼼特点
2.1 统⼀的批处理和流处理系统
在执⾏引擎这⼀层,流处理系统与批处理系统最⼤不同在于节点间的数据传输⽅式。对于⼀个流处理系统,其节点间数据传输的标准模型是:当⼀条数据被处理完成后,序列化到缓存中,然后⽴刻通过⽹络传输到下⼀个节点,由下⼀个节点继续处理。⽽对于⼀个批处理系统,其节点间数据传输的标准模型是:当⼀条数据被处理完成后,序列化到缓存中,并不会⽴刻通过⽹络传输到下⼀个节点,当缓存写满,就持久化到本地硬盘上,当所有数据都被处理完成后,才开始将处理后的数据通过⽹络传输到下⼀个节点。这两种数据传输模式是两个极端,对应的是流处理对低延迟的要求和批处理系统对⾼吞吐量的要求。
Flink的执⾏引擎采⽤了⼀种⼗分灵活的⽅式,同时⽀持了上述两种数据传输模型。Flink以固定的缓存块为单位进⾏⽹络数据传输,⽤户可以通过缓存块超时值指定缓存块的传输时机。如果缓存块的超时值为0,则Flink的数据传输⽅式类似上⽂所提到流处理系统的标准模型,此时系统可以获得最低的处理延迟。如果缓存块的超时值为⽆限⼤,则Flink的数据传输⽅式类似上⽂所提到批处理系统标准模型,此时系统可以获得最⾼的吞吐量。同时缓存块的超时值也可以设置为0到⽆限⼤之间的任意值。缓存块的超时阀值越⼩,则Flink流处理执⾏引擎的数据处理延迟越低,但吞吐量也会降低,反之亦然。通过调整缓存块的超时阀值,⽤户可根据需求灵活地权衡系统延迟和吞吐量。
在统⼀的流式执⾏引擎基础上,Flink同时⽀持了流计算和批处理,并对性能(延迟、吞吐量等)有所保障。相对于其他原⽣的流处理与批处理系统,并没有因为统⼀执⾏引擎⽽受到影响,从⽽⼤幅度减
轻了⽤户安装、部署、监控、维护等成本。
2.2 Flink流处理的容错机制
对于⼀个分布式系统来说,单个进程或是节点崩溃导致整个Job失败是经常发⽣的事情,在异常发⽣时不会丢失⽤户数据并能⾃动恢复才是分布式系统必须⽀持的特性之⼀。本节主要介绍Flink流处理系统任务级别的容错机制。
批处理系统⽐较容易实现容错机制,由于⽂件可以重复访问,当个某个任务失败后,重启该任务即可。但是到了流处理系统,由于数据源是⽆限的数据流,从⽽导致⼀个流处理任务执⾏⼏个⽉的情况,将所有数据缓存或是持久化,留待以后重复访问基本上是不可⾏的。Flink基于分布式快照与可部分重发的数据源实现了容错。⽤户可⾃定义对整个Job进⾏快照的时间间隔,当任务失败时,Flink会将整个Job恢复到最近⼀次快照,并从数据源重发快照之后的数据。
Flink的分布式快照实现借鉴了Chandy和Lamport在1985年发表的⼀篇关于分布式快照的论⽂,其实现的主要思想如下:按照⽤户⾃定义的分布式快照间隔时间,Flink会定时在所有数据源中插⼊⼀种特殊的快照标记消息,这些快照标记消息和其他消息⼀样在DAG中流动,但是不会被⽤户定义的业务逻辑所处理,每⼀个快照标记消息都将其所在的数据流分成两部分:本次快照数据和下次快照数据。
图3中Flink包含快照标记消息的消息流
快照标记消息沿着DAG流经各个操作符,当操作符处理到快照标记消息时,会对⾃⼰的状态进⾏快照,并存储起来。当⼀个操作符有多个输⼊的时候,Flink会将先抵达的快照标记消息及其之后的消息缓存起来,当所有的输⼊中对应该快照的快照标记消息全部抵达后,操作符对⾃⼰的状态快照并存储,之后处理所有快照标记消息之后的已缓存消息。操作符对⾃⼰的状态快照并存储可以是异步与增量的操作,并不需要阻塞消息的处理。分布式快照的流程如图4所⽰:java核心技术有哪些
图4 Flink分布式快照流程图
当所有的Data Sink(终点操作符)都收到快照标记信息并对⾃⼰的状态快照和存储后,整个分布式快照就完成了,同时通知数据源释放该快照标记消息之前的所有消息。若之后发⽣节点崩溃等异常情况时,只需要恢复之前存储的分布式快照状态,并从数据源重发该快照以后的消息就可以了。
Exactly-Once是流处理系统需要⽀持的⼀个⾮常重要的特性,它保证每⼀条消息只被流处理系统⼀次,许多流处理任务的业务逻辑都依赖于Exactly-Once特性。相对于At-Least-Once或是At-Most-Once,Exactly-Once特性对流处理系统的要求更为严格,实现也更加困难。Flink基于分布式快照实现了Exactly-Once特性。
相对于其他流处理系统的容错⽅案,Flink基于分布式快照的⽅案在功能和性能⽅⾯都具有很多优点,包括:
低延迟。由于操作符状态的存储可以异步,所以进⾏快照的过程基本上不会阻塞消息的处理,因此不会对消息延迟产⽣负⾯影响。
⾼吞吐量。当操作符状态较少时,对吞吐量基本没有影响。当操作符状态较多时,相对于其他的容错机制,分布式快照的时间间隔是⽤户⾃定义的,所以⽤户可以权衡错误恢复时间和吞吐量要求来调整分布式快照的时间间隔。
与业务逻辑的隔离。Flink的分布式快照机制与⽤户的业务逻辑是完全隔离的,⽤户的业务逻辑不会依赖或是对分布式快照产⽣任何影响。错误恢复代价。分布式快照的时间间隔越短,错误恢复的时间越少,与吞吐量负相关。
2.3 Flink流处理的时间窗⼝
对于流处理系统来说,流⼊的消息不存在上限,所以对于聚合或是连接等操作,流处理系统需要对流⼊的消息进⾏分段,然后基于每⼀段数据进⾏聚合或是连接。消息的分段即称为窗⼝,流处理系统⽀持的窗⼝有很多类型,最常见的就是时间窗⼝,基于时间间隔对消息进⾏分段处理。本节主要介绍Flink流处理系统⽀持的各种时间窗⼝。
对于⽬前⼤部分流处理系统来说,时间窗⼝⼀般是根据Task所在节点的本地时钟进⾏切分,这种⽅式实现起来⽐较容易,不会产⽣阻塞。但是可能⽆法满⾜某些应⽤需求,⽐如:①消息本⾝带有时间戳,⽤户希望按照消息本⾝的时间特性进⾏分段处理;②由于不同节点的时钟可能不同,以及消息在流经各个节点的延迟不同,在某个节点属于同⼀个时间窗⼝处理的消息,流到下⼀个节点时可能被切分到不同的时间窗⼝中,从⽽产⽣不符合预期的结果。
Flink⽀持3种类型的时间窗⼝,分别适⽤于⽤户对时间窗⼝不同类型的要求:
Operator Time。根据Task所在节点的本地时钟来切分的时间窗⼝。
Event Time。消息⾃带时间戳,根据消息的时间戳进⾏处理,确保时间戳在同⼀个时间窗⼝的所有消息⼀定会被正确处理。由于消息可能乱序流⼊Task,所以Task需要缓存当前时间窗⼝消息处理的状态,直到确认属于该时间窗⼝的所有消息都被处理,才可以释放,如果乱序的消息延迟很⾼会影响分布式系统的吞吐量和延迟。
Ingress Time。有时消息本⾝并不带时间戳信息,但⽤户依然希望按照消息⽽不是节点时钟划分时间窗⼝,例如避免上⾯提到的第⼆个问题,此时可以在消息源流⼊Flink流处理系统时⾃动⽣成增量的时间戳赋予消息,之后处理的流程与Event Time相同。Ingress Time可以看成是Event Time的⼀个特例,由于其在消息源处时间戳⼀定是有序的,所以在流处理系统中,相对于Event Time,其乱序的消息延迟不会很⾼,因此对Flink分布式系统的吞吐量和延迟的影响也会更⼩。
2.4 定制的内存管理
Flink项⽬基于Java及Scala等JVM语⾔,JVM本⾝作为⼀个各种类型应⽤的执⾏平台,其对Java对象的管理也是基于通⽤的处理策略,其垃圾回收器通过估算Java对象的⽣命周期对Java对象进⾏有效率的管理。
JVM存在的问题
Java对象开销
相对于C/C++等更加接近底层的语⾔,Java对象的存储密度相对偏低,例如[1],"abcd"这样简单的字符串在UTF-8编码中需要4个字节存储,但采⽤了UTF-16编码存储字符串的Java需要8个字节,同时Java对象还有header等其他额外信息,⼀个4字节字符串对象在Java中需要48字节的空间来存储。对
于⼤部分的⼤数据应⽤,内存都是稀缺资源,更有效率的内存存储,意味着CPU数据访问吐吞量更⾼,以及更少磁盘落地的存在。
对象存储结构引发的cache miss
为了缓解CPU处理速度与内存访问速度的差距,现代CPU数据访问⼀般都会有多级缓存。当从内存加载数据到缓存时,⼀般是以cache line 为单位加载数据,所以当CPU访问的数据如果是在内存中连续存储的话,访问的效率会⾮常⾼。如果CPU要访问的数据不在当前缓存所有的cache line中,则需要从内存中加载对应的数据,这被称为⼀次cache miss。当cache miss⾮常⾼的时候,CPU⼤部分的时间都在等待数据加载,⽽不是真正的处理数据。Java对象并不是连续的存储在内存上,同时很多的Java数据结构的数据聚集性也不好。
⼤数据的垃圾回收
Java的垃圾回收机制⼀直让Java开发者⼜爱⼜恨,⼀⽅⾯它免去了开发者⾃⼰回收资源的步骤,提⾼了开发效率,减少了内存泄漏的可能,另⼀⽅⾯垃圾回收也是Java应⽤的不定时,有时秒级甚⾄是分钟级的垃圾回收极⼤影响了Java应⽤的性能和可⽤性。在时下数据中⼼,⼤容量内存得到了⼴泛的应⽤,甚⾄出现了单台机器配置TB内存的情况,同时,⼤数据分析通常会遍历整个源数据集,对数据进⾏转换、清洗、处理等步骤。在这个过程中,会产⽣海量的Java对象,JVM的垃圾回收执⾏效率
对性能有很⼤影响。通过JVM参数调优提⾼垃圾回收效率需要⽤户对应⽤和分布式计算框架以及JVM的各参数有深⼊了解,⽽且有时候这也远远不够。
OOM问题
OutOfMemoryError是分布式计算框架经常会遇到的问题,当JVM中所有对象⼤⼩超过分配给JVM的内存⼤⼩时,就会出现OutOfMemoryError错误,JVM崩溃,分布式框架的健壮性和性能都会受到影响。通过JVM管理内存,同时试图解决OOM问题的应⽤,通常都需要检查Java对象的⼤⼩,并在某些存储Java对象特别多的数据结构中设置阈值进⾏控制。但是JVM并没有提供官⽅检查Java对象⼤⼩的⼯具,第三⽅的⼯具类库可能⽆法准确通⽤地确定Java对象⼤⼩[6]。侵⼊式的阈值检查也会为分布式计算框架的实现增加很多额外与业务逻辑⽆关的代码。
Flink的处理策略
为了解决以上提到的问题,⾼性能分布式计算框架通常需要以下技术:
定制的序列化⼯具
显式内存管理的前提步骤就是序列化,将Java对象序列化成⼆进制数据存储在内存上(on heap或是off-heap)。通⽤的序列化框架,如Java默认使⽤java.io.Serializable将Java对象及其成员变量的所有
元信息作为其序列化数据的⼀部分,序列化后的数据包含了所有反序列化所需的信息。这在某些场景中⼗分必要,但是对于Flink这样的分布式计算框架来说,这些元数据信息可能是冗余数据。
分布式计算框架可以使⽤定制序列化⼯具的前提是要待处理数据流通常是同⼀类型,由于数据集对象的类型固定,从⽽可以只保存⼀份对象Schema信息,节省⼤量的存储空间。同时,对于固定⼤⼩的类型,也可通过固定的偏移位置存取。在需要访问某个对象成员变量时,通过定制的序列化⼯具,并不需要反序列化整个Java对象,⽽是直接通过偏移量,从⽽只需要反序列化特定的对象成员变量。如果对象的成员变量较多时,能够⼤⼤减少Java对象的创建开销,以及内存数据的拷贝⼤⼩。Flink数据集都⽀持任意Java或是Scala类型,通过⾃动⽣成定制序列化⼯具,既保证了API接⼝对⽤户友好(不⽤像Hadoop那样数据类型需要继承实现org.apache.hadoop.io.Writable接⼝),也达到了和Hadoop类似的序列化效率。
Flink对数据集的类型信息进⾏分析,然后⾃动⽣成定制的序列化⼯具类。Flink⽀持任意的Java或是Scala类型,通过Java Reflection框架分析基于Java的Flink程序UDF(User Define Function)的返回类型的类型信息,通过Scala Compiler分析基于Scala的Flink程序UDF 的返回类型的类型信息。类型信息由TypeInformation类表⽰,这个类有诸多具体实现类,例如:
BasicTypeInfo任意Java基本类型(装包或未装包)和String类型。
BasicArrayTypeInfo任意Java基本类型数组(装包或未装包)和String数组。
WritableTypeInfo任意Hadoop的Writable接⼝的实现类。
TupleTypeInfo任意的Flink tuple类型(⽀持Tuple1 to Tuple25)。 Flink tuples是固定长度固定类型的Java Tuple实现。
CaseClassTypeInfo任意的 Scala CaseClass(包括 Scala tuples)。
PojoTypeInfo任意的POJO (Java or Scala),例如Java对象的所有成员变量,要么是public修饰符定义,要么有getter/setter⽅法。
GenericTypeInfo任意⽆法匹配之前⼏种类型的类。
前6种类型数据集⼏乎覆盖了绝⼤部分的Flink程序,针对前6种类型数据集,Flink皆可以⾃动⽣成对应的TypeSerializer定制序列化⼯具,⾮常有效率地对数据集进⾏序列化和反序列化。对于第7种类型,Flink使⽤Kryo进⾏序列化和反序列化。此外,对于可被⽤作Key的类型,Flink还同时⾃动⽣成TypeComparator,⽤来辅助直接对序列化后的⼆进制数据直接进⾏compare、hash等操作。对于Tuple、CaseClass、Pojo等组合类型,Flink⾃动⽣成的TypeSerializer、TypeComparator同样是组合的,并把其成员的序列化/反序列化代理给其成员对应的TypeSerializer、TypeComparator,如图6所⽰:
图6组合类型序列化
此外如有需要,⽤户可通过集成TypeInformation接⼝定制实现⾃⼰的序列化⼯具。
显式的内存管理
⼀般通⽤的做法是批量申请和释放内存,每个JVM实例有⼀个统⼀的内存管理器,所有内存的申请和释放都通过该内存管理器进⾏。这可以避免常见的内存碎⽚问题,同时由于数据以⼆进制的⽅式存储,可以⼤⼤减轻垃圾回收压⼒。
垃圾回收是JVM内存管理回避不了的问题,JDK8的G1算法改善了JVM垃圾回收的效率和可⽤范围,但对于⼤数据处理实际环境还远远不够。这也和现在分布式框架的发展趋势有所冲突,越来越多的分布式计算框架希望尽可能多地将待处理数据集放⼊内存,⽽对于JVM垃圾回收来说,内存中Java对象越少、存活时间越短,其效率越⾼。通过JVM进⾏内存管理的话,OutOfMemoryError也是⼀个很难解决的问题。同时,在JVM内存管理中,Java对象有潜在的碎⽚化存储问题(Java对象所有信息可能在内存中连续存储),也有可能在所有Java对象⼤⼩没有超过JVM分配内存时,出现OutOfMemoryError问题。Flink将内存分为3个部分,每个部分都有不同⽤途:
Network buffers: ⼀些以32KB Byte数组为单位的buffer,主要被⽹络模块⽤于数据的⽹络传输。
Memory Manager pool⼤量以32KB Byte数组为单位的内存池,所有的运⾏时算法(例如Sort/Shuffle/Join)都从这个内存池申请内存,并将序列化后的数据存储其中,结束后释放回内存池。
Remaining (Free) Heap主要留给UDF中⽤户⾃⼰创建的Java对象,由JVM管理。
Network buffers在Flink中主要基于Netty的⽹络传输,⽆需多讲。Remaining Heap⽤于UDF中⽤户⾃⼰创建的Java对象,在UDF中,⽤户通常是流式的处理数据,并不需要很多内存,同时Flink也不⿎励⽤户在UDF中缓存很多数据,因为这会引起前⾯提到的诸多问题。Memory Manager pool(以后以内存池代指)通常会配置为最⼤的⼀块内存,接下来会详细介绍。
在Flink中,内存池由多个MemorySegment组成,每个MemorySegment代表⼀块连续的内存,底层存储是byte[],默认32KB⼤⼩。MemorySegment提供了根据偏移量访问数据的各种⽅法,如get/put int、long、float、double等,MemorySegment之间数据拷贝等⽅法和java.nio.ByteBuffer类似。对于Flink的数据结构,通常包括多个向内存池申请的MemeorySegment,所有要存⼊的对象通过TypeSerializer序列化之后,将⼆进制数据存储在MemorySegment中,在取出时通过TypeSerializer反序列化。数据结构通过MemorySegment提供的set/get⽅法访问具体的⼆进制数据。Flink这种看起来⽐较复杂的内存管理⽅式带来的好处主要有:
⼆进制的数据存储⼤⼤提⾼了数据存储密度,节省了存储空间。
所有的运⾏时数据结构和算法只能通过内存池申请内存,保证了其使⽤的内存⼤⼩是固定的,不会因为运⾏时数据结构和算法⽽发⽣OOM。对于⼤部分的分布式计算框架来说,这部分由于要缓存⼤量数据最有可能导致OOM。
内存池虽然占据了⼤部分内存,但其中的MemorySegment容量较⼤(默认32KB),所以内存池中的Java对象其实很少,⽽且⼀直被内存池引⽤,所有在垃圾回收时很快进⼊持久代,⼤⼤减轻了JVM垃圾回收的压⼒。
Remaining Heap的内存虽然由JVM管理,但是由于其主要⽤来存储⽤户处理的流式数据,⽣命周期⾮
常短,速度很快的Minor GC就会全部回收掉,⼀般不会触发Full GC。
Flink当前的内存管理在最底层是基于byte[],所以数据最终还是on-heap,最近Flink增加了off-heap的内存管理⽀持。Flink off-heap的内存管理相对于on-heap的优点主要在于:
启动分配了⼤内存(例如100G)的JVM很耗费时间,垃圾回收也很慢。如果采⽤off-heap,剩下的Network buffer和Remaining heap都会很⼩,垃圾回收也不⽤考虑MemorySegment中的Java对象了。
更有效率的IO操作。在off-heap下,将MemorySegment写到磁盘或是⽹络可以⽀持zeor-copy技术,⽽on-heap的话则⾄少需要⼀次内存拷贝。
off-heap可⽤于错误恢复,⽐如JVM崩溃,在on-heap时数据也随之丢失,但在off-heap下,off-heap的数据可能还在。此外,off-heap上的数据还可以和其他程序共享。
缓存友好的计算
对于计算密集的数据结构和算法,直接操作序列化后的⼆进制数据,⽽不是将对象反序列化后再进⾏操作。同时,只将操作相关的数据连续存储,可以最⼤化的利⽤L1/L2/L3缓存,减少Cache miss的概率,提升CPU计算的吞吐量。以排序为例,由于排序的主要操作是对Key 进⾏对⽐,如果将所有排序数据的Key与Value分开并对Key连续存储,那么访问Key时的Cache命中率会⼤⼤提⾼。
磁盘IO和⽹络IO之前⼀直被认为是Hadoop系统的瓶颈,但是随着Spark、Flink等新⼀代分布式计算框架的发展,越来越多的趋势使得CPU/Memory逐渐成为瓶颈,这些趋势包括:
更先进的IO硬件逐渐普及。10GB⽹络和SSD硬盘等已经被越来越多的数据中⼼使⽤。
更⾼效的存储格式。Parquet,ORC等列式存储被越来越多的Hadoop项⽬⽀持,其⾮常⾼效的压缩性能⼤⼤减少了落地存储的数据
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论