# MIT 6.824 Distributed Systems, Lab 3: KV-Raft

Building a fault-tolerant key-value database on top of the Raft algorithm.

## Preface

With the momentum from lab 2, lab 3 went fairly smoothly.

This lab adds client operations on top of Raft, corresponding to sections 7 and 8 of the paper. The server side is a simple key-value database: it sits on top of Raft, exposes RPCs to clients, and interacts with the underlying Raft log, which together yield a fault-tolerant key-value store.

What follows is my lab report.
## The Raft-Based Key-Value Service

- Clients send `Put()`, `Append()`, and `Get()` RPCs to the key-value servers.
- A client may send an RPC to any key-value server; it does not have to target the leader. If the operation gets committed to the Raft log, the server reports success to the client; if the commit fails, the server tells the client to retry another server.
- Servers communicate with one another only through the Raft log.
## Part A: The Key-Value Service

- First, fill in the `Op` struct in `server.go`:
```go
type Op struct {
	// "Get", "Put", or "Append"
	Type  string
	Key   string
	Value string
	// unique Id of the issuing client
	Id int64
	// request Id, identifying each command so that
	// duplicates are not executed twice
	ReqId int
}
```
- Then complete the `RaftKV` struct in `server.go`:
```go
type RaftKV struct {
	mu      sync.Mutex
	me      int
	rf      *raft.Raft
	applyCh chan raft.ApplyMsg

	maxraftstate int // snapshot if log grows this big

	// Your definitions here.
	// the database holding the actual key-value data
	db map[string]string
	// last applied request id per client: map[Id]ReqId
	ack map[int64]int
	// per-log-index notification channels: map[msg.Index]chan Op
	result map[int]chan Op
}
```
The main loop in `server.go`, `StartKVServer()`:
```go
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *RaftKV {
	// call gob.Register on structures you want
	// Go's RPC library to marshall/unmarshall.
	gob.Register(Op{})

	kv := new(RaftKV)
	kv.me = me
	kv.maxraftstate = maxraftstate

	// Your initialization code here.
	kv.applyCh = make(chan raft.ApplyMsg)
	kv.rf = raft.Make(servers, me, persister, kv.applyCh)
	kv.db = make(map[string]string)
	kv.ack = make(map[int64]int)
	kv.result = make(map[int]chan Op)

	// StartKVServer() must return quickly, so the apply loop runs in a goroutine.
	go func() {
		for {
			// a log entry handed up from Raft
			msg := <-kv.applyCh
			// type assertion: convert the interface-valued Command back to an Op
			op := msg.Command.(Op)
			kv.mu.Lock()
			v, ok := kv.ack[op.Id]
			// Apply the op only if it is not a duplicate: if ack[op.Id] already
			// records a request id at least as large as op.ReqId, skip the write.
			if !(ok && v >= op.ReqId) {
				if op.Type == "Put" {
					kv.db[op.Key] = op.Value
				} else if op.Type == "Append" {
					kv.db[op.Key] += op.Value
				}
				// record the latest ReqId applied for this client
				kv.ack[op.Id] = op.ReqId
			}
			ch, ok := kv.result[msg.Index]
			if ok {
				// an RPC handler is waiting at this index: hand it the applied op
				ch <- op
			} else {
				// no waiter yet: create a buffered channel for this index
				kv.result[msg.Index] = make(chan Op, 1)
			}
			kv.mu.Unlock()
		}
	}()
	return kv
}
```
Add members to the argument structs in `command.go`:
```go
// Put or Append
type PutAppendArgs struct {
	Key   string
	Value string
	Op    string // "Put" or "Append"
	// You'll have to add definitions here.
	// Field names must start with capital letters,
	// otherwise RPC will break.
	Id    int64
	ReqID int
}

type GetArgs struct {
	Key string
	// You'll have to add definitions here.
	Id    int64
	ReqID int
}
```
The RPC handlers `server.go` exposes to clients: `Get` and `PutAppend`. Both are simple: construct an `Op`, pass the command down to the underlying Raft, and wait for the main loop's reply.
```go
// submit a command to the underlying Raft log and wait for it to commit
func (kv *RaftKV) AppendEntryToLog(entry Op) bool {
	index, _, isLeader := kv.rf.Start(entry)
	if !isLeader {
		return false
	}

	kv.mu.Lock()
	ch, ok := kv.result[index]
	if !ok {
		ch = make(chan Op, 1)
		kv.result[index] = ch
	}
	kv.mu.Unlock()

	select {
	// If the op that comes back equals the one we submitted, it was committed.
	// If a different op was committed at this index, we lost leadership and
	// the client has to retry elsewhere.
	case op := <-ch:
		return op == entry
	// timeout
	case <-time.After(1000 * time.Millisecond):
		return false
	}
}
```
```go
func (kv *RaftKV) Get(args *GetArgs, reply *GetReply) {
	// Your code here.
	op := Op{
		Type:  "Get",
		Key:   args.Key,
		Id:    args.Id,
		ReqId: args.ReqID,
	}
	ok := kv.AppendEntryToLog(op)
	if ok {
		reply.WrongLeader = false
		reply.Err = OK
		// read the value from db
		reply.Value = kv.db[args.Key]
	} else {
		reply.WrongLeader = true
	}
}
```
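One caveat worth noting (my own observation, not something the tests flag here): `reply.Value` is read from `kv.db` without holding `kv.mu`, so it can race with the apply loop, which writes `kv.db` under the lock. A safer sketch guards the read:

```go
// guard the read so it is serialized with the apply loop's writes
kv.mu.Lock()
reply.Value = kv.db[args.Key]
kv.mu.Unlock()
```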
```go
func (kv *RaftKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
	// Your code here.
	op := Op{
		Type:  args.Op,
		Key:   args.Key,
		Value: args.Value,
		Id:    args.Id,
		ReqId: args.ReqID,
	}
	ok := kv.AppendEntryToLog(op)
	if ok {
		reply.WrongLeader = false
		reply.Err = OK
	} else {
		reply.WrongLeader = true
	}
}
```
On to `client.go`. The client implementation is simple:
```go
type Clerk struct {
	servers []*labrpc.ClientEnd
	// You will have to modify this struct.
	id int64
	// monotonically increasing request id, unique per client
	reqid int
	// protects reqid
	mu sync.Mutex
}

func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
	ck := new(Clerk)
	ck.servers = servers
	// You'll have to add code here.
	ck.id = nrand()
	ck.reqid = 0
	return ck
}
```
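`nrand()` comes from the lab skeleton; from memory it is roughly the following (a 62-bit random id, so client ids collide with negligible probability):

```go
import (
	"crypto/rand"
	"math/big"
)

func nrand() int64 {
	max := big.NewInt(int64(1) << 62)
	bigx, _ := rand.Int(rand.Reader, max)
	return bigx.Int64()
}
```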
After that come the `Get` and `PutAppend` operations: simply build the args and pass them to the server via its RPCs.
```go
func (ck *Clerk) Get(key string) string {
	// You will have to modify this function.
	var args GetArgs
	args.Key = key
	args.Id = ck.id
	ck.mu.Lock()
	args.ReqID = ck.reqid
	ck.reqid++
	ck.mu.Unlock()

	// keep trying every server until one (the leader) answers successfully
	for {
		for _, v := range ck.servers {
			var reply GetReply
			ok := v.Call("RaftKV.Get", &args, &reply)
			if ok && reply.WrongLeader == false {
				return reply.Value
			}
		}
	}
}
```
```go
func (ck *Clerk) PutAppend(key string, value string, op string) {
	// You will have to modify this function.
	var args PutAppendArgs
	args.Key = key
	args.Value = value
	args.Op = op
	args.Id = ck.id
	ck.mu.Lock()
	args.ReqID = ck.reqid
	ck.reqid++
	ck.mu.Unlock()

	for {
		for _, v := range ck.servers {
			var reply PutAppendReply
			ok := v.Call("RaftKV.PutAppend", &args, &reply)
			if ok && reply.WrongLeader == false {
				return
			}
		}
	}
}
```
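For completeness, the skeleton's `Put` and `Append` are thin wrappers over `PutAppend`:

```go
func (ck *Clerk) Put(key string, value string) {
	ck.PutAppend(key, value, "Put")
}

func (ck *Clerk) Append(key string, value string) {
	ck.PutAppend(key, value, "Append")
}
```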
## Testing

Everything up to the snapshot tests passes:

```
$ go test
Test: One client ...
... Passed
Test: concurrent clients ...
... Passed
Test: unreliable ...
... Passed
Test: Concurrent Append to same key, unreliable ...
... Passed
Test: Progress in majority ...
... Passed
Test: No progress in minority ...
... Passed
Test: Completion after heal ...
... Passed
Test: many partitions ...
2018/11/19 13:37:31 Warning: client 0 managed to perform only 6 put operations in 1 sec?
... Passed
Test: many partitions, many clients ...
... Passed
Test: persistence with one client ...
... Passed
Test: persistence with concurrent clients ...
... Passed
Test: persistence with concurrent clients, unreliable ...
... Passed
Test: persistence with concurrent clients and repartitioning servers...
2018/11/19 13:39:23 Warning: client 2 managed to perform only 9 put operations in 1 sec?
... Passed
Test: persistence with concurrent clients and repartitioning servers, unreliable...
2018/11/19 13:39:51 Warning: client 0 managed to perform only 8 put operations in 1 sec?
2018/11/19 13:40:12 Warning: client 3 managed to perform only 8 put operations in 1 sec?
... Passed
```
## Part B: Log Compaction

Snapshotting is the simplest way to implement log compaction.
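The core of the service side is small: after applying each op, if Raft's persisted state has outgrown `maxraftstate`, serialize `db` and `ack` and hand Raft a snapshot so it can discard old entries. A minimal sketch; the `GetPersistSize` and `StartSnapshot` names are my placeholders for whatever your Raft exposes, not skeleton APIs:

```go
// inside the apply loop, after applying msg.Command at msg.Index
if kv.maxraftstate != -1 && kv.rf.GetPersistSize() > kv.maxraftstate {
	w := new(bytes.Buffer)
	e := gob.NewEncoder(w)
	e.Encode(kv.db)
	e.Encode(kv.ack)
	// hand the snapshot to Raft so it can truncate its log up to msg.Index
	go kv.rf.StartSnapshot(w.Bytes(), msg.Index)
}
```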
## Debugging

- An `out of range` error in the `InstallSnapshot` RPC? I did not follow the paper's InstallSnapshot arguments exactly: where the paper uses an `offset` to represent the position in the log, I instead added a `LogIndex` member to the `LogEntry` struct to play that role. Because the older code had not been updated everywhere, operations on the log overflowed their slice bounds. Every piece of code that indexes into the log has to be adjusted, using `baseIndex := rf.log[0].LogIndex` as the zero point of the log, as sketched below.
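The index translation looks roughly like this (a sketch against my own `LogEntry` definition):

```go
// the first retained entry defines the zero point of the in-memory log
baseIndex := rf.log[0].LogIndex
// an absolute Raft index i then lives at slice position i-baseIndex
entry := rf.log[i-baseIndex]
```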
- Stuck inside the `InstallSnapshot` RPC? I remembered this failure mode all too well from lab 2, so I went through every channel and found that `applyCh` was a blocking, unbuffered channel; I promptly made it buffered. Still stuck... Running the lab 2 tests, even `basic agreement` was already failing, which meant my edits to the lab 2 code had broken something. On inspection I found a low-level mistake: the entry appended in `Start()` never initialized the newly added `LogIndex` member, so every later use of `baseIndex` went wrong (see the sketch below). With that fixed, Part B's first test, `InstallSnapshot RPC`, passed.
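The fix in `Start()`, sketched (field and state names follow my lab 2 code and may differ from yours):

```go
func (rf *Raft) Start(command interface{}) (int, int, bool) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	index := rf.log[len(rf.log)-1].LogIndex + 1
	term := rf.currentTerm
	isLeader := rf.state == Leader
	if isLeader {
		// the bug: I appended the entry without setting LogIndex,
		// so rf.log[0].LogIndex (baseIndex) was garbage later on
		rf.log = append(rf.log, LogEntry{LogIndex: index, LogTerm: term, Command: command})
		rf.persist()
	}
	return index, term, isLeader
}
```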
- Failing the `persistence with one client and snapshots` test, with this error:

```
2018/11/21 19:03:22 get wrong value, key 0, wanted:
x 0 0 yx 0 1 yx 0 2 yx 0 3 yx 0 4 yx 0 5 yx 0 6 yx 0 7 yx 0 8 yx 0 9 yx 0 10 yx 0 11 yx 0 12 yx 0 13 yx 0 14 yx 0 15 yx 0 16 yx 0 17 yx 0 18 yx 0 19 yx 0 20 yx 0 21 yx 0 22 yx 0 23 yx 0 24 yx 0 25 yx 0 26 y
, got
x 0 0 yx 0 1 yx 0 2 yx 0 3 yx 0 4 yx 0 5 yx 0 6 yx 0 7 yx 0 8 yx 0 9 yx 0 10 yx 0 11 yx 0 12 yx 0 13 yx 0 14 yx 0 15 yx 0 16 yx 0 17 yx 0 18 yx 0 19 yx 0 20 y
exit status 1
```

The test's output shows that log entries went missing. It took me a whole day to find the bug... and it was another very low-level one. In the `readSnapshot()` function, the correct calls are:

```go
d.Decode(&LastIncludedIndex)
d.Decode(&LastIncludedTerm)
```

I had forgotten the `&`. `Decode` must be passed pointers; without them it returns an error, and since I was not checking that error, the decode failed silently and everything downstream was affected.
## Testing

```
$ go test
Test: One client ...
... Passed
Test: concurrent clients ...
... Passed
Test: unreliable ...
... Passed
Test: Concurrent Append to same key, unreliable ...
... Passed
Test: Progress in majority ...
... Passed
Test: No progress in minority ...
... Passed
Test: Completion after heal ...
... Passed
Test: many partitions ...
2018/11/21 21:24:22 Warning: client 0 managed to perform only 7 put operations in 1 sec?
2018/11/21 21:24:30 Warning: client 0 managed to perform only 7 put operations in 1 sec?
... Passed
Test: many partitions, many clients ...
2018/11/21 21:24:45 Warning: client 1 managed to perform only 8 put operations in 1 sec?
... Passed
Test: persistence with one client ...
... Passed
Test: persistence with concurrent clients ...
... Passed
Test: persistence with concurrent clients, unreliable ...
... Passed
Test: persistence with concurrent clients and repartitioning servers...
... Passed
Test: persistence with concurrent clients and repartitioning servers, unreliable...
2018/11/21 21:26:45 Warning: client 1 managed to perform only 7 put operations in 1 sec?
2018/11/21 21:26:45 Warning: client 2 managed to perform only 5 put operations in 1 sec?
2018/11/21 21:26:45 Warning: client 3 managed to perform only 7 put operations in 1 sec?
2018/11/21 21:26:46 Warning: client 4 managed to perform only 2 put operations in 1 sec?
2018/11/21 21:26:54 Warning: client 0 managed to perform only 4 put operations in 1 sec?
2018/11/21 21:26:55 Warning: client 1 managed to perform only 8 put operations in 1 sec?
2018/11/21 21:26:55 Warning: client 2 managed to perform only 7 put operations in 1 sec?
2018/11/21 21:26:56 Warning: client 3 managed to perform only 8 put operations in 1 sec?
2018/11/21 21:26:56 Warning: client 4 managed to perform only 9 put operations in 1 sec?
2018/11/21 21:27:06 Warning: client 3 managed to perform only 9 put operations in 1 sec?
... Passed
Test: InstallSnapshot RPC ...
... Passed
Test: persistence with one client and snapshots ...
... Passed
Test: persistence with several clients and snapshots ...
... Passed
Test: persistence with several clients, snapshots, unreliable ...
... Passed
Test: persistence with several clients, failures, and snapshots, unreliable ...
... Passed
Test: persistence with several clients, failures, and snapshots, unreliable and partitions ...
2018/11/21 21:28:50 Warning: client 1 managed to perform only 9 put operations in 1 sec?
2018/11/21 21:28:51 Warning: client 3 managed to perform only 8 put operations in 1 sec?
2018/11/21 21:28:51 Warning: client 4 managed to perform only 8 put operations in 1 sec?
2018/11/21 21:29:09 Warning: client 0 managed to perform only 8 put operations in 1 sec?
2018/11/21 21:29:11 Warning: client 4 managed to perform only 6 put operations in 1 sec?
... Passed
PASS
ok kvraft 351.767s
```
## Summary

Doing lab 3 right on the heels of lab 2 worked well.

Part A is mainly about understanding how the client talks to the server and how the server talks to Raft; once that is clear, the implementation is simple. Part B is about working out how to discard log entries, which means revisiting essentially every earlier operation on the log.

Two low-level mistakes cost me a lot of time; my debugging skills still need work.