Log 在 LevelDB 中发挥着重要的作用,它主要有两个使用场景

  • wal,在写 key/value 之前,先向 wal 中写入,用于 crash 后恢复内存数据
  • manifest,记录 lsm 的一些元信息,包括层级信息等,用于重启后恢复 db 的一些重要信息

LevelDB 将对 Log 的读写操作分别封装为了 Writer, Reader 两个类,分别在 log_writer.cclog_reader.cc 中。这两个类并不知道数据长什么样子(可能是单 key 写入,也可能是 WriteBatch批量写入,也可能存储的 VersionEdit),只是单纯地将数据当做字节数组来处理。对于上层调用者来说,他们不需要知道数据底层是怎么存储的(比如一条数据可能跨 block 读取),只需要调用 log 的方法每次写入或者读取一条数据即可。

Log 结构

log 是由一个一个 block 组成的,根据一次写入的数据在 block 的位置,可以将数据分为三种类型:

  • 数据完整地放在一个 block 中,可以标记为 kFullType
  • 数据跨越多个 block。为了完整地读取出一条记录,LevelDB 将这种数据又划分为了多种类型
    • 数据的第一部分标记为 kFirstType
    • 数据的最后一部分标记为 kLastType
    • 数据的中间部分标记为 kMiddleType

对此,LevelDB 提供了一个枚举类型

enum RecordType {
  // Zero is reserved for preallocated files
  kZeroType = 0,
  kFullType = 1,
  // For fragments
  kFirstType = 2,
  kMiddleType = 3,
  kLastType = 4
};

Writer

Writer 对 WritableFile 做了封装,而 WritableFile 是一个抽象类,表示一个顺序写入的文件。与之类似,Reader 是对 SequentialFile 的封装,表示一个顺序读取的文件。这两个类都定义在include/leveldb/env.h 中,表示实现细节与平台相关,使用者无需关注具体的实现细节

Writer 对外只提供了几个构造函数和一个 AddRecord() 方法,用户可以通过这个方法向 log 中添加数据。

classDiagram direction LR Writer o-- WritableFile <<Abstract>> WritableFile class Writer { - dest_ : WritableFile* - block_offset_ : int - type_crc_[kMaxRecordType + 1] : uint32_t - EmitPhysicalRecord(RecordType type, char* ptr, size_t length) Status + Writer(WritableFile* dest) + Writer(WritableFile* dest, uint64_t dest_length) + AddRecord(const Slice& slice) Status }
  • dest_: 内部的文件的抽象,只提供了写入相关的操作
  • block_offset:当前写入 block 的偏移量
  • type_crc_:体现计算好的 crc,添加数据时,只需要在此基础上拓展

在写入 block 前,先会判断当前 block 剩余的空间是否足够放下数据的 header,如果连 header(固定字节) 都放不下,就会在末尾填充 0,切换到下一个 block 写入。同时根据剩余空间是否能放下整条数据,它会将数据的 type 字段标记为相应的类型

Status Writer::AddRecord(const Slice& slice) {
  const char* ptr = slice.data();
  size_t left = slice.size();

  // Fragment the record if necessary and emit it.  Note that if slice
  // is empty, we still want to iterate once to emit a single
  // zero-length record
  Status s;
  bool begin = true;
  do {
    const int leftover = kBlockSize - block_offset_;
    assert(leftover >= 0);
    if (leftover < kHeaderSize) {
      // Switch to a new block
      if (leftover > 0) { // 剩余部分填充 0
        // Fill the trailer (literal below relies on kHeaderSize being 7)
        static_assert(kHeaderSize == 7, "");
        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
      }
      block_offset_ = 0;
    }

    // Invariant: we never leave < kHeaderSize bytes in a block.
    assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
    const size_t fragment_length = (left < avail) ? left : avail;

    RecordType type;
    // 所需容量大于等于block 剩余容量时(当前 block 放不下)才为 false
    const bool end = (left == fragment_length);
    if (begin && end) { // record 可以完整地放在一个 block 中
      type = kFullType;
    } else if (begin) { // 跨 block 的第一部分
      type = kFirstType;
    } else if (end) { // 跨 block 的最后一部分
      type = kLastType;
    } else { // 跨 block 的中间部分
      type = kMiddleType;
    }

    s = EmitPhysicalRecord(type, ptr, fragment_length); // 写入 block 数据
    ptr += fragment_length;
    left -= fragment_length;
    begin = false;
  } while (s.ok() && left > 0);
  return s;
}

