分布式一致性算法Raft的实现

前言

MIT6.824这门课本来是从暑假就开始做的,但是由于各种原因,导致进度缓慢,尤其是实验二卡了很久, 在学完CMU的数据库课程后就重新开始实验二,这次学的更深入且认真。

下面是该实验的实验报告。代码

Raft

实验之前一定要仔细阅读《In Search of an Understandable Consensus Algorithm (Extended Version)》,尤其是第5节,实现代码的时候要按照论文的内容。

Raft_part1——领导人选举

在这一部分,有关日志复制的部分暂时不会被实现,留到第二部分

框架

  • Raft这个结构体是每一个服务器持有一个的,作为状态机存在
  • 在这个实验中只有两个RPC,一个是请求投票的ReqestVote,另一个是修改日志(附带心跳)的AppendEntries
  • 每个服务器的功能都一样,只是状态不同而已,状态只能是跟随者、候选人或者领导者的其中一个
  • 每个服务器通过RPC调用sendXXX函数来调用其他服务器的XXX函数

实现步骤

  1. 首先根据论文完善Raft结构体中必要的成员
type Raft struct {
	mu        sync.Mutex
	peers     []*labrpc.ClientEnd
	persister *Persister
	me        int // index into peers[]

	// 当前服务器状态
	state int
	// 候选人获得的票数
	voteAcquired int
	// 传播心跳的通道
	heartbeatCh chan interface{}
	// 当候选人赢得了选举就会利用这个通道传送消息
	leaderCh chan interface{}

	/*
	 * 全部服务器上面的可持久化状态:
	 *  currentTerm 	服务器看到的最近Term(第一次启动的时候为0,后面单调递增)
	 *  votedFor     	当前Term收到的投票候选 (如果没有就为null)
	 */
	currentTerm int
	votedFor int
}

每个服务器只可能存在一下三个状态之一:

const (
	STATE_FLLOWER = 0
	STATE_CANDIDATE = 1
	STATE_LEADER = 2
)
  1. 继续根据论文完善在RPC之间传递和返回的参数一些结构体
  2. 选举相关
type RequestVoteArgs struct {
	// 候选人的term
	Term int
	// 候选人在节点数组中的下标
	CandidateId int
}

type RequestVoteReply struct {
	// currnetTerm,用来给候选人更新term
	Term int
	// true就说明候选人得到了选票
	VoteGranted bool
}
  • 复制日志相关
type AppendEntriesArgs struct {
	// 领导人的term
	Term int
	// 领导人所在的下标
	LeaderId int
}

type AppendEntriesReply struct {
	// currentTerm,用来给领导人更新term
	Term int
	// true,说明和跟随者的日志匹配
	Success bool
}
  1. 这一部分的重头戏,实现Make()方法
func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister
	rf.me = me

	// 初始化
	rf.state = STATE_FLLOWER
	rf.votedFor = -1
	rf.currentTerm = 0
	rf.heartbeatCh = make(chan interface{})
	rf.leaderCh = make(chan interface{})

	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())

	// 状态机循环
	go func() {
		for {
			switch rf.state {
			case STATE_FLLOWER:
				select {
				// 接收到心跳或者超时就变为候选者
				case <- rf.heartbeatCh:
				case <- time.After(time.Duration(rand.Int63() % 333 + 550) * time.Millisecond):
					rf.state = STATE_CANDIDATE
				}	

			case STATE_LEADER:
				// 如果是领导人就执行复制日志(携带心跳)的操作
				rf.broadcastAppendEntries()
				time.Sleep(50 * time.Millisecond)

			case STATE_CANDIDATE:
				// 候选人要准备投票选举
				rf.mu.Lock()
				rf.currentTerm++
				// 先投给自己一票
				rf.votedFor = rf.me
				rf.voteAcquired = 1
				rf.mu.Unlock()
				go rf.broadcastRequestVote()
				select {
				// 超时或者接收到心跳(其他节点先成为领导人)就变为跟随者
				case <- time.After(time.Duration(rand.Int63() % 333 + 550) * time.Millisecond):
				case <- rf.heartbeatCh:
					rf.state = STATE_FLLOWER	
				// 从leaderCh通道接收到消息说明赢得选举,成为领导人
				case <- rf.leaderCh:
					rf.state = STATE_LEADER
				}	
			}
		}
	}()

	return rf
}
  1. 实现在第3步中使用却没实现的两个操作:领导人选举和日志复制
  2. 领导人选举

