LevelDB源码解析1——open
LevelDB的open操作
leveldb源码分析1
在简单的了解了leveldb的概念后和简单使用后,现在开始真正开始分析源码,
首先从Open操作开始。
leveldb::DB
db.h41~44行:
// A DB is a persistent ordered map from keys to values.
// A DB is safe for concurrent access from multiple threads without
// any external synchronization.
class DB {
...
private:
// No copying allowed
DB(const DB&); // 不允许拷贝
void operator=(const DB&);
};
注释也说明了一个DB实例是一个持久的有序的键值对集合,而且是线程安全的。
leveldb::DB::Open
db.h45~53行:
public:
// Open the database with the specified "name".
// Stores a pointer to a heap-allocated database in *dbptr and returns
// OK on success.
// Stores NULL in *dbptr and returns a non-OK status on error.
// Caller should delete *dbptr when it is no longer needed.
// 注意这个函数是静态成员函数
static Status Open(const Options& options,
const std::string& name,
DB** dbptr);
这里有一个比较重要的点就是Open函数是一个静态成员函数,用来构造出一个DB实例。
Options(include/leveldb/option.h)是leveldb启动时的一些配置,get/put/delete 时,也有相应的ReadOption/WriteOptionname就是数据库的名字,一般为路径名称*dbptr是数据库的指针,Open函数就是初始化这个指针然后通过指针返回
再回顾一下上一篇文章中的使用:
leveldb::DB *db = nullptr;
leveldb::Options options;
options.create_if_missing = true;
leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb", &db);
接下来进入正题,Open函数的实现:
db_impl.cc1501~1505行
Status DB::Open(const Options& options, const std::string& dbname,
DB** dbptr) {
*dbptr = nullptr;
DBImpl* impl = new DBImpl(options, dbname);
new函数的那行跳转到db_impl.cc124~149行:
DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
: env_(raw_options.env), // 负责所有IO,比如建立文件
internal_comparator_(raw_options.comparator), // 用来比较不同key的大小
internal_filter_policy_(raw_options.filter_policy), // 可自定义BloomFilter
options_(SanitizeOptions(dbname, &internal_comparator_,
&internal_filter_policy_, raw_options)),
owns_info_log_(options_.info_log != raw_options.info_log),
owns_cache_(options_.block_cache != raw_options.block_cache),
dbname_(dbname),
table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
db_lock_(nullptr), // 文件锁
shutting_down_(nullptr),
background_work_finished_signal_(&mutex_),
mem_(nullptr), // memtable
imm_(nullptr), // immemtable
logfile_(nullptr),
logfile_number_(0),
log_(nullptr),
seed_(0),
tmp_batch_(new WriteBatch), // 所有的Put操作都是通过batch写入,这里建立一个临时的
background_compaction_scheduled_(false),
manual_compaction_(nullptr),
versions_(new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_)) {
has_imm_.Release_Store(nullptr);
}
接着继续分析Open函数:
db_impl.cc1506~1510行:
impl->mutex_.Lock();
VersionEdit edit;
// Recover handles create_if_missing, error_if_exists
bool save_manifest = false;
Status s = impl->Recover(&edit, &save_manifest);
- 第一行就是加锁
Version:将每次compact后的最新数据状态定义为Version,也就是当前db元信息以及每个level上具有最新 数据状态的sstable集合。VersionEdit:compact过程中会有一系列改变当前Version的操作 (FileNumber增加,删除input的sstable,增加输出的sstable…), 为了缩小Version切换的时间点,将这些操作封装成VersionEdit,compact完成时,将 VersionEdit 中的操作一次应用到当前Version即可得到最新状态的Version。- 最后一行
Recover就是从日志文件中恢复数据,每次打开数据库都要这样做,会跳到db_impl.cc279行, 由于函数太长了,代码就不贴出来了,大概流程如下:- 检查是否有
CURRENT文件(env_->FileExists(CurrentFileName(dbname_))) - db 元信息检查 (
VersionSet::recover())- 从
CURRENT文件中读取当前的MANIFEST文件。 - 从
MANIFEST文件中依次读取每个record(VersionEdit::DecodeFrom),检查Comparator是否一致,然后依次replay。 - 检查解析
MANIFEST的最终状态中的基本的信息是否完整(log number,FileNumber,SequnceNumber),将其生效成db当前的状态。此时,整个db的各种元信息(FileNumber,SequnceNumber,各level的文件数目,size,range,下一次compact的start_key等等) 均load完成,db恢复成上一次退出前的状态。
- 从
- 从 log 中恢复上一次可能丢失的数据( RecoverLogFile)
- 遍历
db中的文件,根据已经获得的db元信息LogNumber和PrevLogNumber,找到上一次未处 理的log文件。 - 遍历
log文件中的record(put时WriteBatch的写入),重建memtable。达到memtable阈 值(write_buffer_size),就dump成sstable。期间,用record中的SequnceNumber修正 从MANIFEST中读取的当前SequnceNumber。 - 将最后的
memtabledump成sstable。 - 根据
log文件的FileNumber和遍历record的SequnceNumber修正当前的FileNumber和SequnceNumber。
- 遍历
- 检查是否有
现在从日志文件中恢复出数据了,接下来接续回到Open函数(db_impl.cc1511~1542行):
if (s.ok() && impl->mem_ == nullptr) {
// Create new log and a corresponding memtable.
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile* lfile;
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
&lfile);
if (s.ok()) {
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile);
impl->mem_ = new MemTable(impl->internal_comparator_);
impl->mem_->Ref();
}
}
if (s.ok() && save_manifest) {
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
edit.SetLogNumber(impl->logfile_number_);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
if (s.ok()) {
impl->DeleteObsoleteFiles();
impl->MaybeScheduleCompaction();
}
impl->mutex_.Unlock();
if (s.ok()) {
assert(impl->mem_ != nullptr);
*dbptr = impl;
} else {
delete impl;
}
return s;
}
- 生成新的日志文件和对应的
memtable更新db的元信息(VersionSet::LogAndApply(),生成最新的MANIFEST文件), 删除无用文件(DBImpl::DeleteObsoleteFiles()),尝试compact(DBImpl::MaybeScheduleCompaction())。 - 启动完毕。
总结
DB是不可拷贝的,一份数据同事只能有一个db实例操作leveldb::DB::Open()是一个静态成员函数- 每次
Open操作都需要从log文件恢复数据