数据的真正写入是通过 EmitPhysicalRecord 方法进行的。写入时,会对数据和数据的类型计算 crc32 校验码,这样在进行恢复时可以通过比对 crc 校验码就可以知道数据是否有损坏。为了读取数据并校验,写入时也会将数据对应的类型和长度写入。因此,一条写入 block 的记录的格式如下所示

//   +-------------------------+
//   | crc | len | type | data |
//   +-------------------------+

数据的写入操作是就是单纯的将字节数据写入到文件中,所以这里就不展示代码了

Reader

Writer 类似,Reader 对外提供了 ReadRecord() 方法,用于读取一条记录。由于一条记录可能分布在多个 block 上,所以读取时,可能会读取多个 block,并将数据拼接到一起返回。

bool Reader::ReadRecord(Slice* record, std::string* scratch) {
  bool in_fragmented_record = false;
  // Record offset of the logical record that we're reading
  // 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;

  Slice fragment;
  while (true) { // 循环读取,因为数据可能跨 block 存储
    const unsigned int record_type = ReadPhysicalRecord(&fragment);

    // ReadPhysicalRecord may have only had an empty trailer remaining in its
    // internal buffer. Calculate the offset of the next physical record now
    // that it has returned, properly accounting for its header size.
    uint64_t physical_record_offset =
        end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();

    if (resyncing_) {
      if (record_type == kMiddleType) {
        continue;
      } else if (record_type == kLastType) {
        resyncing_ = false;
        continue;
      } else {
        resyncing_ = false;
      }
    }

    switch (record_type) {
      case kFullType: // 说明数据完整地存储在当前 block 中,只需要读取一次就可以返回了
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        *record = fragment;
        last_record_offset_ = prospective_record_offset;
        return true;
      case kFirstType: // 数据的第一部分,将数据 append 到 scratch 即可
        prospective_record_offset = physical_record_offset;
        scratch->assign(fragment.data(), fragment.size());
        in_fragmented_record = true;
        break;
      case kMiddleType: // 数据的中间部分,同样是 append
        scratch->append(fragment.data(), fragment.size());
        break;
      case kLastType: // 数据的最后一部分,拼接为完整地一条数据后就可以返回了
        scratch->append(fragment.data(), fragment.size());
        *record = Slice(*scratch);
        last_record_offset_ = prospective_record_offset;
        return true;
        break;
      case kEof:
        if (in_fragmented_record) {
          scratch->clear();
        }
        return false;
      case kBadRecord: // 数据错误,忽略
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear(); // 清空,因为该数据出错了
        }
        break;
      default: { // 未知的数据类型
        scratch->clear();
        break;
      }
    }
  }
  return false;
}

数据的真正读取是通过 ReadPhysicalRecord 放发实现的,这个方法会根据数据的存储格式,从文件中读取一条记录,然后重新计算 crc32 校验码,并与之前计算的结果进行比对。如果结果不相同,就是一个 BadRecord,Reader 会忽略这条数据,并将之前拼接的与该记录相关的数据情况

Log 的应用

Log 有三种应用场景:

  • 写入 MemTable 先写 wal
  • 修改系统元信息前先写 manifest
  • 系统启动时从日志文件中恢复

写 WAL

