Skip to content

Latest commit

 

History

History
312 lines (231 loc) · 12.4 KB

Raft.md

File metadata and controls

312 lines (231 loc) · 12.4 KB
title date
Raft--易于理解的一致性算法
2019-11-07 00:00:00 +2300

这篇文章讲解如何用 go 实现 raft 算法,代码框架来自于 Mit6.824分布式课程

最初,为了学习分布式系统,我了解到了 Mit 6.824课程,课程的 lab 需要用 go 来完成。于是 go 走进了我的世界,go 很容易入门, 写起来很舒服,但是要真正的理解 go, 并不是很容易, 特别是对 goroutine, select, channel 等的灵活运用。俗话说的好,初生牛犊不怕虎, 在初步了解 go 之后, 我就开始肝课程了。 每个 lab 都会有对应的论文, 比如 mapreduce, raft 等等。 lab2 是实现 raft 算法, lab3 是基于 lab2的 raft 算法来 实现一个简单的分布式 kv 存储。 在做 lab 的过程中,不仅仅可以对 raft 的细节有更好的把握, 同时对 go 语言的理解也会逐渐加深, 特别是并发部分。

首先看一下复制状态机,如下图所示

复制状态机

复制状态机通常是基于复制日志实现的。每一台服务器存储着一个包含一系列指令的日志,每个日志都按照相同的顺序包含相同的指令,所以每一台服务器都执行相同的指令序列。那么如何保证每台服务器上的日志都相同呢? 这就是接下来要介绍的一致性算法raft要做的事情了。

raft 主要分三大部分, 领导选举日志复制日志压缩。 由于其中的细节很多,所以在实现过程中肯定会遇到各种各样的问题, 这也是一个比较好的事情,因为问题将促使我们不断地深入的去阅读论文, 同时锻炼 debug 并发程序的能力。最后肯定是满满的收获。

实现主要依赖于raft论文中的下图

代码框架条理清楚。主要包含七个主要的 struct , 三个 RPC handler , 四个主干函数 , 如下

// 七个struct
type Raft struct {
    ...
}

type RequestVoteArgs struct {
    ...
}

type RequestVoteReply struct {
    ...
}

type AppendEntriesArgs struct {
    ...
}

type AppendEntriesReply struct {
    ...
}

type InstallSnapShotArgs struct {
    ...
}

type InstallSnapShotReply struct {
    ...
}

// 三个 RPC handler

// RequestVote RPC handler
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    ...
}

// AppendEntries RPC handler
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
    ...
}

// InstallSnapShot RPC handler
func (rf *Raft) InstallSnapShot(args* InstallSnapShotArgs, reply* InstallSnapShotReply) {
    ...
}

// 四个主干函数
func (rf* Raft) electForLeader() {
    ...
}

func (rf* Raft) appendLogEntries() {
    ...
}

func (rf* Raft) transmitSnapShot(server int)  {
    ...
}

// 包含一个后台 goroutine. 
func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft {
        ...
        go func() {
            for {
                electionTimeout := time.Duration(rand.Intn(200) + 300) * time.Millisecond
                switch state {
                case "follower", "candidate":
                    // if receive rpc, then break select, reset election tim
                    select {
                    case <-rf.ch:
                    case <-time.After(electionTimeout):
                    //become Candidate if time out
                        rf.changeRole("candidate")
                    }
                case "leader":
                    time.Sleep(heartbeatTime) 
                    rf.appendLogEntries()
                }
            }
        }()
}

下面的部分, 记录了三大部分的主干和我认为的容易出错的地方。

领导选举


1. electForLeader 函数主干,候选人针对每一个 peer 发送请求投票RPC

for i:=0; i<len(rf.peers); i++ {
    // meet myself
    if i==rf.me {
        continue
    }
    go func(index int) {
        reply := &RequestVoteReply{}
        response := rf.sendRequestVote(index, &args, reply)
        if response {
            ...
        }
    }(i)
}

2. 获得响应后,要将 reply 的 term 于 candidate 的 currentTerm 进行比较

if reply.Term > rf.currentTerm {
    rf.currentTerm = reply.Term
    rf.changeRole("follower")
    return
}

