LevelDB源码解析1——open
LevelDB的open操作
leveldb源码分析1
在简单的了解了leveldb
的概念后和简单使用后,现在开始真正开始分析源码,
首先从Open
操作开始。
leveldb::DB
db.h
41~44行:
// A DB is a persistent ordered map from keys to values.
// A DB is safe for concurrent access from multiple threads without
// any external synchronization.
class DB {
...
private:
// No copying allowed
DB(const DB&); // 不允许拷贝
void operator=(const DB&);
};
注释也说明了一个DB
实例是一个持久的有序的键值对集合,而且是线程安全的。
leveldb::DB::Open
db.h
45~53行:
public:
// Open the database with the specified "name".
// Stores a pointer to a heap-allocated database in *dbptr and returns
// OK on success.
// Stores NULL in *dbptr and returns a non-OK status on error.
// Caller should delete *dbptr when it is no longer needed.
// 注意这个函数是静态成员函数
static Status Open(const Options& options,
const std::string& name,
DB** dbptr);
这里有一个比较重要的点就是Open
函数是一个静态成员函数,用来构造出一个DB
实例。
Options
(include/leveldb/option.h)是leveldb启动时的一些配置,get/put/delete 时,也有相应的ReadOption/WriteOptionname
就是数据库的名字,一般为路径名称*dbptr
是数据库的指针,Open
函数就是初始化这个指针然后通过指针返回
再回顾一下上一篇文章中的使用:
leveldb::DB *db = nullptr;
leveldb::Options options;
options.create_if_missing = true;
leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb", &db);
接下来进入正题,Open
函数的实现:
db_impl.cc
1501~1505行
Status DB::Open(const Options& options, const std::string& dbname,
DB** dbptr) {
*dbptr = nullptr;
DBImpl* impl = new DBImpl(options, dbname);
new
函数的那行跳转到db_impl.cc
124~149行:
DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
: env_(raw_options.env), // 负责所有IO,比如建立文件
internal_comparator_(raw_options.comparator), // 用来比较不同key的大小
internal_filter_policy_(raw_options.filter_policy), // 可自定义BloomFilter
options_(SanitizeOptions(dbname, &internal_comparator_,
&internal_filter_policy_, raw_options)),
owns_info_log_(options_.info_log != raw_options.info_log),
owns_cache_(options_.block_cache != raw_options.block_cache),
dbname_(dbname),
table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
db_lock_(nullptr), // 文件锁
shutting_down_(nullptr),
background_work_finished_signal_(&mutex_),
mem_(nullptr), // memtable
imm_(nullptr), // immemtable
logfile_(nullptr),
logfile_number_(0),
log_(nullptr),
seed_(0),
tmp_batch_(new WriteBatch), // 所有的Put操作都是通过batch写入,这里建立一个临时的
background_compaction_scheduled_(false),
manual_compaction_(nullptr),
versions_(new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_)) {
has_imm_.Release_Store(nullptr);
}
接着继续分析Open
函数:
db_impl.cc
1506~1510行:
impl->mutex_.Lock();
VersionEdit edit;
// Recover handles create_if_missing, error_if_exists
bool save_manifest = false;
Status s = impl->Recover(&edit, &save_manifest);
- 第一行就是加锁
Version
:将每次compact
后的最新数据状态定义为Version
,也就是当前db
元信息以及每个level
上具有最新 数据状态的sstable
集合。VersionEdit
:compact
过程中会有一系列改变当前Version
的操作 (FileNumber
增加,删除input
的sstable
,增加输出的sstable
…), 为了缩小Version
切换的时间点,将这些操作封装成VersionEdit
,compact
完成时,将 VersionEdit 中的操作一次应用到当前Version
即可得到最新状态的Version
。- 最后一行
Recover
就是从日志文件中恢复数据,每次打开数据库都要这样做,会跳到db_impl.cc
279行, 由于函数太长了,代码就不贴出来了,大概流程如下:- 检查是否有
CURRENT
文件(env_->FileExists(CurrentFileName(dbname_))
) - db 元信息检查 (
VersionSet::recover()
)- 从
CURRENT
文件中读取当前的MANIFEST
文件。 - 从
MANIFEST
文件中依次读取每个record
(VersionEdit::DecodeFrom
),检查Comparator
是否一致,然后依次replay
。 - 检查解析
MANIFEST
的最终状态中的基本的信息是否完整(log number
,FileNumber
,SequnceNumber
),将其生效成db
当前的状态。此时,整个db
的各种元信息(FileNumber
,SequnceNumber
,各level
的文件数目,size
,range
,下一次compact
的start_key
等等) 均load
完成,db
恢复成上一次退出前的状态。
- 从
- 从 log 中恢复上一次可能丢失的数据( RecoverLogFile)
- 遍历
db
中的文件,根据已经获得的db
元信息LogNumber
和PrevLogNumber
,找到上一次未处 理的log
文件。 - 遍历
log
文件中的record
(put
时WriteBatch
的写入),重建memtable
。达到memtable
阈 值(write_buffer_size
),就dump成sstable
。期间,用record
中的SequnceNumber
修正 从MANIFEST
中读取的当前SequnceNumber
。 - 将最后的
memtable
dump成sstable
。 - 根据
log
文件的FileNumber
和遍历record
的SequnceNumber
修正当前的FileNumber
和SequnceNumber
。
- 遍历
- 检查是否有
现在从日志文件中恢复出数据了,接下来接续回到Open
函数(db_impl.cc
1511~1542行):
if (s.ok() && impl->mem_ == nullptr) {
// Create new log and a corresponding memtable.
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile* lfile;
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
&lfile);
if (s.ok()) {
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile);
impl->mem_ = new MemTable(impl->internal_comparator_);
impl->mem_->Ref();
}
}
if (s.ok() && save_manifest) {
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
edit.SetLogNumber(impl->logfile_number_);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
if (s.ok()) {
impl->DeleteObsoleteFiles();
impl->MaybeScheduleCompaction();
}
impl->mutex_.Unlock();
if (s.ok()) {
assert(impl->mem_ != nullptr);
*dbptr = impl;
} else {
delete impl;
}
return s;
}
- 生成新的日志文件和对应的
memtable
更新db
的元信息(VersionSet::LogAndApply()
,生成最新的MANIFEST
文件), 删除无用文件(DBImpl::DeleteObsoleteFiles()
),尝试compact
(DBImpl::MaybeScheduleCompaction()
)。 - 启动完毕。
总结
DB
是不可拷贝的,一份数据同事只能有一个db
实例操作leveldb::DB::Open()
是一个静态成员函数- 每次
Open
操作都需要从log文件恢复数据