LevelDB的open操作

leveldb源码分析1

在简单的了解了leveldb的概念后和简单使用后,现在开始真正开始分析源码, 首先从Open操作开始。

leveldb::DB

db.h41~44行:

// A DB is a persistent ordered map from keys to values.
// A DB is safe for concurrent access from multiple threads without
// any external synchronization.
class DB {
...
 private:
  // No copying allowed
  DB(const DB&); // 不允许拷贝
  void operator=(const DB&);
};

注释也说明了一个DB实例是一个持久的有序的键值对集合,而且是线程安全的。

leveldb::DB::Open

db.h45~53行:

 public:
  // Open the database with the specified "name".
  // Stores a pointer to a heap-allocated database in *dbptr and returns
  // OK on success.
  // Stores NULL in *dbptr and returns a non-OK status on error.
  // Caller should delete *dbptr when it is no longer needed.
  // 注意这个函数是静态成员函数
  static Status Open(const Options& options,
                     const std::string& name,
                     DB** dbptr);

这里有一个比较重要的点就是Open函数是一个静态成员函数,用来构造出一个DB实例。

  • Options(include/leveldb/option.h)是leveldb启动时的一些配置,get/put/delete 时,也有相应的ReadOption/WriteOption
  • name就是数据库的名字,一般为路径名称
  • *dbptr是数据库的指针,Open函数就是初始化这个指针然后通过指针返回

再回顾一下上一篇文章中的使用:

    leveldb::DB *db = nullptr;
    leveldb::Options options;

    options.create_if_missing = true;

    leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb", &db);

接下来进入正题,Open函数的实现: db_impl.cc1501~1505行

Status DB::Open(const Options& options, const std::string& dbname,
                DB** dbptr) {
  *dbptr = nullptr;

  DBImpl* impl = new DBImpl(options, dbname);

new函数的那行跳转到db_impl.cc124~149行:

DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
    : env_(raw_options.env), // 负责所有IO,比如建立文件
      internal_comparator_(raw_options.comparator), // 用来比较不同key的大小
      internal_filter_policy_(raw_options.filter_policy), // 可自定义BloomFilter
      options_(SanitizeOptions(dbname, &internal_comparator_,
                               &internal_filter_policy_, raw_options)),
      owns_info_log_(options_.info_log != raw_options.info_log),
      owns_cache_(options_.block_cache != raw_options.block_cache),
      dbname_(dbname),
      table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
      db_lock_(nullptr), // 文件锁
      shutting_down_(nullptr),
      background_work_finished_signal_(&mutex_),
      mem_(nullptr), // memtable
      imm_(nullptr), // immemtable
      logfile_(nullptr),
      logfile_number_(0),
      log_(nullptr),
      seed_(0),
      tmp_batch_(new WriteBatch), // 所有的Put操作都是通过batch写入,这里建立一个临时的
      background_compaction_scheduled_(false),
      manual_compaction_(nullptr),
      versions_(new VersionSet(dbname_, &options_, table_cache_,
                               &internal_comparator_)) {
  has_imm_.Release_Store(nullptr);
}

接着继续分析Open函数:

db_impl.cc1506~1510行:

  impl->mutex_.Lock();
  VersionEdit edit;
  // Recover handles create_if_missing, error_if_exists
  bool save_manifest = false;
  Status s = impl->Recover(&edit, &save_manifest);
  • 第一行就是加锁
  • Version:将每次compact后的最新数据状态定义为Version,也就是当前db元信息以及每个level上具有最新 数据状态的sstable集合。
  • VersionEditcompact过程中会有一系列改变当前Version的操作 (FileNumber增加,删除inputsstable,增加输出的sstable…), 为了缩小Version切换的时间点,将这些操作封装成VersionEditcompact 完成时,将 VersionEdit 中的操作一次应用到当前Version即可得到最新状态的Version
  • 最后一行Recover就是从日志文件中恢复数据,每次打开数据库都要这样做,会跳到db_impl.cc279行, 由于函数太长了,代码就不贴出来了,大概流程如下:
    1. 检查是否有CURRENT文件(env_->FileExists(CurrentFileName(dbname_)))
    2. db 元信息检查 (VersionSet::recover())
      • CURRENT文件中读取当前的MANIFEST文件。
      • MANIFEST文件中依次读取每个recordVersionEdit::DecodeFrom),检查Comparator是否一致,然后依次replay
      • 检查解析MANIFEST的最终状态中的基本的信息是否完整(log number,FileNumber, SequnceNumber),将其生效成db当前的状态。此时,整个db的各种元信息(FileNumber, SequnceNumber,各level的文件数目,sizerange,下一次compactstart_key等等) 均load完成,db恢复成上一次退出前的状态。
    3. 从 log 中恢复上一次可能丢失的数据( RecoverLogFile)
      • 遍历db中的文件,根据已经获得的db元信息LogNumberPrevLogNumber,找到上一次未处 理的log文件。
      • 遍历log文件中的recordputWriteBatch的写入),重建memtable。达到memtable阈 值(write_buffer_size),就dump成sstable。期间,用record中的SequnceNumber修正 从MANIFEST中读取的当前SequnceNumber
      • 将最后的memtabledump成sstable
      • 根据log文件的FileNumber和遍历recordSequnceNumber修正当前的FileNumberSequnceNumber

现在从日志文件中恢复出数据了,接下来接续回到Open函数(db_impl.cc1511~1542行):

  if (s.ok() && impl->mem_ == nullptr) {
    // Create new log and a corresponding memtable.
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    WritableFile* lfile;
    s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
                                     &lfile);
    if (s.ok()) {
      edit.SetLogNumber(new_log_number);
      impl->logfile_ = lfile;
      impl->logfile_number_ = new_log_number;
      impl->log_ = new log::Writer(lfile);
      impl->mem_ = new MemTable(impl->internal_comparator_);
      impl->mem_->Ref();
    }
  }
  if (s.ok() && save_manifest) {
    edit.SetPrevLogNumber(0);  // No older logs needed after recovery.
    edit.SetLogNumber(impl->logfile_number_);
    s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
  }
  if (s.ok()) {
    impl->DeleteObsoleteFiles();
    impl->MaybeScheduleCompaction();
  }
  impl->mutex_.Unlock();
  if (s.ok()) {
    assert(impl->mem_ != nullptr);
    *dbptr = impl;
  } else {
    delete impl;
  }
  return s;
}
  • 生成新的日志文件和对应的memtable更新db的元信息(VersionSet::LogAndApply(),生成最新的MANIFEST文件), 删除无用文件(DBImpl::DeleteObsoleteFiles()),尝试compactDBImpl::MaybeScheduleCompaction())。
  • 启动完毕。

总结

  • DB是不可拷贝的,一份数据同事只能有一个db实例操作
  • leveldb::DB::Open()是一个静态成员函数
  • 每次Open操作都需要从log文件恢复数据