MIT6.824分布式系统实验二之Raft
分布式一致性算法Raft的实现
前言
MIT6.824这门课本来是从暑假就开始做的,但是由于各种原因,导致进度缓慢,尤其是实验二卡了很久, 在学完CMU的数据库课程后就重新开始实验二,这次学的更深入且认真。
下面是该实验的实验报告。代码
Raft
实验之前一定要仔细阅读《In Search of an Understandable Consensus Algorithm (Extended Version)》,尤其是第5节,实现代码的时候要按照论文的内容。
Raft_part1——领导人选举
在这一部分,有关日志复制的部分暂时不会被实现,留到第二部分
框架
Raft
这个结构体是每一个服务器持有一个的,作为状态机存在- 在这个实验中只有两个RPC,一个是请求投票的
ReqestVote
,另一个是修改日志(附带心跳)的AppendEntries
- 每个服务器的功能都一样,只是状态不同而已,状态只能是跟随者、候选人或者领导者的其中一个
- 每个服务器通过RPC调用
sendXXX
函数来调用其他服务器的XXX
函数
实现步骤
- 首先根据论文完善Raft结构体中必要的成员
type Raft struct {
mu sync.Mutex
peers []*labrpc.ClientEnd
persister *Persister
me int // index into peers[]
// 当前服务器状态
state int
// 候选人获得的票数
voteAcquired int
// 传播心跳的通道
heartbeatCh chan interface{}
// 当候选人赢得了选举就会利用这个通道传送消息
leaderCh chan interface{}
/*
* 全部服务器上面的可持久化状态:
* currentTerm 服务器看到的最近Term(第一次启动的时候为0,后面单调递增)
* votedFor 当前Term收到的投票候选 (如果没有就为null)
*/
currentTerm int
votedFor int
}
每个服务器只可能存在一下三个状态之一:
const (
STATE_FLLOWER = 0
STATE_CANDIDATE = 1
STATE_LEADER = 2
)
- 继续根据论文完善在RPC之间传递和返回的参数一些结构体
- 选举相关
type RequestVoteArgs struct {
// 候选人的term
Term int
// 候选人在节点数组中的下标
CandidateId int
}
type RequestVoteReply struct {
// currnetTerm,用来给候选人更新term
Term int
// true就说明候选人得到了选票
VoteGranted bool
}
- 复制日志相关
type AppendEntriesArgs struct {
// 领导人的term
Term int
// 领导人所在的下标
LeaderId int
}
type AppendEntriesReply struct {
// currentTerm,用来给领导人更新term
Term int
// true,说明和跟随者的日志匹配
Success bool
}
- 这一部分的重头戏,实现Make()方法
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me
// 初始化
rf.state = STATE_FLLOWER
rf.votedFor = -1
rf.currentTerm = 0
rf.heartbeatCh = make(chan interface{})
rf.leaderCh = make(chan interface{})
// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())
// 状态机循环
go func() {
for {
switch rf.state {
case STATE_FLLOWER:
select {
// 接收到心跳或者超时就变为候选者
case <- rf.heartbeatCh:
case <- time.After(time.Duration(rand.Int63() % 333 + 550) * time.Millisecond):
rf.state = STATE_CANDIDATE
}
case STATE_LEADER:
// 如果是领导人就执行复制日志(携带心跳)的操作
rf.broadcastAppendEntries()
time.Sleep(50 * time.Millisecond)
case STATE_CANDIDATE:
// 候选人要准备投票选举
rf.mu.Lock()
rf.currentTerm++
// 先投给自己一票
rf.votedFor = rf.me
rf.voteAcquired = 1
rf.mu.Unlock()
go rf.broadcastRequestVote()
select {
// 超时或者接收到心跳(其他节点先成为领导人)就变为跟随者
case <- time.After(time.Duration(rand.Int63() % 333 + 550) * time.Millisecond):
case <- rf.heartbeatCh:
rf.state = STATE_FLLOWER
// 从leaderCh通道接收到消息说明赢得选举,成为领导人
case <- rf.leaderCh:
rf.state = STATE_LEADER
}
}
}
}()
return rf
}
- 实现在第3步中使用却没实现的两个操作:领导人选举和日志复制
- 领导人选举
下面这个是RPC函数,用来给候选人调用的,然后在每个服务器执行
func (rf *Raft) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) {
// Your code here.
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Term = rf.currentTerm
// 如果候选人的term比自己的还小,则不给该候选人投票
if args.Term < rf.currentTerm {
reply.VoteGranted = false
return
} else if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.state = STATE_FLLOWER
reply.VoteGranted = true
}
}
func (rf *Raft) sendRequestVote(server int, args RequestVoteArgs, reply *RequestVoteReply) bool {
// 调用RPC
ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
rf.mu.Lock()
defer rf.mu.Unlock()
if ok {
term := rf.currentTerm
if rf.state != STATE_CANDIDATE {
return ok
} else if args.Term != term {
return ok
}
// 更新自己的term
if reply.Term > term {
rf.currentTerm = reply.Term
rf.state = STATE_FLLOWER
rf.votedFor = -1
}
// 统计选票
if reply.VoteGranted == true {
rf.voteAcquired++
if rf.voteAcquired > len(rf.peers)/2 {
rf.state = STATE_FLLOWER
rf.leaderCh <- true
}
}
}
return ok
}
func (rf *Raft) broadcastRequestVote() {
// 遍历整个Raft服务器数组,执行领导人选举
for i := range rf.peers {
if i != rf.me {
// 初始化要传给RPC调用的函数参数
var args RequestVoteArgs
args.Term = rf.currentTerm
args.CandidateId = rf.me
go func(i int) {
var reply RequestVoteReply
rf.sendRequestVote(i, args, &reply)
}(i)
}
}
}
- 复制日志(在这一部分只实现简单的心跳)
下面这个是RPC函数,用来给候领导人调用的,然后在每个服务器执行
func (rf *Raft) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
rf.heartbeatCh <- struct{}{}
}
有了之前实现的投票,类似地实现日志复制
func (rf *Raft) sendAppendEntries(server int, args AppendEntriesArgs, reply *AppendEntriesReply) bool {
ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
rf.mu.Lock()
defer rf.mu.Unlock()
if ok {
if rf.state != STATE_LEADER {
return ok
} else if args.Term != rf.currentTerm {
return ok
}
// 更新自己的term
if reply.Term > rf.currentTerm {
rf.currentTerm = reply.Term
rf.state = STATE_FLLOWER
rf.votedFor = -1
return ok
}
}
return ok
}
func (rf *Raft) broadcastAppendEntries() {
for i := range rf.peers {
if i != rf.me && rf.state == STATE_LEADER {
// 参数初始化
var args AppendEntriesArgs
args.Term = rf.currentTerm
args.LeaderId = rf.me
go func(i int, args AppendEntriesArgs) {
var reply AppendEntriesReply
rf.sendAppendEntries(i, args, &reply)
}(i, args)
}
}
}
测试
只能通过前两个测试点
$ go test
go test
Test: initial election ...
... Passed
Test: election after network failure ...
... Passed
Test: basic agreement ...
^Csignal: interrupt
FAIL raft 12.131s
raft_part2——日志的复制
调试错误
- 怎么也通不过第三个测试
basic agreement
? 这个错误找了很久,我以为问题是出在日志复制部分,但是怎么也排查不出来。
最后再重新看整个代码,发现Start()
函数这里没修改,因为这个函数是与’客户‘通信的通道,如果是leader就要添加指令
func (rf *Raft) Start(command interface{}) (int, int, bool) {
rf.mu.Lock()
defer rf.mu.Unlock()
index := -1
term := rf.currentTerm
isLeader := rf.state == STATE_LEADER
if isLeader == true {
index = rf.getLastIndex() + 1
rf.log = append(rf.log, LogEntry{LogTerm: term, LogCmd: command})
}
return index, term, isLeader
}
修改之后就能通过第三个测试了,但是第五个测试没通过。
- 第五个测试
no agreement if too many followers fail
? 错误提示:shell Test: no agreement if too many followers fail ... --- FAIL: TestFailNoAgree (3.30s) test_test.go:164: 2 committed but no majority
既然是跟commint日志相关,那么范围可以缩小到有关日志提交的部分。发现了个低级错误。。。
在sendAppendEntries
函数中对matchIndex
的更新出了问题,应该是减一,而不是加一。
rf.nextIndex[server] = reply.NextIndex
rf.matchIndex[server] = reply.NextIndex - 1
- 时不时会卡在第五个测试
no agreement if too many followers fail
? 第五个测试是和领导人选举有关,经检查,在RequestVote
RPC中,在投票确认的时候没有将重复投票的情况排除。go if uptoDate == true && rf.votedFor == -1 { rf.grantedvoteCh <- struct{}{} reply.VoteGranted = true rf.state = STATE_FLLOWER rf.votedFor = args.CandidateId }
在之前的基础上加上了rf.voteFor == -1
条件。
之后再测试就不会卡在第五个测试点了。
- 时不时会卡在测试
Figure 8(unreiable)
? - 时不时会卡在测试
leader backs up quickly over incorrect follower logs
? - 时不时会卡在测试
more persistence
? 找了两天,终于发现问题在哪了。 其实上面的测试卡住不动的原因都是因为我初始化的Channel
都是无缓冲的, 也就是说从无缓存的频道中读取消息会阻塞,直到有goroutine向该频道中发送消息; 同理,向无缓存的频道中发送消息也会阻塞,直到有goroutine从频道中读取消息。
有缓存的通道类似一个阻塞队列(采用环形数组实现)。当缓存未满时,向通道中发送消息时不会阻塞, 当缓存满时,发送操作将被阻塞,直到有其他goroutine从中读取消息;相应的,当渠道中消息不为空时, 读取消息不会出现阻塞,当渠道为空时,读取操作会造成阻塞,直到有goroutine向渠道中写入消息。
果然,将通道改为有缓存的通道就不会再测试卡住不动了。
rf.heartbeatCh = make(chan interface{}, 100)
rf.leaderCh = make(chan interface{}, 100)
rf.commitCh = make(chan interface{}, 100)
rf.grantedvoteCh = make(chan interface{}, 100)
测试
$ go test
Test: initial election ...
... Passed
Test: election after network failure ...
... Passed
Test: basic agreement ...
... Passed
Test: agreement despite follower failure ...
... Passed
Test: no agreement if too many followers fail ...
... Passed
Test: concurrent Start()s ...
... Passed
Test: rejoin of partitioned leader ...
... Passed
Test: leader backs up quickly over incorrect follower logs ...
... Passed
Test: RPC counts aren't too high ...
... Passed
Test: basic persistence ...
... Passed
Test: more persistence ...
... Passed
Test: partitioned leader and one follower crash, leader restarts ...
... Passed
Test: Figure 8 ...
... Passed
Test: unreliable agreement ...
... Passed
Test: Figure 8 (unreliable) ...
... Passed
Test: churn ...
... Passed
Test: unreliable churn ...
... Passed
PASS
ok raft 163.110s
总结
在读Raft论文的同时,我也了解了著名的一致性算法Paxos,相比较之下, 可能还是Raft比较好理解(可能是Raft还有动画演示可以看…)。
虽然说Raft算法比较简单,但是也只是相比较于Paxos。真正要实现,就会遇到一大堆不明所以的问题。
磕磕绊绊,也算是完成了Raft算法,下一个实验就要应用这个算法了。