下面这个是RPC函数,用来给候选人调用的,然后在每个服务器执行

func (rf *Raft) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here.
	rf.mu.Lock()
	defer rf.mu.Unlock()

	reply.Term = rf.currentTerm

	// 如果候选人的term比自己的还小,则不给该候选人投票
	if args.Term < rf.currentTerm {
		reply.VoteGranted = false
		return
	} else if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		rf.state = STATE_FLLOWER
		reply.VoteGranted = true
	}
}
func (rf *Raft) sendRequestVote(server int, args RequestVoteArgs, reply *RequestVoteReply) bool {
	// 调用RPC
	ok := rf.peers[server].Call("Raft.RequestVote", args, reply)

	rf.mu.Lock()
	defer rf.mu.Unlock()

	if ok {
		term := rf.currentTerm
		if rf.state != STATE_CANDIDATE {
			return ok
		} else if args.Term != term {
			return ok
		}

		// 更新自己的term
		if reply.Term > term {
			rf.currentTerm = reply.Term
			rf.state = STATE_FLLOWER
			rf.votedFor = -1
		}

		// 统计选票
		if reply.VoteGranted == true {
			rf.voteAcquired++
			if rf.voteAcquired > len(rf.peers)/2 {
				rf.state = STATE_FLLOWER
				rf.leaderCh <- true
			}
		}
	}

	return ok
}

func (rf *Raft) broadcastRequestVote() {
	// 遍历整个Raft服务器数组,执行领导人选举
	for i := range rf.peers {
		if i != rf.me {	
			// 初始化要传给RPC调用的函数参数
			var args RequestVoteArgs
			args.Term = rf.currentTerm
			args.CandidateId = rf.me

			go func(i int) {
				var reply RequestVoteReply
				rf.sendRequestVote(i, args, &reply)
			}(i)
		}
	}
}
  • 复制日志(在这一部分只实现简单的心跳)

下面这个是RPC函数,用来给候领导人调用的,然后在每个服务器执行

func (rf *Raft) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	rf.heartbeatCh <- struct{}{}
}

有了之前实现的投票,类似地实现日志复制

func (rf *Raft) sendAppendEntries(server int, args AppendEntriesArgs, reply *AppendEntriesReply) bool {
	ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
	rf.mu.Lock()
	defer rf.mu.Unlock()

	if ok {
		if rf.state != STATE_LEADER {
			return ok
		} else if args.Term != rf.currentTerm {
			return ok
		}

		// 更新自己的term
		if reply.Term > rf.currentTerm {
			rf.currentTerm = reply.Term
			rf.state = STATE_FLLOWER
			rf.votedFor = -1
			return ok
		}
	}

	return ok
}

func (rf *Raft) broadcastAppendEntries() {
	for i := range rf.peers {
		if i != rf.me && rf.state == STATE_LEADER {
			// 参数初始化
			var args AppendEntriesArgs
			args.Term = rf.currentTerm
			args.LeaderId = rf.me

			go func(i int, args AppendEntriesArgs) {
				var reply AppendEntriesReply
				rf.sendAppendEntries(i, args, &reply)
			}(i, args)
		}
	}
}

测试

只能通过前两个测试点

$ go test
go test
Test: initial election ...
  ... Passed
Test: election after network failure ...
  ... Passed
Test: basic agreement ...
^Csignal: interrupt
FAIL	raft	12.131s

raft_part2——日志的复制

调试错误

  1. 怎么也通不过第三个测试basic agreement? 这个错误找了很久,我以为问题是出在日志复制部分,但是怎么也排查不出来。

最后再重新看整个代码,发现Start()函数这里没修改,因为这个函数是与’客户‘通信的通道,如果是leader就要添加指令