3. 获得响应后,候选人要检查自己的 stateterm 是否因为发送RPC而改变

if rf.state != "candidate" || rf.currentTerm!= args.Term { return }

4. 若候选人获得的投票超过半数,则变成领导人

5. 请求投票PRC ⭐(接收者指接收 请求投票PRC 的 peer)

  • 如果 candidate 的 term 小于接收者的 currentTerm, 则不投票,并且返回接收者的 currentTerm

    reply.VoteGranted = false
    reply.Term = rf.currentTerm
    if rf.currentTerm > args.Term { return }
  • 如果接收者的 votedFor 为空或者为 candidateId,并且 candidate 的日志至少和接收者一样新,那么就投票给候选人。candidate 的日志至少和接收者一样新的含义:candidate 的最后一个日志条目的 term 大于接收者的最后一个日志条目的 term 或者当二者相等时,candidate 的最后一个日志条目的 index 要大于等于接收者的

    if (rf.votedFor==-1 || rf.votedFor==args.CandidateId) &&
       (args.LastLogTerm > rf.getLastLogTerm() ||
     ((args.LastLogTerm==rf.getLastLogTerm())&& (args.LastLogIndex>=rf.getLastLogIndex()))) {
           reply.VoteGranted = true
           rf.votedFor = args.CandidateId
           rf.state = "follower"   // rf.state can be follower or candidate
           ...
    }

日志复制

1. appendLogEntries 函数的主干,leader 针对每一个 peer 发送附加日志 RPC

for i:=0; i<len(rf.peers); i++ {
    if i == rf.me {
        continue
    }
    go func(index int) {
        reply := &AppendEntriesReply{}
        respond := rf.sendAppendEntries(index, &args, reply)
        if reply.Success {
            ...
            return
        } else {
            ...
        }
    }(i)
}

2. 获得响应后,要将 reply 的 term 于 leader 的 currentTerm 进行比较

3. 获得响应后,候选人要检查自己的 stateterm 是否因为发送RPC 而改变

4. 回复成功

  • 更新 nextIndex, 即 leader 需要发送给该 peer 下一条日志条目的索引值 , 更新 matchIndex, 即 leader 已经复制给该 peer 的日志的最高索引值

    rf.matchIndex[index] = args.PrevLogIndex + len(args.Entries)
    rf.nextIndex[index] = rf.matchIndex[index] + 1
  • 如果存在一个 N 满足 N>commitIndex,并且大多数 matchIndex[i] > N 成立,并且 log[N].term == currentTerm,则更新 commitIndex=N

5. 回复不成功

  • 更新 nextIndex,然后重试

