LevelDB 学习笔记-Log
Log 在 LevelDB 中发挥着重要的作用,它主要有两个使用场景
- wal,在写 key/value 之前,先向 wal 中写入,用于 crash 后恢复内存数据
- manifest,记录 lsm 的一些元信息,包括层级信息等,用于重启后恢复 db 的一些重要信息
LevelDB 将对 Log 的读写操作分别封装为了 Writer
, Reader
两个类,分别在 log_writer.cc
和 log_reader.cc
中。这两个类并不知道数据长什么样子(可能是单 key 写入,也可能是 WriteBatch批量写入,也可能存储的 VersionEdit),只是单纯地将数据当做字节数组来处理。对于上层调用者来说,他们不需要知道数据底层是怎么存储的(比如一条数据可能跨 block 读取),只需要调用 log 的方法每次写入或者读取一条数据即可。
Log 结构
log 是由一个一个 block 组成的,根据一次写入的数据在 block 的位置,可以将数据分为三种类型:
- 数据完整地放在一个 block 中,可以标记为
kFullType
- 数据跨越多个 block。为了完整地读取出一条记录,LevelDB 将这种数据又划分为了多种类型
- 数据的第一部分标记为
kFirstType
- 数据的最后一部分标记为
kLastType
- 数据的中间部分标记为
kMiddleType
- 数据的第一部分标记为
对此,LevelDB 提供了一个枚举类型
enum RecordType {
// Zero is reserved for preallocated files
kZeroType = 0,
kFullType = 1,
// For fragments
kFirstType = 2,
kMiddleType = 3,
kLastType = 4
};
Writer
Writer
对 WritableFile
做了封装,而 WritableFile
是一个抽象类,表示一个顺序写入的文件。与之类似,Reader
是对 SequentialFile
的封装,表示一个顺序读取的文件。这两个类都定义在include/leveldb/env.h
中,表示实现细节与平台相关,使用者无需关注具体的实现细节
Writer
对外只提供了几个构造函数和一个 AddRecord()
方法,用户可以通过这个方法向 log 中添加数据。
dest_
: 内部的文件的抽象,只提供了写入相关的操作block_offset
:当前写入 block 的偏移量type_crc_
:体现计算好的 crc,添加数据时,只需要在此基础上拓展
在写入 block 前,先会判断当前 block 剩余的空间是否足够放下数据的 header,如果连 header(固定字节) 都放不下,就会在末尾填充 0,切换到下一个 block 写入。同时根据剩余空间是否能放下整条数据,它会将数据的 type 字段标记为相应的类型
Status Writer::AddRecord(const Slice& slice) {
const char* ptr = slice.data();
size_t left = slice.size();
// Fragment the record if necessary and emit it. Note that if slice
// is empty, we still want to iterate once to emit a single
// zero-length record
Status s;
bool begin = true;
do {
const int leftover = kBlockSize - block_offset_;
assert(leftover >= 0);
if (leftover < kHeaderSize) {
// Switch to a new block
if (leftover > 0) { // 剩余部分填充 0
// Fill the trailer (literal below relies on kHeaderSize being 7)
static_assert(kHeaderSize == 7, "");
dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
}
block_offset_ = 0;
}
// Invariant: we never leave < kHeaderSize bytes in a block.
assert(kBlockSize - block_offset_ - kHeaderSize >= 0);
const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
const size_t fragment_length = (left < avail) ? left : avail;
RecordType type;
// 所需容量大于等于block 剩余容量时(当前 block 放不下)才为 false
const bool end = (left == fragment_length);
if (begin && end) { // record 可以完整地放在一个 block 中
type = kFullType;
} else if (begin) { // 跨 block 的第一部分
type = kFirstType;
} else if (end) { // 跨 block 的最后一部分
type = kLastType;
} else { // 跨 block 的中间部分
type = kMiddleType;
}
s = EmitPhysicalRecord(type, ptr, fragment_length); // 写入 block 数据
ptr += fragment_length;
left -= fragment_length;
begin = false;
} while (s.ok() && left > 0);
return s;
}
数据的真正写入是通过 EmitPhysicalRecord
方法进行的。写入时,会对数据和数据的类型计算 crc32 校验码,这样在进行恢复时可以通过比对 crc 校验码就可以知道数据是否有损坏。为了读取数据并校验,写入时也会将数据对应的类型和长度写入。因此,一条写入 block 的记录的格式如下所示
// +-------------------------+
// | crc | len | type | data |
// +-------------------------+
数据的写入操作是就是单纯的将字节数据写入到文件中,所以这里就不展示代码了
Reader
与 Writer
类似,Reader
对外提供了 ReadRecord()
方法,用于读取一条记录。由于一条记录可能分布在多个 block 上,所以读取时,可能会读取多个 block,并将数据拼接到一起返回。
bool Reader::ReadRecord(Slice* record, std::string* scratch) {
bool in_fragmented_record = false;
// Record offset of the logical record that we're reading
// 0 is a dummy value to make compilers happy
uint64_t prospective_record_offset = 0;
Slice fragment;
while (true) { // 循环读取,因为数据可能跨 block 存储
const unsigned int record_type = ReadPhysicalRecord(&fragment);
// ReadPhysicalRecord may have only had an empty trailer remaining in its
// internal buffer. Calculate the offset of the next physical record now
// that it has returned, properly accounting for its header size.
uint64_t physical_record_offset =
end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();
if (resyncing_) {
if (record_type == kMiddleType) {
continue;
} else if (record_type == kLastType) {
resyncing_ = false;
continue;
} else {
resyncing_ = false;
}
}
switch (record_type) {
case kFullType: // 说明数据完整地存储在当前 block 中,只需要读取一次就可以返回了
prospective_record_offset = physical_record_offset;
scratch->clear();
*record = fragment;
last_record_offset_ = prospective_record_offset;
return true;
case kFirstType: // 数据的第一部分,将数据 append 到 scratch 即可
prospective_record_offset = physical_record_offset;
scratch->assign(fragment.data(), fragment.size());
in_fragmented_record = true;
break;
case kMiddleType: // 数据的中间部分,同样是 append
scratch->append(fragment.data(), fragment.size());
break;
case kLastType: // 数据的最后一部分,拼接为完整地一条数据后就可以返回了
scratch->append(fragment.data(), fragment.size());
*record = Slice(*scratch);
last_record_offset_ = prospective_record_offset;
return true;
break;
case kEof:
if (in_fragmented_record) {
scratch->clear();
}
return false;
case kBadRecord: // 数据错误,忽略
if (in_fragmented_record) {
ReportCorruption(scratch->size(), "error in middle of record");
in_fragmented_record = false;
scratch->clear(); // 清空,因为该数据出错了
}
break;
default: { // 未知的数据类型
scratch->clear();
break;
}
}
}
return false;
}
数据的真正读取是通过 ReadPhysicalRecord
放发实现的,这个方法会根据数据的存储格式,从文件中读取一条记录,然后重新计算 crc32 校验码,并与之前计算的结果进行比对。如果结果不相同,就是一个 BadRecord,Reader
会忽略这条数据,并将之前拼接的与该记录相关的数据情况
Log 的应用
Log 有三种应用场景:
- 写入 MemTable 先写 wal
- 修改系统元信息前先写 manifest
- 系统启动时从日志文件中恢复
写 WAL
每次向 MemTable 写数据前都需要写 WAL 日志,这是为了在系统 crash 后仍能够恢复内存中的状态
{
mutex_.Unlock();
// 先写 log
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) {
// 如果开启了同步选项,将文件刷盘
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
// 插入 memTable
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}
写 MANIFEST
写 MANIFEST 的时机有很多,不过基本都时在 Compaction 的时候,包括 Minor Compaction 和 Majority Compaction,这里展示BackgroundCompaction
中与 MANIFEST 相关的代码的
// 优先压缩 memTable
if (imm_ != nullptr) {
CompactMemTable(); // 该方法中也会写 MANIFEST
return;
}
if (c == nullptr) {
// Nothing to do
} else if (!is_manual && c->IsTrivialMove()) {
// Move file to next level 可以直接移到下一层(只有一个文件的情况)
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
c->edit()->RemoveFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
if (!status.ok()) {
RecordBackgroundError(status);
}
} else {
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
if (!status.ok()) {
RecordBackgroundError(status);
}
CleanupCompaction(compact);
c->ReleaseInputs();
RemoveObsoleteFiles();
}
主要方法在 edit
相关的语句和 LogAndApply
当中。首先,会将需要删除和添加的文件记录到 VersionEdit
中,然后需要将 VersionEdit
通过 LogAndApply
写入到 manifest 中。
Recovery
当 DB 重启的时候,需要根据 log 文件恢复关闭前的状态(包括 wal, manifest)。当 log 中的文件恢复完后,会删除没有用的文件,避免下次重启时造成数据库前后状态不一致的后果(主要是删除已经恢复的 wal log 文件和已经没用的 sst 文件)。
log 的恢复是在打开 DB 的使用通过调用 DBImpl::Recover
的时候完成的,而 VersionSet
的恢复是通过 VersionSet::Recover
来完成的,wal 的恢复是通过 DBImpl::RecoverLogFile
来完成的。
Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
mutex_.AssertHeld();
env_->CreateDir(dbname_);
assert(db_lock_ == nullptr);
Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
// 从 manifest 中恢复
s = versions_->Recover(save_manifest);
if (!s.ok()) {
return s;
}
SequenceNumber max_sequence(0);
// 从 log 文件中恢复
// Recover from all newer log files than the ones named in the
// descriptor (new log files may have been added by the previous
// incarnation without registering them in the descriptor).
//
// Note that PrevLogNumber() is no longer used, but we pay
// attention to it in case we are recovering a database
// produced by an older version of leveldb.
const uint64_t min_log = versions_->LogNumber();
const uint64_t prev_log = versions_->PrevLogNumber();
std::vector<std::string> filenames;
s = env_->GetChildren(dbname_, &filenames);
if (!s.ok()) {
return s;
}
std::set<uint64_t> expected;
versions_->AddLiveFiles(&expected); // 获取系统中目前存活的文件,即当前时间点有用的文件
uint64_t number;
FileType type;
std::vector<uint64_t> logs;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) {
expected.erase(number);
if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
logs.push_back(number);
}
}
if (!expected.empty()) { // expected 不为空,说明有部分文件丢失
return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
}
// Recover in the order in which the logs were generated
// 因为在恢复过程中可能会 crash,所以 wal log 可能会有多个
// 对 (wal)log 文件进行排序,因为序号越小,说明越早生成,就越应该先恢复
std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) {
s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
&max_sequence);
if (!s.ok()) {
return s;
}
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(logs[i]);
}
if (versions_->LastSequence() < max_sequence) {
versions_->SetLastSequence(max_sequence);
}
return Status::OK();
}
wal log 的恢复是通过 RecoverLogFile
完成的,其过程很简单,创建了一个 Reader,并不断读取记录,写入到 WriteBatch
中,并向 MemTable
中插入数据,如果 MemTable 的容量超过限制,会写入到 L0 层。需要注意的是,这个过程是不需要写入 log 的。
对于 manifest 文件的恢复,从 Reader 中读取记录后,需要解码为 VersionEdit
的形式,重新 Apply