Mit6.824Lab3KVRaft实现
paper
Lab3 之前,结合 Lab2 的实现以及 Raft 论⽂进⾏实现 Lab3,即基于 Lab2 实现的 Raft 库构建容错键/值存储服务。
的 paper 中给出的进⾏理解本架构,下⾯会给出个⼈理解的通俗版本的图,来帮助理解。
其次,如果读过paper和Raft论⽂,应该会清楚⼀个要点:每个KVServer(raftServerId) 对应paper架构图⾥的 State Machine,也就是状态机,⽽每个 KVServer 对应 Lab2 实现的 Raft peer,并且 KVServer 之间是借助 Raft Service 来实现共识性,不直接交互的。
根据 paper ⾥对 Lab3 要求的描述,可以清楚 KVServer 通过ClientId可以知道 Client 的请求来⾃具体哪个
客户端,同时保存每个客户端的请求信息和状态,所以每个客户端请求过来时,都赋予了⼀个刚⽣成的唯⼀ID,并且同⼀个请求对应唯⼀的序列号(ClientId),这两个 ID 就可以确定唯⼀性请求。这些在 和 就有具体代码和注释说明。
客户端的Id⽤ nrand() 随机⽣成唯⼀ID,经过测试最多有7个客户端ID且不会重复,每个 Client 维护⼀个 lastRequestId,通过mathrand(len(KVServer))⽣成,表⽰每⼀次请求的 Seq 序列号 clientId。
KVServer  lastRequestId,使得 Client 并发调⽤时,能通过最新的 RequestId,得到最新的结果,保证应⽤程序的强⼀致性,这个强⼀致性通过定时器实现⼀段时间内(500ms)的分布式数据强⼀致性。
请求和响应流程
请求响应流程,以Put/Get为例⼦:
KVServer 收到 Client 的Request请求后,通过raft.Start() 提交Op给raft 库, 然后通过Chan机制,等待Raft 返回结果到 waitApplyCh,也就是等待Raft应⽤⽇志到状态机后,才通过给chan缓冲区放⼊响应数据来响应给KVServer。
在Raft的所有peer 进⾏ apply当前请求的命令Op后, 每个Server会在单独的线程ApplyLoop 中等待不断到来的ApplyCh中的Op,直到 ApplyCh 缓冲区得到 Raft 的响应。
Raft库执⾏这个Op(GET全部执⾏,重复的PUT, APPEND不执⾏)
Leader 等待Apply Loop 完成,之后根据Op中的信息将Raft库中的执⾏结果返回给Wait Channel , 中才有Wait Channel 在等结果
最后Leader Client
go kv.ReadRaftApplyCommandLoop()这个Loop⾥,监听读取KVServer的applyCh
通过在KVServer⾥的 applyCh chan raft.ApplyMsg ,借助管道chan的机制,实现写阻塞,也就是实现了在请求响应过程中,节点 peers监听⼀个chan 管道,管道接收到,才能触发接下来的操作。
只有Leader负责接受client的请求,才能触发以上条件,follower不主动触发,只能等待被leader同步
接受到请求后,Leader判定条件是否准确,正确则交由Raft#Start(Op)⽅法,接下来就阻塞等待⽅法的回调,等待结果返回。
根据需要设置 WaitChan 等待结果,同时设置Timeout,⽤来判定是否响应超时
之后Raft会将Leader的 Op 执⾏结果同步给所有Follower,ApplyEntry 同步到上层的每个KVServer
根据管道的返回时间是否超时,判定是否在强⼀致性的时间内能否得到响应
如果超时:
会先进⾏ifRequestDuplicate() 判断RequestId是否过时,依然是最新的RequestId 则从Leader的状态机中执⾏ Op 命令,返回本地⽇志执⾏命令后得到的结果
不超时,表⽰在⼀致性的有效时间内,只需要判断Raft响应的clientId和RequestId是否相同,即是否是最新的请求,是则表明KVServer的KVDB中有Op.key的最新数据,保证了数据的强⼀致性。
执⾏完Get或后,最后要删除管道的raftIndex对应的Op
请求阻塞问题
通过time.After 实现阻塞超时、重发。
因为⽆论是waitChan 还是labrpc中的Call⽅法,都没有“回调超时”的概念,会阻塞在哪⾥。
所以需要在端(或client端)实现计时器超时机制,避免⽆限阻塞。
负载均衡的理解重复请求问题
Lab3A Duplicate Request ,实现保证⼀个重复的请求不会在同⼀个状态机上被执⾏两次,每个请求对应唯⼀的ClientID:RequestID
对于KVServer收到Client端的请求,⽆论是否重复,我们都提交给Raft作为它的log. ⽽KVServer 通过 kv.ifRequestDuplicate⽅法负责在接受apply log时判定这个log代指的Request Op是不是重复的,是重复的,我们就不在状态机上执⾏,直接返回Put⽅法⾥考虑该问题就⾏,读操作不影响状态机的数据。
路由-负载均衡问题
Clinet . servers[] 不是⼀⼀对应的,⽽是随机shuffle过的。但是 和 是对应的。这
就导致了Client 发送请求到⼀个不是LEADER的KVServer, 这个KVServer可以拿到raft.leader的序号并传回Client, 但Client 并不能通过client.servers[raft.leader]来到真正的Leader, 还是要随机访问另⼀个。所以说我们收到ErrWrongLeader时候,只要再随机访问下⼀个KVServer即可
ck.RecentLeaderId = GetRandomServer(len(ck.Servers))
server := ck.RecentLeaderId
for{
// RPC请求KVServer的Get⽅法, 成功则返回leaderId
ok := ck.Servers[server].Call("KVServer.Get", &args, &reply)
// 换下⼀个Server,重试,直到 OK or Error
if !ok || reply.Err == ErrWrongLeader {
// LeaderId
server = (server + 1) % len(ck.Servers)
continue
...
}
快照
快照其实就是Server 维护的KeyValue数据库,可以看作是内存中⼀个map
对于Leader:
// 循环读取Raft已经应⽤的⽇志条⽬命令得到的回应
func (kv *KVServer) ReadRaftApplyCommandLoop() {
for message := range kv.applyCh {
if message.CommandValid {
kv.GetCommandFromRaft(message)
}
if message.SnapshotValid {
kv.GetSnapshotFromRaft(message)
}
}
}
在leader应⽤⽇志后,message.CommandValid 为true 说明RaftState[] 在递增,即有⽇志被应⽤并且命令是有效的。
那么Leader会执⾏响应的Get或Put操作完成后,根据⽇志条⽬阈值maxraftstate和当前⽇志条⽬数量RaftStateSize判断是否需要命令Raft进⾏Snapshot快照压缩操作。
如果需要,则调⽤MakeSnapshot⽅法,将⾃⾝的KVDB,RequestID等信息制作成snapshot, 并调⽤Raft库的Snapshot接⼝。
Leader 安装Snapshot , 这分为三部分,修剪log Entries [] , SnapShot 通过Persister进⾏持久化存储,之后在Appendentries中将本次的SnapShot信息发送给落后的Follower 最后返回执⾏结果给WaitChannel
对于Follower :
·  if message.SnapshotValid {
kv.GetSnapshotFromRaft(message)
}
1 . Leader执⾏ InstallSnapshot 的RPC⽅法后,Raft 层会获取 snapshot数据,裁剪log, 通过ApplyCh上报给Server (此时SnapshotValid: true)
2 . Follower的Applyloop 收到请求,调⽤CondInstallSnapshot() 来询问是否可以安装snapshot
// 从Raft中获取快照⽇志
func (kv *KVServer) GetSnapshotFromRaft(message raft.ApplyMsg) {
kv.mu.Lock()
defer kv.mu.Unlock()
if kv.rf.CondInstallSnapshot(message.SnapshotTerm, message.SnapshotIndex, message.Snapshot) {
// 追加快照⽇志
kv.ReadSnapshotToInstall(message.Snapshot)
kv.lastSSPointRaftLogIndex = message.SnapshotIndex
}
}
3 . CondInstallSnapshot() 判定snapshot安装条件,持久化snapshot, 并通知Server可以InstallSnapshot
KVServer数据结构
mu sync.Mutex
me int
/
/ 每个KVServer对应⼀个Raft
rf      *raft.Raft
applyCh chan raft.ApplyMsg
dead    int32 // set by Kill()
// 快照⽇志中,最后⽇志条⽬的State
maxraftstate int // snapshot if log grows this big
// Your definitions here.
// 保存put的数据,key : value
kvDB map[string]string
// index(Raft pper) -> chan
waitApplyCh map[int]chan Op
/
/ clientId : requestId
lastRequestId map[int64]int
// last Snapshot point & raftIndex
lastSSPointRaftLogIndex int
}
启动KVServer
// 启动KVServer
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
// call labgob.Register on structures you want
// Go's RPC library to marshall/unmarshall.
DPrintf("[InitKVServer---]Server %d", me)
// 注册rpc服务器
labgob.Register(Op{})
kv := new(KVServer)
< = me
kv.maxraftstate = maxraftstate
// You may need initialization code here.
kv.applyCh = make(chan raft.ApplyMsg)
kv.rf = raft.Make(servers, me, persister, kv.applyCh)
// You may need initialization code here.
// kv初始化
kv.kvDB = make(map[string]string)
kv.waitApplyCh = make(map[int]chan Op)
kv.lastRequestId = make(map[int64]int)
// 快照
snapshot := persister.ReadSnapshot()
if len(snapshot) > 0 {
// 读取快照⽇志
kv.ReadSnapshotToInstall(snapshot)
}
// 循环读取Raft已经应⽤的⽇志条⽬命令
go kv.ReadRaftApplyCommandLoop()
return kv
}
Put和Get
// RPC⽅法
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
// Your code here.
if kv.killed() {
reply.Err = ErrWrongLeader
return
}
_, ifLeader := kv.rf.GetState()
// RaftServer必须是Leader
if !ifLeader {
reply.Err = ErrWrongLeader
return
}
op := Op{
Operation: "get",
Key:      args.Key,
Value:    "",
ClientId:  args.ClientId,
RequestId: args.RequestId,
}
/
/ 向Raft server 发送命令
raftIndex, _, _ := kv.rf.Start(op)
DPrintf("[GET StartToRaft]From Client %d (Request %d) To Server %d, key %v, raftIndex %d", args.ClientId, args.RequestId, kv.me, op.Key, raftIndex)
// waitForCh
kv.mu.Lock()
// chForRaftIndex为保存Op的chan,raftIndex为Raft Server的LastLogIndex+1
// ⽤于实现RPC调⽤Raft.Start时,保存RPC返回的Op,通过 Raft Server 的 lastLogIndex获取
// 通过raft的lastLogIndex,就能得到该⽇志条⽬保存的value,并保存到KVDB中
chForRaftIndex, exist := kv.waitApplyCh[raftIndex]
// Loop Apply ,技术上要求线性化
// 不存在该记录,表明调⽤还未返回结果,则继续等待调⽤返回
if !exist {
kv.waitApplyCh[raftIndex] = make(chan Op, 1)
chForRaftIndex = kv.waitApplyCh[raftIndex]
}
// RPC调⽤完成
kv.mu.Unlock()
// Timeout
select {
// 超过⼀致性要求的时间,则需要通过lastRequestId,从KVDB中获取结果
case <-time.After(time.Millisecond * CONSENSUS_TIMEOUT):
DPrintf("[GET TIMEOUT]From Client %d (Request %d) To Server %d, key %v, raftIndex %d", args.ClientId, args.RequestId, kv.me, op.Key, raftIndex)
_, ifLeader := kv.rf.GetState()
// 该client的最新RequestId是否是newRequestId,不是,则返回最新RequestId
// 该步骤保证了client并发调⽤KVServer时,根据最新的RequestId,得到最新的结果
if kv.ifRequestDuplicate(op.ClientId, op.RequestId) && ifLeader {
// 根据命令获取该client最新RequestId得到并保存在KVDB的value
value, exist := kv.ExecuteGetOpOnKVDB(op)
if exist {
reply.Err = OK
reply.Value = value
} else {
reply.Err = ErrNoKey
reply.Value = ""
}
} else {
reply.Err = ErrWrongLeader
}
// 在⼀致性的有效时间内:
case raftCommitOp := <-chForRaftIndex:
DPrintf("[WaitChanGetRaftApplyMessage<--]Server %d , get Command <-- Index:%d , ClientId %d, RequestId %d, Opreation %v, Key :%v, Value :%v", kv.me, raftIndex, op.ClientId, op.RequestId, op.Operation, op.Key, op.Value)  // 该已提交到Raft的RPC请求,是本次的Op命令
if raftCommitOp.ClientId == op.ClientId &&
raftCommitOp.RequestId == op.RequestId {
// 则从KVServer的Map直接获取value
value, exist := kv.ExecuteGetOpOnKVDB(op)
if exist {
reply.Err = OK
reply.Value = value
} else {
reply.Err = ErrNoKey
reply.Value = ""
}
} else {
reply.Err = ErrWrongLeader
}
}
kv.mu.Lock()
// Get结束后,删除chan map中raftIndex对应的Op
delete(kv.waitApplyCh, raftIndex)
kv.mu.Unlock()
return
}
Put⽅法
// RPC⽅法
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
/
/ Your code here.
if kv.killed() {
reply.Err = ErrWrongLeader
return
}
_, ifLeader := kv.rf.GetState()
// RaftServer必须是Leader
if !ifLeader {
reply.Err = ErrWrongLeader
return
}
op := Op{
Operation: args.Op,
Key:      args.Key,
Value:    args.Value,
ClientId:  args.ClientId,
RequestId: args.RequestId,
}
// 向Raft server 发送命令
raftIndex, _, _ := kv.rf.Start(op)
DPrintf("[PUTAPPEND StartToRaft]From Client %d (Request %d) To Server %d, key %v, raftIndex %d", args.ClientId, args.RequestId, kv.me, op.Key, raftIndex)
// waitForCh
kv.mu.Lock()
// chForRaftIndex为保存Op的chan,raftIndex为Raft Server的LastLogIndex+1
// ⽤于实现RPC调⽤Raft.Start时,保存RPC返回的Op,通过 Raft Server 的 lastLogIndex获取
// 通过raft的lastLogIndex,就能得到该⽇志条⽬保存的value,并保存到KVDB中
chForRaftIndex, exist := kv.waitApplyCh[raftIndex]
// Loop Apply ,技术上要求线性化
// 不存在该记录,表明调⽤还未返回结果,则继续等待调⽤返回
if !exist {
kv.waitApplyCh[raftIndex] = make(chan Op, 1)
chForRaftIndex = kv.waitApplyCh[raftIndex]
}
/
/ RPC调⽤完成
kv.mu.Unlock()
// Timeout
select {
// 超过⼀致性要求的时间,则需要通过lastRequestId,从KVDB中获取结果
case <-time.After(time.Millisecond * CONSENSUS_TIMEOUT):
DPrintf("[TIMEOUT PUTAPPEND ]Server %d , get Command <-- Index:%d , ClientId %d, RequestId %d, Opreation %v, Key :%v, Value :%v", kv.me, raftIndex, op.ClientId, op.RequestId, op.Operation, op.Key, op.Value)
// 该client的最新RequestId是否是newRequestId,不是,则返回最新RequestId
// 该步骤保证了client并发调⽤KVServer时,根据最新的RequestId,得到最新的结果
if kv.ifRequestDuplicate(op.ClientId, op.RequestId) {
reply.Err = OK
} else {
reply.Err = ErrWrongLeader
}
// 在⼀致性的有效时间内:
case raftCommitOp := <-chForRaftIndex:
DPrintf("[WaitChanGetRaftApplyMessage<--]Server %d , get Command <-- Index:%d , ClientId %d, RequestId %d, Opreation %v, Key :%v, Value :%v", kv.me, raftIndex, op.ClientId, op.RequestId, op.Operation, op.Key, op.Value)  // 该已提交到Raft的RPC请求,是本次的Op命令
if raftCommitOp.ClientId == op.ClientId &&
raftCommitOp.RequestId == op.RequestId {
reply.Err = OK
} else {
reply.Err = ErrWrongLeader
}
}
kv.mu.Lock()
delete(kv.waitApplyCh, raftIndex)
kv.mu.Unlock()
return
}

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。