6. 附加日志RPC

  • 几个再次明确的地方:

    • preLogIndex 的含义:新的日志条目(s)紧随之前的索引值,是针对每一个follower而言的==nextIndex[i]-1,每一轮重试都会改变。

    • entries[] 的含义:准备存储的日志条目;表示心跳时为空

      append(make([]LogEntry, 0), rf.log[rf.nextIndex[index]-rf.LastIncludedIndex:]...)
  • 领导人获得权力后,初始化所有的 nextIndex 值为自己的最后一条日志的 index+1;如果一个 follower 的日志跟领导人的不一样,那么在附加日志 PRC 时的一致性检查就会失败。领导人选举成功后跟随者可能的情况

  • reply增加 ConflictIndexConflictTerm 用于记录日志冲突 index 和 term

  • 如果 leader 的 term 小于接收者的 currentTerm, 则 reply false.

  • 接下来就三种情况 ⭐(比较绕且容易出错)

    1. follower 的日志长度比 leader 的短, 那么 ConflictIndex 就是 follower 的 日志长度。之后在 appendLogEntries 函数里面会更新 nextIndex 为 ConflictIndex, 相应的 prevLogIndex 也会改变, 于是在下一轮RPC中便可以比较 follower 的日志条目在 arg.PrevLogIndex 索引处的 Term 是否等于 args.PrevLogTerm, 如果相等, 则该可以把 leader 在 arg.PrevLogIndex 后面的一次性全部添加到 follower 的日志条目上, 从而达成一致性, 否则令 ConflictTerm 为 follower 的日志条目在 arg.PrevLogIndex 索引处的 Term, 然后从头遍历 follower 的日志条目, 找到第一个 term 等于 ConflictTerm 的 索引, 用来更新 ConflictIndex
    2. follower 的日志长度比 leader 的长,且在 prevLogIndex 处的 term 相等, 则开始依次对比 follower 在 arg.prevLogIndex 后的日志条目在何处跟 entries 上相应的日志的 term 不一致, 假设该索引为 idx, 那么 follower 在索引后的日志条目应该被替换为 entries 在 该索引后的日志条目, 最终达成一致性
    3. follower 的日志长度比 leader 的长,且在 prevLogIndex 处的 term 不相等, 之后便是去寻找 ConflictTerm 然后更新 ConflictIndex, 直到在 prevLogIndex 处的 term 相等, 然后按照 2 进行处理.
    if args.PrevLogIndex >=rf.LastIncludedIndex && args.PrevLogIndex < rf.logLen() {
    	if args.PrevLogTerm != rf.log[args.PrevLogIndex-rf.LastIncludedIndex].Term {
    		reply.ConflictTerm = rf.log[args.PrevLogIndex-rf.LastIncludedIndex].Term
    		//  then search its log for the first index
    		//  whose entry has term equal to conflictTerm.
    		for i:=rf.LastIncludedIndex; i<rf.logLen(); i++ {
    			if rf.log[i-rf.LastIncludedIndex].Term==reply.ConflictTerm {
    				reply.ConflictIndex = i
    				break
    			}
    		}
    		return
    	}
    }else {
    	reply.ConflictIndex = rf.logLen()
    	return
    }
    
    index := args.PrevLogIndex
    for i:=0; i<len(args.Entries); i++ {
    	index++
    	if index >= rf.logLen() {
    		rf.log = append(rf.log, args.Entries[i:]...)
    		rf.persist()
    		break
    	}
    	if rf.log[index-rf.LastIncludedIndex].Term != args.Entries[i].Term {
    		rf.log = rf.log[:index-rf.LastIncludedIndex]
    		rf.log = append(rf.log, args.Entries[i:]...)
    		rf.persist()
    		break
    	}
    }
  • 如果 leaderCommit > commitIndex,令 commitIndex 等于 leaderCommit 和新日志条目索引值中较小的一个


日志压缩

1. 采用日志压缩,不仅可以减少本地占用的空间和每次重置的时间花销, 还可以直接通过网络把快照发给那些落后的跟随者,使他们更新到最新的状态

2. 下图为 Raft 中快照的基本思想, 快照中包含状态机的状态, LastIncludeIndex: 被快照取代的最后的条目在日志中的索引值, LastIncludedTerm: 该条目的任期号,保留这些数据是为了支持快照后紧接着的第一个条目的附加日志请求时的一致性检查

3. 安装快照RPC

  • 尽管服务器通常都是独立的创建快照,但是领导人必须偶尔的发送快照给一些落后的跟随者

  • 三种情况

    • leader 的 LastIncludedIndex 小于等于 follower 的 LastIncludeIndex
    • leader 的 LastIncludedIndex 大于 follower 的 LastIncludeIndex,leader 的 LastIncludedIndex 小于 follower 日志的最大索引值
    • leader 的 LastIncludedIndex 大于等于 follower 日志的最大索引值

    对应的处理方式

    • 如果接收到的快照是自己日志的前面部分,那么快照包含的条目将全部被删除,但是快照后面的条目仍然有效,要保留
    • 如果快照中包含没有在接收者日志中存在的信息,那么跟随者丢弃其整个日志,全部被快照取代。

总结

1. 个人认为看一两遍论文就掌握 raft 还是比较困难的, 所以当对 raft 有了大致了解之后, 就可以开始实现了, 遇到问题后带着问题去读, 反复咀嚼, 逐渐加深对 raft 的理解

2. 推荐看一下这个: raft 可视化, 可以对 raft 有一个直观的感受