func (rf *Raft) Start(command interface{}) (int, int, bool) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	index := -1
	term := rf.currentTerm
	isLeader := rf.state == STATE_LEADER

	if isLeader == true {
		index = rf.getLastIndex() + 1
		rf.log = append(rf.log, LogEntry{LogTerm: term, LogCmd: command})
	}
	return index, term, isLeader
}

修改之后就能通过第三个测试了,但是第五个测试没通过。

  1. 第五个测试no agreement if too many followers fail? 错误提示: shell Test: no agreement if too many followers fail ... --- FAIL: TestFailNoAgree (3.30s) test_test.go:164: 2 committed but no majority 既然是跟commint日志相关,那么范围可以缩小到有关日志提交的部分。发现了个低级错误。。。

sendAppendEntries函数中对matchIndex的更新出了问题,应该是减一,而不是加一。

rf.nextIndex[server] = reply.NextIndex
rf.matchIndex[server] = reply.NextIndex - 1

  1. 时不时会卡在第五个测试no agreement if too many followers fail? 第五个测试是和领导人选举有关,经检查,在RequestVoteRPC中,在投票确认的时候没有将重复投票的情况排除。 go if uptoDate == true && rf.votedFor == -1 { rf.grantedvoteCh <- struct{}{} reply.VoteGranted = true rf.state = STATE_FLLOWER rf.votedFor = args.CandidateId } 在之前的基础上加上了rf.voteFor == -1条件。

之后再测试就不会卡在第五个测试点了。

  1. 时不时会卡在测试Figure 8(unreiable)?
  2. 时不时会卡在测试leader backs up quickly over incorrect follower logs
  3. 时不时会卡在测试more persistence? 找了两天,终于发现问题在哪了。 其实上面的测试卡住不动的原因都是因为我初始化的Channel都是无缓冲的, 也就是说从无缓存的频道中读取消息会阻塞,直到有goroutine向该频道中发送消息; 同理,向无缓存的频道中发送消息也会阻塞,直到有goroutine从频道中读取消息。

有缓存的通道类似一个阻塞队列(采用环形数组实现)。当缓存未满时,向通道中发送消息时不会阻塞, 当缓存满时,发送操作将被阻塞,直到有其他goroutine从中读取消息;相应的,当渠道中消息不为空时, 读取消息不会出现阻塞,当渠道为空时,读取操作会造成阻塞,直到有goroutine向渠道中写入消息。

果然,将通道改为有缓存的通道就不会再测试卡住不动了。

rf.heartbeatCh = make(chan interface{}, 100)
rf.leaderCh = make(chan interface{}, 100)
rf.commitCh = make(chan interface{}, 100)
rf.grantedvoteCh = make(chan interface{}, 100)

测试

$ go test
Test: initial election ...
  ... Passed
Test: election after network failure ...
  ... Passed
Test: basic agreement ...
  ... Passed
Test: agreement despite follower failure ...
  ... Passed
Test: no agreement if too many followers fail ...
  ... Passed
Test: concurrent Start()s ...
  ... Passed
Test: rejoin of partitioned leader ...
  ... Passed
Test: leader backs up quickly over incorrect follower logs ...
  ... Passed
Test: RPC counts aren't too high ...
  ... Passed
Test: basic persistence ...
  ... Passed
Test: more persistence ...
  ... Passed
Test: partitioned leader and one follower crash, leader restarts ...
  ... Passed
Test: Figure 8 ...
  ... Passed
Test: unreliable agreement ...
  ... Passed
Test: Figure 8 (unreliable) ...
  ... Passed
Test: churn ...
  ... Passed
Test: unreliable churn ...
  ... Passed
PASS
ok  	raft	163.110s

总结

在读Raft论文的同时,我也了解了著名的一致性算法Paxos,相比较之下, 可能还是Raft比较好理解(可能是Raft还有动画演示可以看…)。

虽然说Raft算法比较简单,但是也只是相比较于Paxos。真正要实现,就会遇到一大堆不明所以的问题。

磕磕绊绊,也算是完成了Raft算法,下一个实验就要应用这个算法了。