Kafka⽔位(highwatermark)与leaderepoch的讨论
~~~这是⼀篇有点长的⽂章,希望不会令你昏昏欲睡~~~
本⽂主要讨论0.11版本之前Kafka的副本备份机制的设计问题以及0.11是如何解决的。简单来说,0.11之前副本备份机制主要依赖⽔位(或⽔印)的概念,⽽0.11采⽤了leader epoch来标识备份进度。后⾯我们会详细讨论两种机制的差异。不过⾸先先做⼀些基本的名词含义解析。
⽔位或⽔印(watermark)⼀词,也可称为⾼⽔位(high watermark),通常被⽤在流式处理领域(⽐如Apache Flink、Apache Spark等),以表征元素或事件在基于时间层⾯上的进度。⼀个⽐较经典的表述为:流式系统保证在⽔位t时刻,创建时间(event time) = t'且t' ≤ t的所有事件都已经到达或被观测到。在Kafka中,⽔位的概念反⽽与时间⽆关,⽽是与位置信息相关。严格来说,它表⽰的就是位置信息,即位移(offset)。⽹上有⼀些关于Kafka watermark的介绍,本不应再赘述,但鉴于本⽂想要重点强调的leader epoch与watermark息息相关,故这⾥再费些篇幅阐述⼀下watermark。注意:由于Kafka源码中使⽤的名字是⾼⽔位,故本⽂将始终使⽤high watermaker或⼲脆简称为HW。
Kafka分区下有可能有很多个副本(replica)⽤于实现冗余,从⽽进⼀步实现⾼可⽤。副本根据⾓⾊的不同可分为3类:
leader副本:响应clients端读写请求的副本
follower副本:被动地备份leader副本中的数据,不能响应clients端读写请求。
ISR副本:包含了leader副本和所有与leader副本保持同步的follower副本——如何判定是否与leader同步后⾯会提到
每个Kafka副本对象都有两个重要的属性:LEO和HW。注意是所有的副本,⽽不只是leader副本。
LEO:即⽇志末端位移(log end offset),记录了该副本底层⽇志(log)中下⼀条消息的位移值。注意是下⼀条消息!也就是说,如果LEO=10,那么表⽰该副本保存了10条消息,位移值范围是[0, 9]。另外,leader LEO和follower LEO的更新是有区别的。我们后⾯会详细说
HW:即上⾯提到的⽔位值。对于同⼀个副本对象⽽⾔,其HW值不会⼤于LEO值。⼩于等于HW值的所有消息都被认为是“已备份”的
(replicated)。同理,leader副本和follower副本的HW更新是有区别的,我们后⾯详谈。
我们使⽤下图来形象化地说明两者的关系:
上图中,HW值是7,表⽰位移是0~7的所有消息都已经处于“已备份状态”(committed),⽽LEO值是15,那么8~14的消息就是尚未完全备份(fully replicated)——为什么没有15?因为刚才说过了,LEO
指向的是下⼀条消息到来时的位移,故上图使⽤虚线框表⽰。我们总说consumer⽆法消费未提交消息。这句话如果⽤以上名词来解读的话,应该表述为:consumer⽆法消费分区下leader副本中位移值⼤于分区HW的任何消息。这⾥需要特别注意分区HW就是leader副本的HW值。
既然副本分为leader副本和follower副本,⽽每个副本⼜都有HW和LEO,那么它们是怎么被更新的呢?它们更新的机制⼜有什么区别呢?我们⼀⼀来分析下:
⼀、follower副本何时更新LEO?
如前所述,follower副本只是被动地向leader副本请求数据,具体表现为follower副本不停地向leader副本所在的broker发送FETCH请求,⼀旦获取消息后写⼊⾃⼰的⽇志中进⾏备份。那么follower副本的LEO是何时更新的呢?⾸先我必须⾔明,Kafka有两套follower副本LEO(明⽩这个是搞懂后⾯内容的关键,因此请多花⼀点时间来思考):1. ⼀套LEO保存在follower副本所在broker的副本管理机中;2. 另⼀套LEO保存在leader副本所在broker的副本管理机中——换句话说,leader副本机器上保存了所有的follower副本的LEO。
为什么要保存两套?这是因为Kafka使⽤前者帮助follower副本更新其HW值;⽽利⽤后者帮助leader副本更新其HW使⽤。下⾯我们分别看下它们被更新的时机。
1 follower副本端的follower副本LEO何时更新?(原谅我有点拗⼝~~~~~)
follower副本端的LEO值就是其底层⽇志的LEO值,也就是说每当新写⼊⼀条消息,其LEO值就会被更新(类似于LEO += 1)。当follower发送FETCH请求后,leader将数据返回给follower,此时follower开始向底层log写数据,从⽽⾃动地更新LEO值
2 leader副本端的follower副本LEO何时更新?
leader副本端的follower副本LEO的更新发⽣在leader在处理follower FETCH请求时。⼀旦leader接收到follower发送的FETCH请求,它⾸先会从⾃⼰的log中读取相应的数据,但是在给follower返回数据之前它先去更新follower的LEO(即上⾯所说的第⼆套LEO)
⼆、follower副本何时更新HW?
follower更新HW发⽣在其更新LEO之后,⼀旦follower向log写完数据,它会尝试更新它⾃⼰的HW值。具体算法就是⽐较当前LEO值与FETCH响应中leader的HW值,取两者的⼩者作为新的HW值。这告诉我们⼀个事实:如果follower的LEO值超过了leader的HW值,那么follower HW值是不会越过leader HW值的。
三、leader副本何时更新LEO?
和follower更新LEO道理相同,leader写log时就会⾃动地更新它⾃⼰的LEO值。
四、leader副本何时更新HW值?
前⾯说过了,leader的HW值就是分区HW值,因此何时更新这个值是我们最关⼼的,因为它直接影响了分区数据对于consumer的可见性。以下4种情况下leader会尝试去更新分区HW——切记是尝试,有可能因为不满⾜条件⽽不做任何更新:
副本成为leader副本时:当某个副本成为了分区的leader副本,Kafka会尝试去更新分区HW。这是显⽽易见的道理,毕竟分区leader发⽣了变更,这个副本的状态是⼀定要检查的!不过,本⽂讨论的是当系统稳定后且正常⼯作时备份机制可能出现的问题,故这个条件不在我们的讨论之列。
broker出现崩溃导致副本被踢出ISR时:若有broker崩溃则必须查看下是否会波及此分区,因此检查下分区HW值是否需要更新是有必要的。本⽂不对这种情况做深⼊讨论
producer向leader副本写⼊消息时:因为写⼊消息会更新leader的LEO,故有必要再查看下HW值是否也需要修改
leader处理follower FETCH请求时:当leader处理follower的FETCH请求时⾸先会从底层的log读取数据,之后会尝试更新分区HW值
特别注意上⾯4个条件中的最后两个。它揭⽰了⼀个事实——当Kafka broker都正常⼯作时,分区HW值
的更新时机有两个:leader处理PRODUCE请求时和leader处理FETCH请求时。另外,leader是如何更新它的HW值的呢?前⾯说过了,leader broker上保存了⼀套follower副本的LEO以及它⾃⼰的LEO。当尝试确定分区HW时,它会选出所有满⾜条件的副本,⽐较它们的LEO(当然也包括leader⾃⼰的LEO),并选择最⼩的LEO值作为HW值。这⾥的满⾜条件主要是指副本要满⾜以下两个条件之⼀:
处于ISR中
副本LEO落后于leader LEO的时长不⼤于replica.lag.time.max.ms参数值(默认是10s)
乍看上去好像这两个条件说得是⼀回事,毕竟ISR的定义就是第⼆个条件描述的那样。但某些情况下Kafka的确可能出现副本已经“追上”了leader的进度,但却不在ISR中——⽐如某个从failure中恢复的副本。如果Kafka只判断第⼀个条件的话,确定分区HW值时就不会考虑这些未在ISR中的副本,但这些副本已经具备了“⽴刻进⼊ISR”的资格,因此就可能出现分区HW值越过ISR中副本LEO的情况——这肯定是不允许的,因为分区HW实际上就是ISR中所有副本LEO的最⼩值。
好了,理论部分我觉得说的差不多了,下⾯举个实际的例⼦。我们假设有⼀个topic,单分区,副本因⼦是2,即⼀个leader副本和⼀个follower副本。我们看下当producer发送⼀条消息时,broker端的副本到底会发⽣什么事情以及分区HW是如何被更新的。
kafka为什么那么快下图是初始状态,我们稍微解释⼀下:初始时leader和follower的HW和LEO都是0(严格来说源代码会初始化LEO为-1,不过这不影响之后的讨论)。leader中的remote LEO指的就是leader端保存的follower LEO,也被初始化成0。此时,producer没有发送任何消息给leader,⽽follower已经开始不断地给leader发送FETCH请求了,但因为没有数据因此什么都不会发⽣。值得⼀提的是,follower发送过来的FETCH请求因为⽆数据⽽暂时会被寄存到leader端的purgatory中,待500ms(replica.fetch.wait.max.ms参数)超时后会强制完成。倘若在寄存期间producer端发送过来数据,那么会Kafka会⾃动唤醒该FETCH请求,让leader继续处理之。
虽然purgatory不是本⽂的重点,但FETCH请求发送和PRODUCE请求处理的时机会影响我们的讨论。因此后续我们也将分两种情况来讨论分区HW的更新。
第⼀种情况:follower发送FETCH请求在leader处理完PRODUCE请求之后
producer给该topic分区发送了⼀条消息。此时的状态如下图所⽰:
如图所⽰,leader接收到PRODUCE请求主要做两件事情:
1. 把消息写⼊写底层log(同时也就⾃动地更新了leader的LEO)
2. 尝试更新leader HW值(前⾯leader副本何时更新HW值⼀节中的第三个条件触发)。我们已经假设此
时follower尚未发送FETCH请求,那么
leader端保存的remote LEO依然是0,因此leader会⽐较它⾃⼰的LEO值和remote LEO值,发现最⼩值是0,与当前HW值相同,故不会更新分区HW值
所以,PRODUCE请求处理完成后leader端的HW值依然是0,⽽LEO是1,remote LEO是1。假设此时follower发送了FETCH请求(或者说follower早已发送了FETCH请求,只不过在broker的请求队列中排队),那么状态变更如下图所⽰:
本例中当follower发送FETCH请求时,leader端的处理依次是:
1. 读取底层log数据
2. 更新remote LEO = 0(为什么是0?因为此时follower还没有写⼊这条消息。leader如何确认follower还未写⼊呢?这是通过follower发来的
FETCH请求中的fetch offset来确定的)
3. 尝试更新分区HW——此时leader LEO = 1,remote LEO = 0,故分区HW值= min(leader LEO, follower remote LEO) = 0
4. 把数据和当前分区HW值(依然是0)发送给follower副本
⽽follower副本接收到FETCH response后依次执⾏下列操作:
1. 写⼊本地log(同时更新follower LEO)
2. 更新follower HW——⽐较本地LEO和当前leader HW取⼩者,故follower HW = 0
此时,第⼀轮FETCH RPC结束,我们会发现虽然leader和follower都已经在log中保存了这条消息,但分区HW值尚未被更新。实际上,它是在第⼆轮FETCH RPC中被更新的,如下图所⽰:
上图中,follower发来了第⼆轮FETCH请求,leader端接收到后仍然会依次执⾏下列操作:
1. 读取底层log数据
2. 更新remote LEO = 1(这次为什么是1了?因为这轮FETCH RPC携带的fetch offset是1,那么为什么这轮携带的就是1了呢,因为上⼀轮结束后
follower LEO被更新为1了)
3. 尝试更新分区HW——此时leader LEO = 1,remote LEO = 1,故分区HW值= min(leader LEO, follow
er remote LEO) = 1。注意分区HW值此时
被更新了
4. 把数据(实际上没有数据)和当前分区HW值(已更新为1)发送给follower副本
同样地,follower副本接收到FETCH response后依次执⾏下列操作:
1. 写⼊本地log,当然没东西可写,故follower LEO也不会变化,依然是1
2. 更新follower HW——⽐较本地LEO和当前leader LEO取⼩者。由于此时两者都是1,故更新follower HW = 1 (注意:我特意⽤了两种颜⾊来描
述这两步,后续会谈到原因!)
Okay,producer端发送消息后broker端完整的处理流程就讲完了。此时消息已经成功地被复制到leader和follower的log中且分区HW是1,表明consumer能够消费offset = 0的这条消息。下⾯我们来分析下PRODUCE和FETCH请求交互的第⼆种情况。
第⼆种情况:FETCH请求保存在purgatory中PRODUCE请求到来
这种情况实际上和第⼀种情况差不多。前⾯说过了,当leader⽆法⽴即满⾜FECTH返回要求的时候(⽐如没有数据),那么该FETCH请求会被暂存到leader端的purgatory中,待时机成熟时会尝试再次处理它。不过Kafka不会⽆限期地将其缓存着,默认有个超时时间(500ms),⼀旦超时时间已过,则这个请求会被强制完成。不过我们要讨论的场景是在寄存期间,producer发送PRODUCE请求从⽽使之满⾜了条件从⽽被唤醒。此时,leader端处理流程如下:
1. leader写⼊本地log(同时⾃动更新leader LEO)
2. 尝试唤醒在purgatory中寄存的FETCH请求
3. 尝试更新分区HW
⾄于唤醒后的FETCH请求的处理与第⼀种情况完全⼀致,故这⾥不做详细展开了。
以上所有的东西其实就想说明⼀件事情:Kafka使⽤HW值来决定副本备份的进度,⽽HW值的更新通常需要额外⼀轮FETCH RPC才能完成,故⽽这种设计是有问题的。它们可能引起的问题包括:
备份数据丢失
备份数据不⼀致
我们⼀⼀分析下:
⼀、数据丢失
如前所述,使⽤HW值来确定备份进度时其值的更新是在下⼀轮RPC中完成的。现在翻到上⾯使⽤两种不同颜⾊标记的步骤处思考下,如果follower 副本在蓝⾊标记的第⼀步与紫⾊标记的第⼆步之间发⽣崩溃,那么就有可能造成数据的丢失。我们举个例⼦来看下。
上图中有两个副本:A和B。开始状态是A是leader。我们假设producer端plicas设置为1,那么当producer发送两条消息给A后,A写⼊到底层log,此时Kafka会通知producer说这两条消息写⼊成功。
但是在broker端,leader和follower底层的log虽都写⼊了2条消息且分区HW已经被更新到2,但follower HW尚未被更新(也就是上⾯紫⾊颜⾊标记的第⼆步尚未执⾏)。倘若此时副本B所在的broker宕机,那么重启回来后B会⾃动把LEO调整到之前的HW值,故副本B会做⽇志截断(log truncation),将offset = 1的那条消息从log中删除,并调整LEO = 1,此时follower副本底层log中就只有⼀条消息,即offset = 0的消息。
B重启之后需要给A发FETCH请求,但若A所在broker机器在此时宕机,那么Kafka会令B成为新的leader,
⽽当A重启回来后也会执⾏⽇志截断,将HW 调整回1。这样,位移=1的消息就从两个副本的log中被删除,即永远地丢失了。
这个场景丢失数据的前提是在plicas=1时,⼀旦消息被写⼊leader端log即被认为是“已提交”,⽽延迟⼀轮FETCH RPC更新HW值的设计使得follower HW值是异步延迟更新的,倘若在这个过程中leader发⽣变更,那么成为新leader的follower的HW值就有可能是过期的,使得clients端认为是成功提交的消息被删除。
⼆、leader/follower数据离散
除了可能造成的数据丢失以外,这种设计还有⼀个潜在的问题,即造成leader端log和follower端log的数据不⼀致。⽐如leader端保存的记录序列是
r1,r2,r3,r4,r5,....;⽽follower端保存的序列可能是r1,r3,r4,。这也是⾮法的场景,因为顾名思义,follower必须追随leader,完整地备份leader端的数据。
我们依然使⽤⼀张图来说明这种场景是如何发⽣的:
这种情况的初始状态与情况1有⼀些不同的:A依然是leader,A的log写⼊了2条消息,但B的log只写⼊了1条消息。分区HW更新到2,但B的HW还是1,同时producer端的plicas = 1。
这次我们让A和B所在机器同时挂掉,然后假设B先重启回来,因此成为leader,分区HW = 1。假设此时producer发送了第3条消息(绿⾊框表⽰)给B,
于是B的log中offset = 1的消息变成了绿⾊框表⽰的消息,同时分区HW更新到2(A还没有回来,就B⼀个副本,故可以直接更新HW⽽不⽤理会A)之后A重启回来,需要执⾏⽇志截断,但发现此时分区HW=2⽽A之前的HW值也是2,故不做任何调整。此后A和B将以这种状态继续正常⼯作。
显然,这种场景下,A和B底层log中保存在offset = 1的消息是不同的记录,从⽽引发不⼀致的情形出现。
Kafka 0.11.0.0.版本解决⽅案
造成上述两个问题的根本原因在于HW值被⽤于衡量副本备份的成功与否以及在出现failture时作为⽇志截断的依据,但HW值的更新是异步延迟的,特别是需要额外的FETCH请求处理流程才能更新,故这中间发⽣的任何崩溃都可能导致HW值的过期。鉴于这些原因,Kafka 0.11引⼊了leader epoch来取代HW值。Leader端多开辟⼀段内存区域专门保存leader的epoch信息,这样即使出现上⾯的两个场景也能很好地规避这些问题。
所谓leader epoch实际上是⼀对值:(epoch,offset)。epoch表⽰leader的版本号,从0开始,当leade
r变更过1次时epoch就会+1,⽽offset则对应于该epoch版本的leader写⼊第⼀条消息的位移。因此假设有两对值:
(0, 0)
(1, 120)
则表⽰第⼀个leader从位移0开始写⼊消息;共写了120条[0, 119];⽽第⼆个leader版本号是1,从位移120处开始写⼊消息。
leader broker中会保存这样的⼀个缓存,并定期地写⼊到⼀个checkpoint⽂件中。
当leader写底层log时它会尝试更新整个缓存——如果这个leader⾸次写消息,则会在缓存中增加⼀个条⽬;否则就不做更新。⽽每次副本重新成为leader时会查询这部分缓存,获取出对应leader版本的位移,这就不会发⽣数据不⼀致和丢失的情况。
下⾯我们依然使⽤图的⽅式来说明下利⽤leader epoch如何规避上述两种情况
⼀、规避数据丢失
上图左半边已经给出了简要的流程描述,这⾥不详细展开具体的leader epoch实现细节(⽐如OffsetsForLeaderEpochRequest的实现),我们只需要知道每个副本都引⼊了新的状态来保存⾃⼰当leader时开始写⼊的第⼀条消息的offset以及leader版本。这样在恢复的时候完全使⽤这些信息⽽⾮⽔位来判断是否需要截断⽇志。
⼆、规避数据不⼀致
同样的道理,依靠leader epoch的信息可以有效地规避数据不⼀致的问题。
总结
0.11.0.0版本的Kafka通过引⼊leader epoch解决了原先依赖⽔位表⽰副本进度可能造成的数据丢失/数据不⼀致问题。有兴趣的读者可以阅读源代码进⼀步地了解其中的⼯作原理。
源代码位置:kafka.server.epoch.LeaderEpochCache.scala (leader epoch数据结构)、
kafka.server.checkpoints.LeaderEpochCheckpointFile(checkpoint检查点⽂件操作类)还有分布在Log中的CRUD操作。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论