Implementing a Fault-Tolerant Key-Value Database on Top of the Raft Algorithm

Preface

With the momentum carried over from lab2, implementing lab3 went fairly smoothly.

This lab builds on Raft by adding client operations, corresponding to sections 7 and 8 of the paper. The server side is a simple key-value database: it runs Raft underneath, exposes RPCs to clients, and interacts with the underlying Raft log to provide a fault-tolerant key-value store.

Below is my report for this lab.

A Key-Value Service Based on Raft

  • Clients send Put(), Append(), and Get() RPCs to the key-value servers.
  • A client may send an RPC to any of the key-value servers. If the server manages to commit the operation into the Raft log, it reports success to the client; if the commit fails (for instance because the server is not the leader), it reports failure so the client retries another server.
  • Servers communicate with one another only through the Raft log.

Part A: The Key-Value Service

  1. First, fill in the Op struct in server.go.
type Op struct {
	// "Get", "Put", or "Append"
	Type 	string
	Key 	string
	Value 	string
	// id of the client (Clerk) that issued the command
	Id 		int64
	// request id; identifies each command so duplicates are not re-executed
	ReqId  	int
}
  2. Fill in the RaftKV struct in server.go.
type RaftKV struct {
	mu      sync.Mutex
	me      int
	rf      *raft.Raft
	applyCh chan raft.ApplyMsg

	maxraftstate int // snapshot if log grows this big

	// Your definitions here.
	// the key/value database
	db 		map[string]string
	// latest request id applied for each client: map[Id]ReqId
	ack		map[int64]int 
	// channels for notifying waiting RPC handlers: map[msg.Index]chan Op
	result 	map[int]chan Op
}
  3. The main loop in server.go: StartKVServer()
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *RaftKV {
	// call gob.Register on structures you want
	// Go's RPC library to marshall/unmarshall.
	gob.Register(Op{})

	kv := new(RaftKV)
	kv.me = me
	kv.maxraftstate = maxraftstate

	// Your initialization code here.

	kv.applyCh = make(chan raft.ApplyMsg)
	kv.rf = raft.Make(servers, me, persister, kv.applyCh)

	kv.db = make(map[string]string)
	kv.ack = make(map[int64]int)
	kv.result = make(map[int]chan Op)

	// StartKVServer() must return quickly, so the apply loop runs in a goroutine
	go func() {
		for {
			// next committed log entry handed up by Raft
			msg := <- kv.applyCh
			// type assertion: convert the interface-valued Command back to an Op
			op := msg.Command.(Op)

			kv.mu.Lock()
			
			v, ok := kv.ack[op.Id]
			// duplicate detection: only apply the op if no record exists for this
			// client or its ReqId is newer than the last one recorded
			if !(ok && v >= op.ReqId) {
				if op.Type == "Put" {
					kv.db[op.Key] = op.Value
				} else if op.Type == "Append" {
					kv.db[op.Key] += op.Value
				}

				// record the latest applied ReqId for this client
				kv.ack[op.Id] = op.ReqId
			}

			ch, ok := kv.result[msg.Index]
			if ok {
				// a handler is waiting on this index; hand it the applied op
				ch <- op
			} else {
				// no handler is waiting on this index yet; leave a buffered channel behind
				kv.result[msg.Index] = make(chan Op, 1)
			}
			
			kv.mu.Unlock()
		}
	}()

	return kv
}
  4. Add members to the structs in command.go
// Put or Append
type PutAppendArgs struct {
	Key   string
	Value string
	Op    string // "Put" or "Append"
	// You'll have to add definitions here.
	// Field names must start with capital letters,
	// otherwise RPC will break.
	Id		int64
	ReqID	int	
}

type GetArgs struct {
	Key 	string
	// You'll have to add definitions here.
	Id		int64
	ReqID	int	
}
  5. The RPC handlers server.go provides to the client: Get and PutAppend

Both implementations are simple: construct an Op, hand the command to the underlying Raft, and wait for the apply loop's response.

// AppendEntryToLog submits a command to the underlying Raft log and waits for it to be applied
func (kv *RaftKV) AppendEntryToLog(entry Op) bool {
	index, _, isLeader := kv.rf.Start(entry)
	if !isLeader {
		return false
	}

	kv.mu.Lock()
	ch, ok := kv.result[index]
	if !ok {
		ch = make(chan Op, 1)
		kv.result[index] = ch
	}
	kv.mu.Unlock()

	select {
	// if the applied op equals the one we submitted, the command was
	// written to the Raft log and has been committed
	case op := <-ch:
		return op == entry
	// timeout: we probably lost leadership, so let the client retry
	case <- time.After(1000 * time.Millisecond):
		return false
	}
}


func (kv *RaftKV) Get(args *GetArgs, reply *GetReply) {
	// Your code here.
	op := Op{
		Type: 	"Get",
		Key:	args.Key,
		Id:		args.Id,
		ReqId:	args.ReqID,
	}

	ok := kv.AppendEntryToLog(op)
	if ok {
		reply.WrongLeader = false
		reply.Err = OK
		
		// read the value from the database; take the lock to avoid
		// racing with the apply loop
		kv.mu.Lock()
		reply.Value = kv.db[args.Key]
		kv.mu.Unlock()
	} else {
		reply.WrongLeader = true
	}
}

func (kv *RaftKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
	// Your code here.
	op := Op{
		Type: 	args.Op,
		Key:	args.Key,
		Value:	args.Value,
		Id:		args.Id,
		ReqId:	args.ReqID,
	}

	ok := kv.AppendEntryToLog(op)
	if ok {
		reply.WrongLeader = false
		reply.Err = OK
	} else {
		reply.WrongLeader = true
	}
}
  6. client.go

The client implementation is straightforward.

type Clerk struct {
	servers []*labrpc.ClientEnd
	// You will have to modify this struct.
	id 		int64
	// monotonically increasing request id, unique to each command
	reqid	int
	// protects reqid
	mu 		sync.Mutex
}

func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
	ck := new(Clerk)
	ck.servers = servers
	// You'll have to add code here.
	ck.id = nrand()
	ck.reqid = 0
	return ck
}

After that come the Get and PutAppend operations: simply build the args and pass them to the server through the corresponding RPC.

func (ck *Clerk) Get(key string) string {
	// You will have to modify this function.
	var args GetArgs
	args.Key = key
	args.Id = ck.id

	ck.mu.Lock()
	args.ReqID = ck.reqid
	ck.reqid++
	ck.mu.Unlock()

	for {
		for _, v := range ck.servers {
			var reply GetReply
			ok := v.Call("RaftKV.Get", &args, &reply)
			if ok && reply.WrongLeader == false {
				return reply.Value
			}
		}
	}
}

func (ck *Clerk) PutAppend(key string, value string, op string) {
	// You will have to modify this function.
	var args PutAppendArgs
	args.Key = key
	args.Value = value
	args.Op = op
	args.Id = ck.id

	ck.mu.Lock()
	args.ReqID = ck.reqid
	ck.reqid++
	ck.mu.Unlock()

	for {
		for _, v := range ck.servers {
			var reply PutAppendReply
			ok := v.Call("RaftKV.PutAppend", &args, &reply)
			if ok && reply.WrongLeader == false {
				return 
			}
		}
	}
}

Testing

All the tests before the snapshot ones pass:

$ go test
Test: One client ...
  ... Passed
Test: concurrent clients ...
  ... Passed
Test: unreliable ...
  ... Passed
Test: Concurrent Append to same key, unreliable ...
  ... Passed
Test: Progress in majority ...
  ... Passed
Test: No progress in minority ...
  ... Passed
Test: Completion after heal ...
  ... Passed
Test: many partitions ...
2018/11/19 13:37:31 Warning: client 0 managed to perform only 6 put operations in 1 sec?
  ... Passed
Test: many partitions, many clients ...
  ... Passed
Test: persistence with one client ...
  ... Passed
Test: persistence with concurrent clients ...
  ... Passed
Test: persistence with concurrent clients, unreliable ...
  ... Passed
Test: persistence with concurrent clients and repartitioning servers...
2018/11/19 13:39:23 Warning: client 2 managed to perform only 9 put operations in 1 sec?
  ... Passed
Test: persistence with concurrent clients and repartitioning servers, unreliable...
2018/11/19 13:39:51 Warning: client 0 managed to perform only 8 put operations in 1 sec?
2018/11/19 13:40:12 Warning: client 3 managed to perform only 8 put operations in 1 sec?
  ... Passed

Part B: Log Compaction

Snapshotting is the simplest way to implement log compaction.
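The server-side trigger is not shown in this report, so here is a minimal sketch of what the end of the apply loop can look like. It assumes RaftKV also keeps the persister passed to StartKVServer, and that my lab2 Raft gained a StartSnapshot(data, index) method; both names are assumptions, and bytes plus encoding/gob are imported. Persister's RaftStateSize() is part of the lab framework.

// A sketch of the snapshot trigger at the end of the apply loop,
// while kv.mu is still held. maxraftstate == -1 disables snapshotting.
if kv.maxraftstate != -1 && kv.persister.RaftStateSize() > kv.maxraftstate {
	// Serialize exactly the state the apply loop maintains: db and ack.
	w := new(bytes.Buffer)
	e := gob.NewEncoder(w)
	e.Encode(kv.db)
	e.Encode(kv.ack)
	// Ask Raft to persist the snapshot and discard log entries up to
	// the index that was just applied (assumed method on my lab2 Raft).
	go kv.rf.StartSnapshot(w.Bytes(), msg.Index)
}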

Debugging

  1. Out-of-range errors in the InstallSnapshot RPC? I did not follow the paper's InstallSnapshot arguments exactly: the paper uses offset to describe a position relative to the start of the log, while I instead added a LogIndex member to the LogEntry struct to play that role.

Because the rest of the code had not been updated to match, log operations overran the slice bounds.

Every place that touches the log has to be adjusted to use baseIndex := rf.log[0].LogIndex as the zero point of that server's log, as sketched below.
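For example, reading the term at a global Raft index becomes a two-step translation. A minimal sketch, assuming a hypothetical termAt helper and that each LogEntry carries a LogTerm field alongside LogIndex:

// termAt returns the term of the entry at a global Raft index, after
// translating it into a position in the trimmed log (no bounds checks).
func (rf *Raft) termAt(index int) int {
	// rf.log[0] always holds the last entry covered by the snapshot,
	// so its LogIndex is the zero point of the slice.
	baseIndex := rf.log[0].LogIndex
	return rf.log[index-baseIndex].LogTerm
}

Truncation follows the same pattern, e.g. rf.log = rf.log[:index-baseIndex+1] keeps everything up to and including a global index.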

  2. Stuck in the InstallSnapshot RPC? This failure mode was still fresh in my mind from lab2, so I checked every channel carefully and found that applyCh was an unbuffered, blocking channel; I promptly made it buffered. It still hung, though...

Rerunning the lab2 tests, even basic agreement already failed.

So my changes to the lab2 code had broken something. On inspection I found a low-level mistake: the entry built in Start() never initialized the newly added LogIndex member, so every subsequent baseIndex computation went wrong (see the sketch after this item).

After fixing that, the first Part B test, InstallSnapshot RPC, passes.
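For reference, the fix looks roughly like this; LogTerm and LogCommand are assumed field names from my lab2 LogEntry, and this fragment sits inside Start() on the leader:

// The new entry must carry its absolute index, otherwise every later
// baseIndex computation breaks after the first snapshot.
index := rf.log[len(rf.log)-1].LogIndex + 1
rf.log = append(rf.log, LogEntry{
	LogIndex:   index, // the member I had forgotten to initialize
	LogTerm:    rf.currentTerm,
	LogCommand: command,
})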

  3. Failing the test persistence with one client and snapshots

The failure output:

2018/11/21 19:03:22 get wrong value, key 0, wanted:
x 0 0 yx 0 1 yx 0 2 yx 0 3 yx 0 4 yx 0 5 yx 0 6 yx 0 7 yx 0 8 yx 0 9 yx 0 10 yx 0 11 yx 0 12 yx 0 13 yx 0 14 yx 0 15 yx 0 16 yx 0 17 yx 0 18 yx 0 19 yx 0 20 yx 0 21 yx 0 22 yx 0 23 yx 0 24 yx 0 25 yx 0 26 y
, got
x 0 0 yx 0 1 yx 0 2 yx 0 3 yx 0 4 yx 0 5 yx 0 6 yx 0 7 yx 0 8 yx 0 9 yx 0 10 yx 0 11 yx 0 12 yx 0 13 yx 0 14 yx 0 15 yx 0 16 yx 0 17 yx 0 18 yx 0 19 yx 0 20 y
exit status 1

The hint in the test output is that log entries went missing.

It took me a whole day to track down the bug, and it was another low-level one: in the readSnapshot() function, the following is the correct way to decode:

d.Decode(&LastIncludedIndex)
d.Decode(&LastIncludedTerm)

Earlier I had forgotten the &, so the decode failed silently and everything that followed was inevitably corrupted. A fuller sketch of the function, with the fix in place, follows.
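This is a sketch of readSnapshot() as a whole; commitIndex, lastApplied, and the truncateLog helper are assumptions about my lab2 code rather than code shown in this report:

// readSnapshot restores state from a persisted snapshot (a sketch).
func (rf *Raft) readSnapshot(data []byte) {
	if len(data) == 0 {
		return
	}
	r := bytes.NewBuffer(data)
	d := gob.NewDecoder(r)

	var LastIncludedIndex int
	var LastIncludedTerm int
	// Decode takes pointers; passing the values themselves was the bug.
	d.Decode(&LastIncludedIndex)
	d.Decode(&LastIncludedTerm)

	rf.commitIndex = LastIncludedIndex
	rf.lastApplied = LastIncludedIndex
	// Rebuild the in-memory log starting from the snapshot point
	// (truncateLog is an assumed helper from my lab2 code).
	rf.log = truncateLog(LastIncludedIndex, LastIncludedTerm, rf.log)
}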

Testing

$ go test
Test: One client ...
  ... Passed
Test: concurrent clients ...
  ... Passed
Test: unreliable ...
  ... Passed
Test: Concurrent Append to same key, unreliable ...
  ... Passed
Test: Progress in majority ...
  ... Passed
Test: No progress in minority ...
  ... Passed
Test: Completion after heal ...
  ... Passed
Test: many partitions ...
2018/11/21 21:24:22 Warning: client 0 managed to perform only 7 put operations in 1 sec?
2018/11/21 21:24:30 Warning: client 0 managed to perform only 7 put operations in 1 sec?
  ... Passed
Test: many partitions, many clients ...
2018/11/21 21:24:45 Warning: client 1 managed to perform only 8 put operations in 1 sec?
  ... Passed
Test: persistence with one client ...
  ... Passed
Test: persistence with concurrent clients ...
  ... Passed
Test: persistence with concurrent clients, unreliable ...
  ... Passed
Test: persistence with concurrent clients and repartitioning servers...
  ... Passed
Test: persistence with concurrent clients and repartitioning servers, unreliable...
2018/11/21 21:26:45 Warning: client 1 managed to perform only 7 put operations in 1 sec?
2018/11/21 21:26:45 Warning: client 2 managed to perform only 5 put operations in 1 sec?
2018/11/21 21:26:45 Warning: client 3 managed to perform only 7 put operations in 1 sec?
2018/11/21 21:26:46 Warning: client 4 managed to perform only 2 put operations in 1 sec?
2018/11/21 21:26:54 Warning: client 0 managed to perform only 4 put operations in 1 sec?
2018/11/21 21:26:55 Warning: client 1 managed to perform only 8 put operations in 1 sec?
2018/11/21 21:26:55 Warning: client 2 managed to perform only 7 put operations in 1 sec?
2018/11/21 21:26:56 Warning: client 3 managed to perform only 8 put operations in 1 sec?
2018/11/21 21:26:56 Warning: client 4 managed to perform only 9 put operations in 1 sec?
2018/11/21 21:27:06 Warning: client 3 managed to perform only 9 put operations in 1 sec?
  ... Passed

Test: InstallSnapshot RPC ...
  ... Passed
Test: persistence with one client and snapshots ...
  ... Passed
Test: persistence with several clients and snapshots ...
  ... Passed
Test: persistence with several clients, snapshots, unreliable ...
  ... Passed
Test: persistence with several clients, failures, and snapshots, unreliable ...
  ... Passed
Test: persistence with several clients, failures, and snapshots, unreliable and partitions ...
2018/11/21 21:28:50 Warning: client 1 managed to perform only 9 put operations in 1 sec?
2018/11/21 21:28:51 Warning: client 3 managed to perform only 8 put operations in 1 sec?
2018/11/21 21:28:51 Warning: client 4 managed to perform only 8 put operations in 1 sec?
2018/11/21 21:29:09 Warning: client 0 managed to perform only 8 put operations in 1 sec?
2018/11/21 21:29:11 Warning: client 4 managed to perform only 6 put operations in 1 sec?
  ... Passed
PASS
ok  	kvraft	351.767s

Summary

Doing lab3 straight after lab2 worked out nicely.

Part A is mainly about understanding how the client talks to the server and how the server talks to Raft; once that is clear, the implementation is simple.

Part B requires working out how log entries get discarded; essentially all of the earlier log-handling code had to be modified.

Two low-level mistakes cost me a great deal of time; my debugging skills still need work.