敕码 发表于 2025-6-7 16:11:41

MIT6.5840 2024 Spring Lab3

MIT6.5840 2024 Spring Lab3

前言

  此lab是该课程的第三个实验,这个实验会让你实现Raft算法,最终实现一个容错的KV存储系统。该实验主要有四部分组成3A:leader选举,3B:日志,3C:持久化,3D:日志压缩。这四个部分我是单独完成的,也就是做完3A再做3B以此类推,很多人说3A和3B可以一起,不过为了后面自己看的时候不会懵逼我就分开来做了,代码也会分开展示,因为不同部分可能会用到同一段代码会有重复,所以下面的文章可能看起来全是代码。
前置知识

GO

1.select:
  select时go里面的控制语句,很像switch的结构,内部也有很多case,每个case都是接收channel信息或者发送channel信息,select会选择一个可执行case执行,如果没有可执行的case就会阻塞,如果有default,就会执default。
select {    caseheartsbeatsTime        heartsbeatsTimer *time.Timer        electionTimer    *time.Timer        heartsbeatsTimeint //发送一次心跳时间间隔        electionTimeoutint //选举超时时间(超过这个时间(实验要求大于100ms),该服务器就作为候选人开始选举)        leaderId int //当前leader的id        state    int //0:follower 1:leader 2:candidater        /*****moyoj-3B******/        //to test        applyChan   chan ApplyMsg        commitIndex int        lastApplied int        //leader        nextIndex[]int        matchIndex []int        /*****moyoj-3D******/        lastSnapshotIndex int //快照中最后一个日志的下标        lastSnapshotTermint //快照中最后一个日志的term        currentTerm int        votedFor    int        logs      []logEntry}func (rf *Raft) GetState() (int, bool) {        var term int        var isleader bool        // Your code here (3A).        /*****moyoj-3A******/        rf.statusAccessMutex.Lock()        defer rf.statusAccessMutex.Unlock()        term = rf.currentTerm        isleader = rf.state == 1        return term, isleader}func (rf *Raft) persist(snapshot []byte) {        /*****moyoj-3C******/        w := new(bytes.Buffer)        e := labgob.NewEncoder(w)        e.Encode(rf.currentTerm)        e.Encode(rf.votedFor)        e.Encode(rf.logs)        /*****moyoj-3D******/        e.Encode(rf.lastSnapshotIndex)        e.Encode(rf.lastSnapshotTerm)        raftstate := w.Bytes()        rf.persister.Save(raftstate, snapshot)}// restore previously persisted state.func (rf *Raft) readPersist(data []byte) {        if data == nil || len(data) < 1 { // bootstrap without any state?                return        }        /*****moyoj-3C******/        r := bytes.NewBuffer(data)        d := labgob.NewDecoder(r)        var currentTerm int        var votrFor int        var logs []logEntry        /*****moyoj-3D******/        var lastSnapshotIndex int        var lastSnapshotTerm int        if d.Decode(&currentTerm) != nil || d.Decode(&votrFor) != nil || d.Decode(&logs) != nil || d.Decode(&lastSnapshotIndex) != nil || d.Decode(&lastSnapshotTerm) != nil {                DPrintf("持久化数据解析失败")        } else {                rf.currentTerm = currentTerm                rf.votedFor = votrFor                rf.logs = logs                rf.lastSnapshotIndex = lastSnapshotIndex                rf.lastSnapshotTerm = lastSnapshotTerm                //初始化为snapshot最后一条日志的下标,否则crash重启applyEntries会越界                rf.lastApplied = lastSnapshotIndex                rf.commitIndex = lastSnapshotIndex        }}func (rf *Raft) Snapshot(index int, snapshot []byte) {        // Your code here (3D).        rf.statusAccessMutex.Lock()        defer rf.statusAccessMutex.Unlock()        if index-1rf.commitIndex {                return        }        rf.lastSnapshotTerm = rf.logs.Term        temp := rf.logs        rf.lastSnapshotIndex = index - 1        rf.logs = make([]logEntry, len(temp))        copy(rf.logs, temp)        rf.persist(snapshot)        }type RequestVoteArgs struct {        // Your data here (3A, 3B).        /*****moyoj-3A******/        Term      int        CandidateId int        /*****moyoj-3B******/        LastLogIndex int        LastLogTermint}type RequestVoteReply struct {        // Your data here (3A).        /*****moyoj-3A******/        Term      int        VoteGranted bool}type AppendEntriesRequest struct {        /*****moyoj-3A******/        Term         int        LeaderId   int        Entries      []logEntry        PreLogIndexint        PreLogTerm   int        LeaderCommit int}type AppendEntriesResponse struct {        /*****moyoj-3A******/        Term    int        Success bool        /*****moyoj-3B******/        FastBack int}type InstallSnapshotRequest struct {        /*****moyoj-3D******/        Term             int        LeaderId         int        LastIncludeIndex int        LastIncludeTermint        Offset         int        Data             []byte        Done             bool}type InstallSnapshotResponse struct {        /*****moyoj-3D******/        Term int}func (rf *Raft) electStart(curterm int, lastLogIndex int, lastLogTerm int, me int, peerslen int) {        var sendWaitGroup sync.WaitGroup        voteCount := 1 //选票数        for i := 0; i < peerslen; i++ {                if i == me {                        continue                }                sendWaitGroup.Add(1)                go func(serverid int) {                        var request RequestVoteArgs                        var response RequestVoteReply                        request.CandidateId = me                        request.Term = curterm                        /*****moyoj-3B******/                        request.LastLogIndex = lastLogIndex                        request.LastLogTerm = lastLogTerm                        ok := rf.sendRequestVote(serverid, &request, &response)                        if ok {                                rf.statusAccessMutex.Lock()                                if rf.state == 2 && rf.currentTerm == curterm {                                        if response.VoteGranted {                                                voteCount += 1                                                if voteCount >= peerslen/2+1 {                                                        rf.state = 1                                                        rf.votedFor = -1                                                        /*****moyoj-3C******/                                                        rf.persist(rf.persister.ReadSnapshot())                                                        /*****moyoj-3B******/                                                        //重置next和match数组                                                        for index := 0; index < len(rf.peers); index++ {                                                                rf.nextIndex = len(rf.logs) + rf.lastSnapshotIndex + 1                                                                rf.matchIndex = -1                                                        }                                                        go rf.sendHeartsbeats(curterm, rf.commitIndex, me, peerslen)                                                        rf.heartsbeatsTimer.Reset(time.Duration(rf.heartsbeatsTime) * time.Millisecond)                                                }                                        } else if response.Term > rf.currentTerm {                                                rf.currentTerm = response.Term                                                rf.state = 0                                                rf.votedFor = -1 //!!!!!选举失败,置成未投票状态                                                rf.electionTimer.Reset(time.Duration(rf.electionTimeout+rand.Intn(rf.electionTimeout)) * time.Millisecond)                                                /*****moyoj-3C******/                                                rf.persist(rf.persister.ReadSnapshot())                                        }                                }                                rf.statusAccessMutex.Unlock()                        }                        sendWaitGroup.Done()                }(i)        }        sendWaitGroup.Wait()        rf.statusAccessMutex.Lock()        if rf.state == 2 && rf.currentTerm == curterm {                rf.state = 0                rf.votedFor = -1 //!!!选举失败变为-1                rf.electionTimer.Reset(time.Duration(rf.electionTimeout+rand.Intn(rf.electionTimeout)) * time.Millisecond)                /*****moyoj-3C******/                rf.persist(rf.persister.ReadSnapshot())        }        rf.statusAccessMutex.Unlock()}func (rf *Raft) sendHeartsbeats(curterm int, leaderCommit int, me int, peerslen int) {        for i := 0; i < peerslen; i++ {                if i == me {                        continue                }                go func(serverid int) {                        rf.statusAccessMutex.Lock()                        if rf.currentTerm != curterm {                                rf.statusAccessMutex.Unlock()                                return                        }                        nextIndex := rf.nextIndex                        //说明要发送的日志已经形成快照,follower远远落后,直接发送installsnapshotrpc了,这也是为什么论文指出installsnapshotrpc当成一次心跳的原因,就是在发送追加日志(心跳)的途中发现没法追加时才发送installSnapshot rpc                        if nextIndexrf.currentTerm {                                                        rf.currentTerm = response.Term                                                        rf.state = 0                                                        rf.votedFor = -1                                                        rf.electionTimer.Reset(time.Duration(rf.electionTimeout+rand.Intn(rf.electionTimeout)) * time.Millisecond)                                                        /*****moyoj-3C******/                                                        rf.persist(rf.persister.ReadSnapshot())                                                        return                                                }                                                if rf.nextIndex == nextIndex {                                                        rf.nextIndex = curLastSnapshotIndex + 1                                                }                                                rf.matchIndex = rf.nextIndex - 1                                                //更新commitindex                                                rf.updateNextMatch(me)                                        }                                }                        } else {                                var request AppendEntriesRequest                                var response AppendEntriesResponse                                request.Entries = []logEntry{}                                request.LeaderId = me                                request.Term = curterm                                request.LeaderCommit = leaderCommit                                /*****moyoj-3B******/                                request.PreLogIndex = -1                                request.PreLogTerm = -1                                //一次性发送没发送的日志                                request.Entries = rf.logs                                request.PreLogIndex = nextIndex - 1                                if (nextIndex - rf.lastSnapshotIndex - 1 - 1) >= 0 {                                        request.PreLogTerm = rf.logs.Term                                } else {                                        request.PreLogTerm = rf.lastSnapshotTerm                                }                                rf.statusAccessMutex.Unlock()                                ok := rf.peers.Call("Raft.AppendEntries", &request, &response)                                if ok {                                        rf.statusAccessMutex.Lock()                                        defer rf.statusAccessMutex.Unlock()                                        if rf.currentTerm == curterm {                                                if !response.Success {                                                        if rf.currentTerm < response.Term {                                                                rf.currentTerm = response.Term                                                                rf.state = 0                                                                rf.votedFor = -1                                                                rf.electionTimer.Reset(time.Duration(rf.electionTimeout+rand.Intn(rf.electionTimeout)) * time.Millisecond)                                                                /*****moyoj-3C******/                                                                rf.persist(rf.persister.ReadSnapshot())                                                        } else {                                                                /*****moyoj-3B******/                                                                //nextIndex后移                                                                rf.nextIndex = response.FastBack                                                        }                                                }                                                if len(request.Entries) == 0 || !response.Success {                                                        return                                                }                                                //成功发送日志追加                                                /*****moyoj-3B******/                                                if nextIndex == rf.nextIndex { //nextIndex没有变更,还是和当时发送消息时一样                                                        rf.nextIndex = nextIndex + len(request.Entries)                                                }                                                rf.matchIndex = rf.nextIndex - 1                                                //这里更新一下commitIndex                                                rf.updateNextMatch(me)                                        }                                }                        }                }(i)        }}/*****moyoj-3B******///追加日志是否匹配func (rf *Raft) matchNewEntries(Entries []logEntry, preLogIndex int, preLogTerm int, response *AppendEntriesResponse) bool {        if preLogIndex != -1 && len(rf.logs) = rf.lastSnapshotIndex,如果是相等,则一定能把term匹配上,则不需要让leader回退nextIndex        if preLogIndex != -1 && preLogIndex != rf.lastSnapshotIndex && rf.logs.Term != preLogTerm {                //同下标的日志项的term与新日志的term不匹配                response.FastBack = rf.commitIndex + 1 //让leader的nextIndex直接跳到当前节点的commitIndex+1处                response.Success = false                return false        }        response.Success = true        rf.logs = rf.logs        rf.logs = append(rf.logs, Entries...)        return true}/*****moyoj-3B******/func (rf *Raft) applyEntries(sleep int) {        for !rf.killed() {                time.Sleep(time.Duration(sleep) * time.Millisecond)                rf.statusAccessMutex.Lock()                appliedIndex := rf.lastApplied                commitIndex := rf.commitIndex                logs := make([]logEntry, len(rf.logs))                lastSnapshotIndex := rf.lastSnapshotIndex                copy(logs, rf.logs)                rf.statusAccessMutex.Unlock()                //向上层提交的过程可以先释放锁                for ; appliedIndex < commitIndex; appliedIndex++ {                        var sendApply ApplyMsg                        sendApply.Command = logs.Command                        sendApply.CommandIndex = appliedIndex + 2                        sendApply.CommandValid = true                        rf.applyChanrf.commitIndex {                        if len(rf.logs)-1 < request.LeaderCommit-rf.lastSnapshotIndex-1 {                                rf.commitIndex = len(rf.logs) + rf.lastSnapshotIndex + 1 - 1                        } else {                                rf.commitIndex = request.LeaderCommit                        }                }        }        rf.electionTimer.Reset(time.Duration(rf.electionTimeout+rand.Intn(rf.electionTimeout)) * time.Millisecond)        rf.currentTerm = request.Term        rf.state = 0        rf.leaderId = request.LeaderId        response.Term = request.Term        /*****moyoj-3C******/        rf.persist(rf.persister.ReadSnapshot())}// example RequestVote RPC handler.func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {        // Your code here (3A, 3B).        /*****moyoj-3A******/        rf.statusAccessMutex.Lock()        defer rf.statusAccessMutex.Unlock()        if args.Term < rf.currentTerm {                reply.VoteGranted = false                reply.Term = rf.currentTerm                return        }        if args.Term == rf.currentTerm {                if rf.state == 1 || rf.votedFor != -1 {                        reply.VoteGranted = false                        reply.Term = rf.currentTerm                        return                }        }        /*****moyoj-3B******/        /**主要是为了确保日志复制的安全,确保新的领导人不会用自己较为旧的日志覆盖掉已经提交的日志        1.领导人只能对自己任期内的日志commit,不会对旧任期的日志进行提交(如果提交旧的日志就会碰到论文5.4.2的情况),新                领导人对自己任期内新日志提交会顺带把旧的提交。这里所谓的提交其实就是领导人更新commitIndex!非常重要!!        2.候选人日志记录必须不旧于(大于或者等于)大部分节点的日志记录,何为较新?                最后一条日志term更大的日志记录更新 或者 最后一条日志term相等但是日志记录数目多的更新        以上两点共同确保了安全性        */        reply.VoteGranted = true        if len(rf.logs) != 0 && rf.logs.Term > args.LastLogTerm {                reply.VoteGranted = false        }        //请求的日志数目较少        if len(rf.logs) != 0 && rf.logs.Term == args.LastLogTerm && args.LastLogIndex < len(rf.logs)+rf.lastSnapshotIndex+1-1 {                reply.VoteGranted = false        }        /*****moyoj-3D******/        if rf.lastSnapshotIndex != -1 && rf.lastSnapshotTerm > args.LastLogTerm {                reply.VoteGranted = false        }        if rf.lastSnapshotIndex != -1 && rf.lastSnapshotTerm == args.LastLogTerm && args.LastLogIndex < rf.lastSnapshotIndex {                reply.VoteGranted = false        }        if reply.VoteGranted {                //更新leaderid和term                rf.votedFor = args.CandidateId //投票候选人                rf.electionTimer.Reset(time.Duration(rf.electionTimeout+rand.Intn(rf.electionTimeout)) * time.Millisecond)        }        rf.currentTerm = args.Term        rf.state = 0        reply.Term = args.Term        /*****moyoj-3C******/        rf.persist(rf.persister.ReadSnapshot())}func (rf *Raft) InstallSnapshot(request *InstallSnapshotRequest, response *InstallSnapshotResponse) {        rf.statusAccessMutex.Lock()        defer rf.statusAccessMutex.Unlock()        if request.Term < rf.currentTerm {                response.Term = rf.currentTerm                return        }        curindex := request.LastIncludeIndex - rf.lastSnapshotIndex - 1        if curindex < 0 {                response.Term = rf.currentTerm                return        }        if curindex < len(rf.logs) {                if rf.logs.Term != request.Term {                        rf.logs = make([]logEntry, 0)                } else {                        logs := rf.logs                        rf.logs = make([]logEntry, len(rf.logs)-curindex-1)                        copy(rf.logs, logs)                }        } else {                rf.logs = make([]logEntry, 0)        }        rf.lastSnapshotIndex = request.LastIncludeIndex        rf.lastSnapshotTerm = request.LastIncludeTerm        rf.lastApplied = request.LastIncludeIndex        rf.commitIndex = request.LastIncludeIndex        rf.electionTimer.Reset(time.Duration(rf.electionTimeout+rand.Intn(rf.electionTimeout)) * time.Millisecond)        rf.currentTerm = request.Term        rf.state = 0        rf.leaderId = request.LeaderId        response.Term = request.Term        rf.persist(request.Data)        go func() {                var sendApply ApplyMsg                sendApply.CommandValid = false                sendApply.Snapshot = request.Data                sendApply.SnapshotIndex = rf.lastSnapshotIndex + 1                sendApply.SnapshotTerm = rf.lastSnapshotTerm                sendApply.SnapshotValid = true                rf.applyChan
页: [1]
查看完整版本: MIT6.5840 2024 Spring Lab3