每次向 MemTable 写数据前都需要写 WAL 日志,这是为了在系统 crash 后仍能够恢复内存中的状态

    {
      mutex_.Unlock();
      // 先写 log
      status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        // 如果开启了同步选项,将文件刷盘
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        // 插入 memTable
        status = WriteBatchInternal::InsertInto(write_batch, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }

写 MANIFEST

写 MANIFEST 的时机有很多,不过基本都时在 Compaction 的时候,包括 Minor Compaction 和 Majority Compaction,这里展示BackgroundCompaction中与 MANIFEST 相关的代码的

  // 优先压缩 memTable
  if (imm_ != nullptr) {
    CompactMemTable(); // 该方法中也会写 MANIFEST
    return;
  }
  if (c == nullptr) {
    // Nothing to do
  } else if (!is_manual && c->IsTrivialMove()) {
    // Move file to next level 可以直接移到下一层(只有一个文件的情况)
    assert(c->num_input_files(0) == 1);
    FileMetaData* f = c->input(0, 0);
    c->edit()->RemoveFile(c->level(), f->number);
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
                       f->largest);
    status = versions_->LogAndApply(c->edit(), &mutex_);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
  } else {
    CompactionState* compact = new CompactionState(c);
    status = DoCompactionWork(compact);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    CleanupCompaction(compact);
    c->ReleaseInputs();
    RemoveObsoleteFiles();
  }

主要方法在 edit 相关的语句和 LogAndApply 当中。首先,会将需要删除和添加的文件记录到 VersionEdit 中,然后需要将 VersionEdit 通过 LogAndApply 写入到 manifest 中。

Recovery

当 DB 重启的时候,需要根据 log 文件恢复关闭前的状态(包括 wal, manifest)。当 log 中的文件恢复完后,会删除没有用的文件,避免下次重启时造成数据库前后状态不一致的后果(主要是删除已经恢复的 wal log 文件和已经没用的 sst 文件)。

log 的恢复是在打开 DB 的使用通过调用 DBImpl::Recover 的时候完成的,而 VersionSet 的恢复是通过 VersionSet::Recover来完成的,wal 的恢复是通过 DBImpl::RecoverLogFile 来完成的。

Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
  mutex_.AssertHeld();

  env_->CreateDir(dbname_);
  assert(db_lock_ == nullptr);
  Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);

  // 从 manifest 中恢复
  s = versions_->Recover(save_manifest);
  if (!s.ok()) {
    return s;
  }
  SequenceNumber max_sequence(0);

  // 从 log 文件中恢复
  // Recover from all newer log files than the ones named in the
  // descriptor (new log files may have been added by the previous
  // incarnation without registering them in the descriptor).
  //
  // Note that PrevLogNumber() is no longer used, but we pay
  // attention to it in case we are recovering a database
  // produced by an older version of leveldb.
  const uint64_t min_log = versions_->LogNumber();
  const uint64_t prev_log = versions_->PrevLogNumber();
  std::vector<std::string> filenames;
  s = env_->GetChildren(dbname_, &filenames);
  if (!s.ok()) {
    return s;
  }
  std::set<uint64_t> expected;
  versions_->AddLiveFiles(&expected); // 获取系统中目前存活的文件,即当前时间点有用的文件
  uint64_t number;
  FileType type;
  std::vector<uint64_t> logs;
  for (size_t i = 0; i < filenames.size(); i++) {
    if (ParseFileName(filenames[i], &number, &type)) {
      expected.erase(number);
      if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
        logs.push_back(number);
    }
  }
  if (!expected.empty()) { // expected 不为空,说明有部分文件丢失
    return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
  }

  // Recover in the order in which the logs were generated
  // 因为在恢复过程中可能会 crash,所以 wal log 可能会有多个
  // 对 (wal)log 文件进行排序,因为序号越小,说明越早生成,就越应该先恢复
  std::sort(logs.begin(), logs.end());
  for (size_t i = 0; i < logs.size(); i++) {
    s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
                       &max_sequence);
    if (!s.ok()) {
      return s;
    }

    // The previous incarnation may not have written any MANIFEST
    // records after allocating this log number.  So we manually
    // update the file number allocation counter in VersionSet.
    versions_->MarkFileNumberUsed(logs[i]);
  }

  if (versions_->LastSequence() < max_sequence) {
    versions_->SetLastSequence(max_sequence);
  }

  return Status::OK();
}

wal log 的恢复是通过 RecoverLogFile 完成的,其过程很简单,创建了一个 Reader,并不断读取记录,写入到 WriteBatch 中,并向 MemTable 中插入数据,如果 MemTable 的容量超过限制,会写入到 L0 层。需要注意的是,这个过程是不需要写入 log 的。

对于 manifest 文件的恢复,从 Reader 中读取记录后,需要解码为 VersionEdit 的形式,重新 Apply