kafka最新版本
编写你的第⼀个Java版Raft分布式KV存储
前⾔
本⽂旨在讲述如何使⽤ Java 语⾔实现基于 Raft 算法的,分布式的,KV 结构的存储项⽬。该项⽬的背景是为了深⼊理解 Raft 算法,从⽽深刻理解分布式环境下数据强⼀致性该如何实现;该项⽬的⽬标是:在复杂的分布式环境中,多个存储节点能够保证数据强⼀致性。
欢迎 star :)
什么是 Java 版 Raft 分布式 KV 存储
Raft 算法⼤部分⼈都已经了解,也有很多实现,从 GitHub 上来看,似乎 Golang 语⾔实现的较多,⽐较有名的,例如 etcd。⽽ Java 版本的,在⽣产环境⼤规模使⽤的实现则较少;
同时,他们的设计⽬标⼤部分都是命名服务,即服务注册发现,也就是说,他们通常都是基于 AP 实现,就像 DNS,DNS 是⼀个命名服务,同时也不是⼀个强⼀致性的服务。
⽐较不同的是 Zookeeper,ZK 常被⼤家⽤来做命名服务,但他更多的是⼀个分布式服务协调者。
⽽上⾯的这些都不是存储服务,虽然也都可以做⼀些存储⼯作。甚⾄像 kafka,可以利⽤ ZK 实现分布式存储。
回到我们这边。
此次我们语⾔部分使⽤ Java,RPC ⽹络通信框架使⽤的是蚂蚁⾦服 SOFA-Bolt,底层 KV 存储使⽤的是 RocksDB,其中核⼼的 Raft 则由我们⾃⼰实现(如果不⾃⼰实现,那这个项⽬没有意义)。注意,该项⽬将舍弃⼀部分性能和可⽤性,以追求尽可能的强⼀致性。
为什么要费尽⼼⼒重复造轮⼦
⼩时候,我们阅读关于⾼可⽤的⽂章时,最后都会提到⼀个问题:服务挂了怎么办?
通常有 2 种回答:
1. 如果是⽆状态服务,那么毫不影响使⽤。
2. 如果是有状态服务,可以将状态保存到⼀个别的地⽅,例如 Redis。如果 Redis 挂了怎么办?那就放到 ZK。
很多中间件,都会使⽤ ZK 来保证状态⼀致,例如 codis,kafka。因为使⽤ ZK 能够帮我们节省⼤量的时间。但有的时候,中间件的⽤户觉得引⼊第三⽅中间件很⿇烦,那么中间件开发者会尝试⾃⼰实现⼀致性,例如 Redis Cluster, TiDB 等。
⽽通常⾃⼰实现,都会使⽤ Raft 算法,那有⼈问,为什么不使⽤"更⽜逼的" paxos 算法?对不起,这个有点难,⾄少⽬前开源的、⽣产环境⼤规模使⽤的 paxos 算法实现还没有出现,只听过 Google 或者 alibaba 在其内部实现过,具体是什么样⼦的,这⾥我们就不讨论了。
回到我们的话题,为什么重复造轮⼦?从 3 个⽅⾯来回答:
1. 有的时候 ZK 和 etcd 并不能解决我们的问题,或者像上⾯说的,引⼊其他的中间件部署起来太⿇烦也太重。
2. 完全处于好奇,好奇为什么 Raft 可以保证⼀致性(这通常可以通过汗⽜充栋的⽂章来得到解答)?但是到底该怎么实现?
3. 分布式开发的要求,作为开发分布式系统的程序员,如果能够更深刻的理解分布式系统的核⼼算法,那么对如何合理设计⼀个分布式
系统将⼤有益处。
好,有了以上 3 个原因,我们就有⾜够的动⼒来造轮⼦了,接下来就是如何造的问题了。
编写前的 Raft 理论基础
任何实践都是理论先⾏。如果你对 Raft 理论已经⾮常熟悉,那么可以跳过此节,直接看实现的步骤。
Raft 为了算法的可理解性,将算法分成了 4 个部分。
1. leader 选举
2. ⽇志复制
3. 成员变更
4. ⽇志压缩
同 zk ⼀样,leader 都是必须的,所有的写操作都是由 leader 发起,从⽽保证数据流向⾜够简单。⽽ leader 的选举则通过⽐较每个节点的逻辑时间(term)⼤⼩,以及⽇志下标(index)的⼤⼩。
刚刚说 leader 选举涉及⽇志下标,那么就要讲⽇志复制。⽇志复制可以说是 Raft 核⼼的核⼼,说简单点,Raft 就是为了保证多节点之间⽇志的⼀致。当⽇志⼀致,我们可以认为整个系统的状态是⼀致的。
这个⽇志你可以理解成 mysql 的 binlog。
Raft 通过各种补丁,保证了⽇志复制的正确性。
Raft leader 节点会将客户端的请求都封装成⽇志,发送到各个 follower 中,如果集中超过⼀半的 follower 回复成功,那么这个⽇志就可以
被提交(commit),这个 commit 可以理解为 ACID 的 D ,即持久化。当⽇志被持久化到磁盘,后⾯的事情就好办了。
⽽第三点则是为了节点的扩展性。第四点是为了性能。相⽐较 leader 选举和⽇志复制,不是那么的重要,可以说,如果没有成员变更和⽇志压缩,也可以搞出⼀个可⽤的 Raft 分布式系统,但没有 leader 选举和⽇志复制,是万万不能的。
因此,本⽂和本项⽬将重点放在 leader 选举和⽇志复制。
以上,就简单说明了 Raft 的算法,关于 Raft 算法更多的⽂章,请参考本⼈博客中的其他⽂章(包含官⽅各个版本论⽂和 PPT & 动画 & 其他博客⽂章),博客地址:thinkinjava
实现的步骤
实现⽬标:基于 Raft 论⽂实现 Raft 核⼼功能,即 Leader 选举 & ⽇志复制。
Raft 核⼼组件包括:⼀致性模块,RPC 通信,⽇志模块,状态机。
技术选型:
⼀致性模块,是 Raft 算法的核⼼实现,通过⼀致性模块,保证 Raft 集节点数据的⼀致性。这⾥我们需要⾃⼰根据论⽂描述去实现。
RPC 通信,可以使⽤ HTTP 短连接,也可以直接使⽤ TCP 长连接,考虑到集各个节点频繁通信,同时节点通常都在⼀个局域⽹内,因此我们选⽤ TCP 长连接。⽽ Java 社区长连接框架⾸选 Netty,这⾥我们选⽤蚂蚁⾦服⽹络通信框架 SOFA-Bolt(基于
Netty),便于快速开发。
⽇志模块,Raft 算法中,⽇志实现是基础,考虑到时间因素,我们选⽤ RocksDB 作为⽇志存储。
状态机,可以是任何实现,其实质就是将⽇志中的内容进⾏处理。可以理解为 Mysql binlog 中的具体数据。由于我们是要实现⼀个 KV 存储,那么可以直接使⽤⽇志模块的 RocksDB 组件。
以上。我们可以看到,得益于开源世界,我们开发⼀个 Raft 存储,只需要编写⼀个“⼀致性模块”就⾏了,其他模块都有现成的轮⼦可以使⽤,真是美滋滋。
接⼝设计:
上⾯我们说了 Raft 的⼏个核⼼功能,事实上,就可以理解为接⼝。所以我们定义以下⼏个接⼝:
1. Consensus,⼀致性模块接⼝
2. LogModule,⽇志模块接⼝
3. StateMachine,状态机接⼝
4. RpcServer & RpcClient, RPC 接⼝
5. Node,同时,为了聚合上⾯的⼏个接⼝,我们需要定义⼀个 Node 接⼝,即节点,Raft 抽象的机器节点。
6. LifeCycle,最后,我们需要管理以上组件的⽣命周期,因此需要⼀个 LifeCycle 接⼝。
接下来,我们需要详细定义核⼼接⼝ Consensus。我们根据论⽂定义了 2 个核⼼接⼝:
/**
* 请求投票 RPC
*
* 接收者实现:
*
*      如果term < currentTerm返回 false (5.2 节)
*      如果 votedFor 为空或者就是 candidateId,并且候选⼈的⽇志⾄少和⾃⼰⼀样新,那么就投票给他(5.2 节,5.4 节)
*/
RvoteResult requestVote(RvoteParam param);
/**
* 附加⽇志(多个⽇志,为了提⾼效率) RPC
*
* 接收者实现:
*
*    如果 term < currentTerm 就返回 false (5.1 节)
*    如果⽇志在 prevLogIndex 位置处的⽇志条⽬的任期号和 prevLogTerm 不匹配,则返回 false (5.3 节)
*    如果已经存在的⽇志条⽬和新的产⽣冲突(索引值相同但是任期号不同),删除这⼀条和之后所有的(5.3 节)
*    附加任何在已有的⽇志中不存在的条⽬
*    如果 leaderCommit > commitIndex,令 commitIndex 等于 leaderCommit 和新⽇志条⽬索引值中较⼩的⼀个
*/
AentryResult appendEntries(AentryParam param);
请求投票 & 附加⽇志。也就是我们的 Raft 节点的核⼼功能,leader 选举和⽇志复制。实现这两个接⼝是 Raft 的关键所在。
然后再看 LogModule 接⼝,这个⾃由发挥,考虑⽇志的特点,我定义了以下⼏个接⼝:
void write(LogEntry logEntry);
LogEntry read(Long index);
void removeOnStartIndex(Long startIndex);
LogEntry getLast();
Long getLastIndex();
分别是写,读,删,最后是两个关于 Last 的接⼝,在 Raft 中,Last 是⼀个⾮常关键的东西,因此我这⾥单独定义了 2个⽅法,虽然看起来不是很好看:)
状态机接⼝,在 Raft 论⽂中,将数据保存到状态机,作者称之为应⽤,那么我们也这么命名,说⽩了,
就是将已成功提交的⽇志应⽤到状态机中:
/**
* 将数据应⽤到状态机.
*
* 原则上,只需这⼀个⽅法(apply). 其他的⽅法是为了更⽅便的使⽤状态机.
* @param logEntry ⽇志中的数据.
*/
void apply(LogEntry logEntry);
LogEntry get(String key);
String getString(String key);
void setString(String key, String value);
void key);
第⼀个 apply ⽅法,就是 Raft 论⽂常常提及的⽅法,即将⽇志应⽤到状态机中,后⾯的⼏个⽅法,都是我为了⽅便获取数据设计的,可以不⽤在意,甚⾄于,这⼏个⽅法不存在也不影响 Raft 的实现,但影响 KV 存储的实现,试想:⼀个系统只有保存功能,没有获取功能,要你何⽤?。
RpcClient 和 RPCServer 没什么好讲的,其实就是 send 和 receive。
然后是 Node 接⼝,Node 接⼝也是 Raft 没有定义的,我们依靠⾃⼰的理解定义了⼏个接⼝:
/**
* 设置配置⽂件.
*
* @param config
*/
void setConfig(NodeConfig config);
/**
* 处理请求投票 RPC.
*
* @param param
* @return
*/
RvoteResult handlerRequestVote(RvoteParam param);
/**
* 处理附加⽇志请求.
*
* @param param
* @return
*/
AentryResult handlerAppendEntries(AentryParam param);
/**
* 处理客户端请求.
*
* @param request
* @return
*/
ClientKVAck handlerClientRequest(ClientKVReq request);
/**
* 转发给 leader 节点.
* @param request
* @return
*/
ClientKVAck redirect(ClientKVReq request);
⾸先,⼀个 Node 肯定需要配置⽂件,所以有⼀个 setConfig 接⼝,
然后,肯定需要处理“请求投票”和“附加⽇志”,同时,还需要接收⽤户,也就是客户端的请求(不然数据从哪来?),所以有handlerClientRequest 接⼝,最后,考虑到灵活性,我们让每个节点都可以接收客户端的请求,但 follower 节点并不能处理请求,所以需要重定向到 leader 节点,因此,我们需要⼀个重定向接⼝。
最后是⽣命周期接⼝,这⾥我们简单定义了 2 个,有需要的话,再另外加上组合接⼝:
void init() throws Throwable;
void destroy() throws Throwable;
好,基本的接⼝定义完了,后⾯就是实现了。实现才是关键。
Leader 选举的实现
选举,其实就是⼀个定时器,根据 Raft 论⽂描述,如果超时了就需要重新选举,我们使⽤ Java 的定时任务线程池进⾏实现,实现之前,需要确定⼏个点:
1. 选举者必须不是 leader。
2. 必须超时了才能选举,具体超时时间根据你的设计⽽定,注意,每个节点的超时时间不能相同,应当使⽤随机算法错开(Raft 关键实
现),避免⽆谓的死锁。
3. 选举者优先选举⾃⼰,将⾃⼰变成 candidate。
4. 选举的第⼀步就是把⾃⼰的 term 加⼀。
5. 然后像其他节点发送请求投票 RPC,请求参数参照论⽂,包括⾃⾝的 term,⾃⾝的 lastIndex,以及
⽇志的 lastTerm。同时,请求投
票 RPC 应该是并⾏请求的。
6. 等待投票结果应该有超时控制,如果超时了,就不等待了。
7. 最后,如果有超过半数的响应为 success,那么就需要⽴即变成 leader ,并发送⼼跳阻⽌其他选举。
8. 如果失败了,就需要重新选举。注意,这个期间,如果有其他节点发送⼼跳,也需要⽴刻变成 follower,否则,将死循环。
上⾯说的,其实是 Leader 选举中,请求者的实现,那么接收者如何实现呢?接收者在收到“请求投票” RPC 后,需要做以下事情:
1. 注意,选举操作应该是串⾏的,因为涉及到状态修改,并发操作将导致数据错乱。也就是说,如果抢锁失败,应当⽴即返回错误。
2. ⾸先判断对⽅的 term 是否⼩于⾃⼰,如果⼩于⾃⼰,直接返回失败。
3. 如果当前节点没有投票给任何⼈,或者投的正好是对⽅,那么就可以⽐较⽇志的⼤⼩,反之,返回失败。
4. 如果对⽅⽇志没有⾃⼰⼤,返回失败。反之,投票给对⽅,并变成 follower。变成 follower 的同时,异步的选举任务在最后从
condidate 变成 leader 之前,会判断是否是 follower,如果是 follower,就放弃成为 leader。这是⼀个兜底的措施。
到这⾥,基本就能够实现 Raft Leader 选举的逻辑。
注意,我们上⾯涉及到的 LastIndex 等参数,还没有实现,但不影响我们编写伪代码,毕竟⽇志复制⽐ leader 选举要复杂的多,我们的原则是从易到难。:)
⽇志复制的实现
⽇志复制是 Raft 实现⼀致性的核⼼。
⽇志复制有 2 种形式,1种是⼼跳,⼀种是真正的⽇志,⼼跳的⽇志内容是空的,其他部分基本相同,也就是说,接收⽅在收到⽇志时,如果发现是空的,那么他就是⼼跳。
⼼跳
既然是⼼跳,肯定就是个定时任务,和选举⼀样。在我们的实现中,我们每 5 秒发送⼀次⼼跳。注意点:
1. ⾸先⾃⼰必须是 leader 才能发送⼼跳。
2. 必须满⾜ 5 秒的时间间隔。
3. 并发的向其他 follower 节点发送⼼跳。
4. ⼼跳参数包括⾃⾝的 ID,⾃⾝的 term,以便让对⽅检查 term,防⽌⽹络分区导致的脑裂。
5. 如果任意 follower 的返回值的 term ⼤于⾃⾝,说明⾃⼰分区了,那么需要变成 follower,并更新⾃⼰的 term。然后重新发起选举。
然后是⼼跳接收者的实现,这个就⽐较简单了,接收者需要做⼏件事情:
1. ⽆论成功失败⾸先设置返回值,也就是将⾃⼰的 term 返回给 leader。
2. 判断对⽅的 term 是否⼤于⾃⾝,如果⼤于⾃⾝,变成 follower,防⽌异步的选举任务误操作。同时更新选举时间和⼼跳时间。
3. 如果对⽅ term ⼩于⾃⾝,返回失败。不更新选举时间和⼼跳时间。以便触发选举。
说完了⼼跳,再说说真正的⽇志附加。
简单来说,当⽤户向 Leader 发送⼀个 KV 数据,那么 Leader 需要将 KV数据封装成⽇志,并⾏的发送到其他的 follower 节点,只要在指定的超时时间内,有过半⼏点返回成功,那么久提交(持久化)这条⽇志,返回客户端成功,否者返回失败。
因此,Leader 节点会有⼀个 ClientKVAck handlerClientRequest(ClientKVReq request) 接⼝,⽤于接收⽤户的 KV 数据,同时,会并⾏向其他节点复制数据,具体步骤如下:
1. 每个节点都可能会接收到客户端的请求,但只有 leader 能处理,所以如果⾃⾝不是 leader,则需要转发给 leader。
2. 然后将⽤户的 KV 数据封装成⽇志结构,包括 term,index,command,预提交到本地。
3. 并⾏的向其他节点发送数据,也就是⽇志复制。
4. 如果在指定的时间内,过半节点返回成功,那么就提交这条⽇志。
5. 最后,更新⾃⼰的 commitIndex,lastApplied 等信息。
注意,复制不仅仅是简单的将这条⽇志发送到其他节点,这可能⽐我们想象的复杂,为了保证复杂⽹络环境下的⼀致性,Raft 保存了每个节点的成功复制过的⽇志的 index,即 nextIndex ,因此,如果对⽅之前⼀段时间宕机了,那么,从宕机那⼀刻开始,到当前这段时间的所有⽇志,都要发送给对⽅。
甚⾄于,如果对⽅觉得你发送的⽇志还是太⼤,那么就要递减的减⼩ nextIndex,复制更多的⽇志给对⽅。注意:这⾥是 Raft 实现分布式⼀致性的关键所在。
再来看看⽇志接收者的实现步骤:
1. 和⼼跳⼀样,要先检查对⽅ term,如果 term 都不对,那么就没什么好说的了。
2. 如果⽇志不匹配,那么返回 leader,告诉他,减⼩ nextIndex 重试。
3. 如果本地存在的⽇志和 leader 的⽇志冲突了,以 leader 的为准,删除⾃⾝的。
4. 最后,将⽇志应⽤到状态机,更新本地的 commitIndex,返回 leader 成功。
到这⾥,⽇志复制的部分就讲完了。
注意,实现⽇志复制的前提是,必须有⼀个正确的⽇志存储系统,即我们的 RocksDB,我们在 RocksD
B 的基础上,使⽤⼀种机制,维护了每个节点的LastIndex,⽆论何时何地,都能够得到正确的 LastIndex,这是实现⽇志复制不可获取的⼀部分。
验证“Leader 选举”和“⽇志复制”
写完了程序,如何验证是否正确呢?
当然是写验证程序。
我们⾸先验证 “Leader 选举”。其实这个⽐较好测试。
1. 在 idea 中配置 5 个 application 启动项,配置 main 类为 RaftNodeBootStrap 类, 加⼊ -DserverPort=8775 -DserverPort=8776 -
DserverPort=8777 -DserverPort=8778 -DserverPort=8779
系统配置, 表⽰分布式环境下的 5 个机器节点.
2. 依次启动 5 个 RaftNodeBootStrap 节点, 端⼝分别是 8775,8776, 8777, 8778, 8779.
3. 观察控制台, 约 6 秒后, 会发⽣选举事件,此时,会产⽣⼀个 leader. ⽽ leader 会⽴刻发送⼼跳维持⾃
⼰的地位.
4. 如果leader 的端⼝是 8775, 使⽤ idea 关闭 8775 端⼝,模拟节点挂掉, ⼤约 15 秒后, 会重新开始选举, 并且会在剩余的 4 个节点中,产⽣
⼀个新的 leader. 并开始发送⼼跳⽇志。
然后验证⽇志复制,分为 2 种情况:
正常状态下
1. 在 idea 中配置 5 个 application 启动项,配置 main 类为 RaftNodeBootStrap 类, 加⼊ -DserverPort=8775 -DserverPort=8776 -
DserverPort=8777 -DserverPort=8778 -DserverPort=8779
2. 依次启动 5 个 RaftNodeBootStrap 节点, 端⼝分别是 8775,8776, 8777, 8778, 8779.
3. 使⽤客户端写⼊ kv 数据.
4. 杀掉所有节点, 使⽤ junit test 读取每个 rocksDB 的值, 验证每个节点的数据是否⼀致.
⾮正常状态下
1. 在 idea 中配置 5 个 application 启动项,配置 main 类为 RaftNodeBootStrap 类, 加⼊ -DserverPort=8775 -DserverPort=8776 -
DserverPort=8777 -DserverPort=8778 -DserverPort=8779
2. 依次启动 5 个 RaftNodeBootStrap 节点, 端⼝分别是 8775,8776, 8777, 8778, 8779.
3. 使⽤客户端写⼊ kv 数据.
4. 杀掉 leader (假设是 8775).
5. 再次写⼊数据.
6. 重启 8775.
7. 关闭所有节点, 读取 RocksDB 验证数据⼀致性.
Summary
本⽂并没有贴很多代码,如果要贴代码的话,阅读体验将不会很好,并且代码也不能说明什么,如果想看具体实现,可以到 github 上看看,顺便给个 star :)
回到我们的初衷,我们并不奢望这段代码能够运⾏在⽣产环境中,就像我的另⼀个项⽬ Lu-RPC ⼀样。但,经历了⼀次编写可正确运⾏的玩具代码的经历,下次再次编写⼯程化的代码,应该会更加容易些。这点我深有体会。
可以稍微展开讲⼀下,在写完 Lu-RPC 项⽬后,我就接到了开发⽣产环境运⾏的限流熔断框架任务,此时,开发 Lu-RPC 的经历让我在开发该框架时,更加的从容和⾃如:)
再回到 Raft 上⾯来,虽然上⾯的测试⽤例跑过了,程序也经过了我反反复复的测试,但不代表这个程序就是 100% 正确的,特别是在复杂的分布式环境下。如果你对 Raft 有兴趣,欢迎⼀起交流沟通:)

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。