MIT6.5840 2024 Spring Lab3
MIT6.5840 2024 Spring Lab3前言
此lab是该课程的第三个实验,这个实验要求实现Raft算法,并最终基于它实现一个容错的KV存储系统。该实验主要由四部分组成:3A:leader选举,3B:日志,3C:持久化,3D:日志压缩。这四个部分我是分开完成的,也就是做完3A再做3B,以此类推。很多人说3A和3B可以一起做,不过为了以后自己回看时不会懵,我还是分开来做了,代码也会分开展示。由于不同部分可能会用到同一段代码,展示的代码会有重复,所以下面的文章看起来可能全是代码。
前置知识
Go
1. select:
select是Go里面的控制语句,结构很像switch,内部也有很多case,每个case都是一次channel的接收或发送操作。select会选择一个可执行的case执行(如果有多个case同时可执行,则随机选择其中一个);如果没有可执行的case,有default时就执行default,否则阻塞,直到某个case可以执行。
select { caseheartsbeatsTime heartsbeatsTimer *time.Timer electionTimer *time.Timer heartsbeatsTimeint //发送一次心跳时间间隔 electionTimeoutint //选举超时时间(超过这个时间(实验要求大于100ms),该服务器就作为候选人开始选举) leaderId int //当前leader的id state int //0:follower 1:leader 2:candidater /*****moyoj-3B******/ //to test applyChan chan ApplyMsg commitIndex int lastApplied int //leader nextIndex[]int matchIndex []int /*****moyoj-3D******/ lastSnapshotIndex int //快照中最后一个日志的下标 lastSnapshotTermint //快照中最后一个日志的term currentTerm int votedFor int logs []logEntry}func (rf *Raft) GetState() (int, bool) { var term int var isleader bool // Your code here (3A). /*****moyoj-3A******/ rf.statusAccessMutex.Lock() defer rf.statusAccessMutex.Unlock() term = rf.currentTerm isleader = rf.state == 1 return term, isleader}func (rf *Raft) persist(snapshot []byte) { /*****moyoj-3C******/ w := new(bytes.Buffer) e := labgob.NewEncoder(w) e.Encode(rf.currentTerm) e.Encode(rf.votedFor) e.Encode(rf.logs) /*****moyoj-3D******/ e.Encode(rf.lastSnapshotIndex) e.Encode(rf.lastSnapshotTerm) raftstate := w.Bytes() rf.persister.Save(raftstate, snapshot)}// restore previously persisted state.func (rf *Raft) readPersist(data []byte) { if data == nil || len(data) < 1 { // bootstrap without any state? return } /*****moyoj-3C******/ r := bytes.NewBuffer(data) d := labgob.NewDecoder(r) var currentTerm int var votrFor int var logs []logEntry /*****moyoj-3D******/ var lastSnapshotIndex int var lastSnapshotTerm int if d.Decode(¤tTerm) != nil || d.Decode(&votrFor) != nil || d.Decode(&logs) != nil || d.Decode(&lastSnapshotIndex) != nil || d.Decode(&lastSnapshotTerm) != nil { DPrintf("持久化数据解析失败") } else { rf.currentTerm = currentTerm rf.votedFor = votrFor rf.logs = logs rf.lastSnapshotIndex = lastSnapshotIndex rf.lastSnapshotTerm = lastSnapshotTerm //初始化为snapshot最后一条日志的下标,否则crash重启applyEntries会越界 rf.lastApplied = lastSnapshotIndex rf.commitIndex = lastSnapshotIndex }}func (rf *Raft) Snapshot(index int, snapshot []byte) { // Your code here (3D). 
rf.statusAccessMutex.Lock() defer rf.statusAccessMutex.Unlock() if index-1rf.commitIndex { return } rf.lastSnapshotTerm = rf.logs.Term temp := rf.logs rf.lastSnapshotIndex = index - 1 rf.logs = make([]logEntry, len(temp)) copy(rf.logs, temp) rf.persist(snapshot) }type RequestVoteArgs struct { // Your data here (3A, 3B). /*****moyoj-3A******/ Term int CandidateId int /*****moyoj-3B******/ LastLogIndex int LastLogTermint}type RequestVoteReply struct { // Your data here (3A). /*****moyoj-3A******/ Term int VoteGranted bool}type AppendEntriesRequest struct { /*****moyoj-3A******/ Term int LeaderId int Entries []logEntry PreLogIndexint PreLogTerm int LeaderCommit int}type AppendEntriesResponse struct { /*****moyoj-3A******/ Term int Success bool /*****moyoj-3B******/ FastBack int}type InstallSnapshotRequest struct { /*****moyoj-3D******/ Term int LeaderId int LastIncludeIndex int LastIncludeTermint Offset int Data []byte Done bool}type InstallSnapshotResponse struct { /*****moyoj-3D******/ Term int}func (rf *Raft) electStart(curterm int, lastLogIndex int, lastLogTerm int, me int, peerslen int) { var sendWaitGroup sync.WaitGroup voteCount := 1 //选票数 for i := 0; i < peerslen; i++ { if i == me { continue } sendWaitGroup.Add(1) go func(serverid int) { var request RequestVoteArgs var response RequestVoteReply request.CandidateId = me request.Term = curterm /*****moyoj-3B******/ request.LastLogIndex = lastLogIndex request.LastLogTerm = lastLogTerm ok := rf.sendRequestVote(serverid, &request, &response) if ok { rf.statusAccessMutex.Lock() if rf.state == 2 && rf.currentTerm == curterm { if response.VoteGranted { voteCount += 1 if voteCount >= peerslen/2+1 { rf.state = 1 rf.votedFor = -1 /*****moyoj-3C******/ rf.persist(rf.persister.ReadSnapshot()) /*****moyoj-3B******/ //重置next和match数组 for index := 0; index < len(rf.peers); index++ { rf.nextIndex = len(rf.logs) + rf.lastSnapshotIndex + 1 rf.matchIndex = -1 } go rf.sendHeartsbeats(curterm, rf.commitIndex, me, peerslen) 
rf.heartsbeatsTimer.Reset(time.Duration(rf.heartsbeatsTime) * time.Millisecond) } } else if response.Term > rf.currentTerm { rf.currentTerm = response.Term rf.state = 0 rf.votedFor = -1 //!!!!!选举失败,置成未投票状态 rf.electionTimer.Reset(time.Duration(rf.electionTimeout+rand.Intn(rf.electionTimeout)) * time.Millisecond) /*****moyoj-3C******/ rf.persist(rf.persister.ReadSnapshot()) } } rf.statusAccessMutex.Unlock() } sendWaitGroup.Done() }(i) } sendWaitGroup.Wait() rf.statusAccessMutex.Lock() if rf.state == 2 && rf.currentTerm == curterm { rf.state = 0 rf.votedFor = -1 //!!!选举失败变为-1 rf.electionTimer.Reset(time.Duration(rf.electionTimeout+rand.Intn(rf.electionTimeout)) * time.Millisecond) /*****moyoj-3C******/ rf.persist(rf.persister.ReadSnapshot()) } rf.statusAccessMutex.Unlock()}func (rf *Raft) sendHeartsbeats(curterm int, leaderCommit int, me int, peerslen int) { for i := 0; i < peerslen; i++ { if i == me { continue } go func(serverid int) { rf.statusAccessMutex.Lock() if rf.currentTerm != curterm { rf.statusAccessMutex.Unlock() return } nextIndex := rf.nextIndex //说明要发送的日志已经形成快照,follower远远落后,直接发送installsnapshotrpc了,这也是为什么论文指出installsnapshotrpc当成一次心跳的原因,就是在发送追加日志(心跳)的途中发现没法追加时才发送installSnapshot rpc if nextIndexrf.currentTerm { rf.currentTerm = response.Term rf.state = 0 rf.votedFor = -1 rf.electionTimer.Reset(time.Duration(rf.electionTimeout+rand.Intn(rf.electionTimeout)) * time.Millisecond) /*****moyoj-3C******/ rf.persist(rf.persister.ReadSnapshot()) return } if rf.nextIndex == nextIndex { rf.nextIndex = curLastSnapshotIndex + 1 } rf.matchIndex = rf.nextIndex - 1 //更新commitindex rf.updateNextMatch(me) } } } else { var request AppendEntriesRequest var response AppendEntriesResponse request.Entries = []logEntry{} request.LeaderId = me request.Term = curterm request.LeaderCommit = leaderCommit /*****moyoj-3B******/ request.PreLogIndex = -1 request.PreLogTerm = -1 //一次性发送没发送的日志 request.Entries = rf.logs request.PreLogIndex = nextIndex - 1 if (nextIndex - rf.lastSnapshotIndex 
- 1 - 1) >= 0 { request.PreLogTerm = rf.logs.Term } else { request.PreLogTerm = rf.lastSnapshotTerm } rf.statusAccessMutex.Unlock() ok := rf.peers.Call("Raft.AppendEntries", &request, &response) if ok { rf.statusAccessMutex.Lock() defer rf.statusAccessMutex.Unlock() if rf.currentTerm == curterm { if !response.Success { if rf.currentTerm < response.Term { rf.currentTerm = response.Term rf.state = 0 rf.votedFor = -1 rf.electionTimer.Reset(time.Duration(rf.electionTimeout+rand.Intn(rf.electionTimeout)) * time.Millisecond) /*****moyoj-3C******/ rf.persist(rf.persister.ReadSnapshot()) } else { /*****moyoj-3B******/ //nextIndex后移 rf.nextIndex = response.FastBack } } if len(request.Entries) == 0 || !response.Success { return } //成功发送日志追加 /*****moyoj-3B******/ if nextIndex == rf.nextIndex { //nextIndex没有变更,还是和当时发送消息时一样 rf.nextIndex = nextIndex + len(request.Entries) } rf.matchIndex = rf.nextIndex - 1 //这里更新一下commitIndex rf.updateNextMatch(me) } } } }(i) }}/*****moyoj-3B******///追加日志是否匹配func (rf *Raft) matchNewEntries(Entries []logEntry, preLogIndex int, preLogTerm int, response *AppendEntriesResponse) bool { if preLogIndex != -1 && len(rf.logs) = rf.lastSnapshotIndex,如果是相等,则一定能把term匹配上,则不需要让leader回退nextIndex if preLogIndex != -1 && preLogIndex != rf.lastSnapshotIndex && rf.logs.Term != preLogTerm { //同下标的日志项的term与新日志的term不匹配 response.FastBack = rf.commitIndex + 1 //让leader的nextIndex直接跳到当前节点的commitIndex+1处 response.Success = false return false } response.Success = true rf.logs = rf.logs rf.logs = append(rf.logs, Entries...) 
return true}/*****moyoj-3B******/func (rf *Raft) applyEntries(sleep int) { for !rf.killed() { time.Sleep(time.Duration(sleep) * time.Millisecond) rf.statusAccessMutex.Lock() appliedIndex := rf.lastApplied commitIndex := rf.commitIndex logs := make([]logEntry, len(rf.logs)) lastSnapshotIndex := rf.lastSnapshotIndex copy(logs, rf.logs) rf.statusAccessMutex.Unlock() //向上层提交的过程可以先释放锁 for ; appliedIndex < commitIndex; appliedIndex++ { var sendApply ApplyMsg sendApply.Command = logs.Command sendApply.CommandIndex = appliedIndex + 2 sendApply.CommandValid = true rf.applyChanrf.commitIndex { if len(rf.logs)-1 < request.LeaderCommit-rf.lastSnapshotIndex-1 { rf.commitIndex = len(rf.logs) + rf.lastSnapshotIndex + 1 - 1 } else { rf.commitIndex = request.LeaderCommit } } } rf.electionTimer.Reset(time.Duration(rf.electionTimeout+rand.Intn(rf.electionTimeout)) * time.Millisecond) rf.currentTerm = request.Term rf.state = 0 rf.leaderId = request.LeaderId response.Term = request.Term /*****moyoj-3C******/ rf.persist(rf.persister.ReadSnapshot())}// example RequestVote RPC handler.func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { // Your code here (3A, 3B). /*****moyoj-3A******/ rf.statusAccessMutex.Lock() defer rf.statusAccessMutex.Unlock() if args.Term < rf.currentTerm { reply.VoteGranted = false reply.Term = rf.currentTerm return } if args.Term == rf.currentTerm { if rf.state == 1 || rf.votedFor != -1 { reply.VoteGranted = false reply.Term = rf.currentTerm return } } /*****moyoj-3B******/ /**主要是为了确保日志复制的安全,确保新的领导人不会用自己较为旧的日志覆盖掉已经提交的日志 1.领导人只能对自己任期内的日志commit,不会对旧任期的日志进行提交(如果提交旧的日志就会碰到论文5.4.2的情况),新 领导人对自己任期内新日志提交会顺带把旧的提交。这里所谓的提交其实就是领导人更新commitIndex!非常重要!! 2.候选人日志记录必须不旧于(大于或者等于)大部分节点的日志记录,何为较新? 
最后一条日志term更大的日志记录更新 或者 最后一条日志term相等但是日志记录数目多的更新 以上两点共同确保了安全性 */ reply.VoteGranted = true if len(rf.logs) != 0 && rf.logs.Term > args.LastLogTerm { reply.VoteGranted = false } //请求的日志数目较少 if len(rf.logs) != 0 && rf.logs.Term == args.LastLogTerm && args.LastLogIndex < len(rf.logs)+rf.lastSnapshotIndex+1-1 { reply.VoteGranted = false } /*****moyoj-3D******/ if rf.lastSnapshotIndex != -1 && rf.lastSnapshotTerm > args.LastLogTerm { reply.VoteGranted = false } if rf.lastSnapshotIndex != -1 && rf.lastSnapshotTerm == args.LastLogTerm && args.LastLogIndex < rf.lastSnapshotIndex { reply.VoteGranted = false } if reply.VoteGranted { //更新leaderid和term rf.votedFor = args.CandidateId //投票候选人 rf.electionTimer.Reset(time.Duration(rf.electionTimeout+rand.Intn(rf.electionTimeout)) * time.Millisecond) } rf.currentTerm = args.Term rf.state = 0 reply.Term = args.Term /*****moyoj-3C******/ rf.persist(rf.persister.ReadSnapshot())}func (rf *Raft) InstallSnapshot(request *InstallSnapshotRequest, response *InstallSnapshotResponse) { rf.statusAccessMutex.Lock() defer rf.statusAccessMutex.Unlock() if request.Term < rf.currentTerm { response.Term = rf.currentTerm return } curindex := request.LastIncludeIndex - rf.lastSnapshotIndex - 1 if curindex < 0 { response.Term = rf.currentTerm return } if curindex < len(rf.logs) { if rf.logs.Term != request.Term { rf.logs = make([]logEntry, 0) } else { logs := rf.logs rf.logs = make([]logEntry, len(rf.logs)-curindex-1) copy(rf.logs, logs) } } else { rf.logs = make([]logEntry, 0) } rf.lastSnapshotIndex = request.LastIncludeIndex rf.lastSnapshotTerm = request.LastIncludeTerm rf.lastApplied = request.LastIncludeIndex rf.commitIndex = request.LastIncludeIndex rf.electionTimer.Reset(time.Duration(rf.electionTimeout+rand.Intn(rf.electionTimeout)) * time.Millisecond) rf.currentTerm = request.Term rf.state = 0 rf.leaderId = request.LeaderId response.Term = request.Term rf.persist(request.Data) go func() { var sendApply ApplyMsg sendApply.CommandValid = false 
sendApply.Snapshot = request.Data sendApply.SnapshotIndex = rf.lastSnapshotIndex + 1 sendApply.SnapshotTerm = rf.lastSnapshotTerm sendApply.SnapshotValid = true rf.applyChan
页:
[1]