顺着 log 的使用,其实可以找到 LSM-Tree 的主线:根据写 WAL 可以找到 MemTable 的写流程,根据写 MANIFEST 可以顺出 Compaction 的流程。这篇文章主要关注 MemTable。

Arean

Arean 是 LevelDB 自己实现的内容管理,其主要是与 SkipList 绑定(实际上也是唯一的用途),而 SkipList 又是与 MemTable 进行绑定的。MemTable 内部是使用引用计数法来管理的,每当使用 MemTable 时引用会加一,结束使用时,引用会减一。当 MemTable 变为 Immutable MemTable 写入 SSTable 时引用会减为 0,这时就会销毁 MemTable,同时也会销毁内部的 SkipList 和 Arean

classDiagram class Arena{ - char* alloc_ptr_ - size_t alloc_bytes_remaining_ - vector~char*~ blocks_ - atomic~size_t~ memory_usage_ + Allocate() char* + AllocateAligned() char* + MemoryUsage() size_t - AllocateFallback(size_t bytes) char* - AllocateNewBlock(size_t block_bytes) char* }

Arean 对外只提供了 AllocateAllocateAligned 两种分配的方法,区别在于是否需要对齐。Arean 内部也是以 block 为单位进行管理的,这一点从 Aren 的成员变量就能看出来。

inline char* Arena::Allocate(size_t bytes) {
  // The semantics of what to return are a bit messy if we allow
  // 0-byte allocations, so we disallow them here (we don't need
  // them for our internal use).
  assert(bytes > 0);
  if (bytes <= alloc_bytes_remaining_) {
    char* result = alloc_ptr_;
    alloc_ptr_ += bytes;
    alloc_bytes_remaining_ -= bytes;
    return result;
  }
  return AllocateFallback(bytes);
}
char* Arena::AllocateFallback(size_t bytes) {
  if (bytes > kBlockSize / 4) {
    // Object is more than a quarter of our block size.  Allocate it separately
    // to avoid wasting too much space in leftover bytes.
    // 大于 1/4 block 单独分配一个 特定大小的 block,避免浪费
    char* result = AllocateNewBlock(bytes);
    return result;
  }

  // We waste the remaining space in the current block.
  alloc_ptr_ = AllocateNewBlock(kBlockSize);
  alloc_bytes_remaining_ = kBlockSize;

  char* result = alloc_ptr_;
  alloc_ptr_ += bytes;
  alloc_bytes_remaining_ -= bytes;
  return result;
}

如果需要分配的字节数量小于 block 中剩余的字节数量,那么就可以直接分配,否则就需要考虑换一个 block 存储了。如果要分配的字节数量大于 block 大小的 1/4,那么就会为其单独分配一个特定大小的 block,原来的 block 仍然可以用于后续的分配操作。这样做的好处是可以将大对象单独放在一个特定大小的 block,还支持分配大于一个 block size 的对象。如果要分配的字节数量小于 block 大小的 1/4,那么说明当前 block 剩余空间不足 1/4,那么就直接换一个 block(原来的 block 不再用于分配内存了)

下面是 SkipList 使用 Arean 的方式

template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::NewNode(
    const Key& key, int height) {
  char* const node_memory = arena_->AllocateAligned(
      sizeof(Node) + sizeof(std::atomic<Node*>) * (height - 1));
  return new (node_memory) Node(key);
}

SkipList

SkipList 是一个多级链表结构,可以实现 $O(Log{N})$ 复杂度的插入和查询操作,其相较于平衡树,SkipList 的优势在于实现简单。

classDiagram direction LR SkipList *-- Iterator class SkipList { - compare_ : Comparator - arena_ Arena* : const - head_ : Node* const - max_height_ : atomic~int~ - rnd_ : Random + Insert(const Key& key) void + Contains(const Key& key) bool - GetMaxHeight() int - NewNode(const Key& key, int height) Node* - RandomHeight() int - Equal(const Key& a, const Key& b) bool - KeyIsAfterNode(const Key& key, Node* n) bool - FindGreaterOrEqual(const Key& key, Node** prev) Node* - FindLessThan(const Key& key) Node* - FindLast() Node* } class Iterator { - list_ : SkipList* - node_ Node* +Valid() bool +key() Key& +Next() void +Prev() void +Seek(const Key& target) void +SeekToFirst() void +SeekToLast() void }
class SkipList {
 private:
  struct Node;

 public:
  explicit SkipList(Comparator cmp, Arena* arena);
  SkipList(const SkipList&) = delete;
  SkipList& operator=(const SkipList&) = delete;

  // Insert key into the list.
  // REQUIRES: nothing that compares equal to key is currently in the list.
  void Insert(const Key& key);

  // Returns true iff an entry that compares equal to key is in the list.
  bool Contains(const Key& key) const;

  // Iteration over the contents of a skip list
  class Iterator {
   public:
    explicit Iterator(const SkipList* list);
    bool Valid() const;
    const Key& key() const;
    void Next();
    void Prev();
    void Seek(const Key& target);
    void SeekToFirst();
    void SeekToLast();

   private:
    const SkipList* list_;
    Node* node_;
    // Intentionally copyable
  };

 private:
  enum { kMaxHeight = 12 };

  inline int GetMaxHeight() const {
    return max_height_.load(std::memory_order_relaxed);
  }

  Node* NewNode(const Key& key, int height);
  int RandomHeight();
  bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); }

  // Return true if key is greater than the data stored in "n"
  bool KeyIsAfterNode(const Key& key, Node* n) const;

  // Return the earliest node that comes at or after key.
  // Return nullptr if there is no such node.
  //
  // If prev is non-null, fills prev[level] with pointer to previous
  // node at "level" for every level in [0..max_height_-1].
  Node* FindGreaterOrEqual(const Key& key, Node** prev) const;

  // Return the latest node with a key < key.
  // Return head_ if there is no such node.
  Node* FindLessThan(const Key& key) const;

  // Return the last node in the list.
  // Return head_ if list is empty.
  Node* FindLast() const;

  Comparator const compare_;
  Arena* const arena_;  // Arena used for allocations of nodes
  Node* const head_;
  std::atomic<int> max_height_;  // Height of the entire list
  Random rnd_;
};

SkipList 对外只提供了插入方法 Insert 和一个迭代器 Iterator,其中插入方法只支持插入 key。在内部,SKipList 实现了 FindGreateOrEqual 等辅助方法,用于帮助实现 SKipList 的功能,这里只列出 FindGreatOrEqual 的实现,因为其与 SKipList 的许多操作密切相关

// 找到第一个大于等于 key 的节点,并将前驱结点存储到 prev 中
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key,
                                              Node** prev) const {
  Node* x = head_;
  int level = GetMaxHeight() - 1;
  while (true) {
    Node* next = x->Next(level);
    if (KeyIsAfterNode(key, next)) { // 如果查找 key > next->key,说明要查找的 key 在后面,需要继续向前查找
      // Keep searching in this list  ; key > next->key
      x = next;
    } else { // 否则 key <= next->key,可以将该 node 作为 prev 前驱结点
      // key <= next->key
      if (prev != nullptr) prev[level] = x; // 记录前驱节点,可以用于插入操作
      if (level == 0) { // 如果已经查到第 0 层了,可以直接返回了
        return next;
      } else {
        // Switch to next list
        level--; // 否则查找下一层
      }
    }
  }
}

在 SkipList 的插入和迭代查找操作中,都调用了 FindGreatOrEqual 方法,这得益于 key 在 SkipList 中排序的方式。SkipList 中的 key 是按照 user key 的大小升序排列的,如果 user key 相同,按照 sequence 降序排列。这样 FindGreatOrEqual 方法就能找到最接近某个版本的数据了。下面是查找和插入操作的实现

template <typename Key, class Comparator>
inline void SkipList<Key, Comparator>::Iterator::Seek(const Key& target) {
  node_ = list_->FindGreaterOrEqual(target, nullptr);
}

template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
  // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
  // here since Insert() is externally synchronized.
  Node* prev[kMaxHeight];
  Node* x = FindGreaterOrEqual(key, prev);

  // Our data structure does not allow duplicate insertion
  assert(x == nullptr || !Equal(key, x->key));

  int height = RandomHeight();
  if (height > GetMaxHeight()) {
    // 更新当前最大高度,并将新增高度的前驱结点设置为 head_
    for (int i = GetMaxHeight(); i < height; i++) {
      prev[i] = head_;
    }
    max_height_.store(height, std::memory_order_relaxed);
  }

  // 插入节点
  x = NewNode(key, height);
  for (int i = 0; i < height; i++) {
    // NoBarrier_SetNext() suffices since we will add a barrier when
    // we publish a pointer to "x" in prev[i].
    x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
    prev[i]->SetNext(i, x);
  }
}

MemTable

{% mermaid %} classDiagram class MemTable { - refs* : int - arena* : Arena - table_ : Table + MemTable(InternalKeyComparator comparator) - ~MemTable() + Ref() void + Unref() void + ApproximateMemoryUsage() size_t + NewIterator() Iterator + Add(SequenceNumber seq, ValueType type, Slice& key, Slice& value) void + Get(const LookupKey& key, std::string* value, Status* s) bool } {% endmermaid %}

MemTable 对外提供了 Get()Add() 方法,分别对应于查找和插入元素。由于 SkipList 只支持插入一个 key,所以 MemTable 需要将 key 和 value 合起来作为一个 插入 key 插入到 SkipList 中。key 的结构如下所示

|         Lookup key        |
            |  internal key |
            | user key|
+-----------------------------------------------+
| ikey_size |   key   | tag | value_size | value |
+-----------------------------------------------+
                     /      \
                  +------------+
                  | seq | type |
                  +------------+

Add 方法很简单,就是按照上面的格式将数据组织成 key,然后插入 SkipList 中。需要说明的是 key_size 和 value_size 都是使用 varint32 变长编码存储的。然后就是 tag 的生成方式是采用 seq << 8 | type 的方式生成的,也就是说高 7 位用作序列号,低 1 位用作 type,type 表明这是插入操作还是删除操作

LevelDB 提供了两种写入操作,分别是插入单个 key 和原子地插入多个 key,其实最终都会调用后者的方法,也就是说在插入 MemTable 前只会写一次 WAL 日志,然后再批量插入 MemTable 中。但是所谓的批量插入 MemTable 也只是循环调用了 Add 方法

Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value);
Status Write(const WriteOptions& options, WriteBatch* updates) = 0;

DBImpl::Write

  if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
    WriteBatch* write_batch = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(write_batch);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      // 先写 log
      status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        // 如果开启了同步选项,将文件刷盘
        status = logfile_->Sync();
      }
      if (status.ok()) {
        // 插入 memTable
        status = WriteBatchInternal::InsertInto(write_batch, mem_);
      }
      mutex_.Lock();
    }
    if (write_batch == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

从上面的代码可以看出,确实只会写入一次 WAL,然后再调用 WriteBatchInternal::InsertInto 批量插入 MemTable。需要注意的是 Sequence 的修改,它只会将下一个 seq 设置到 WriteBatch 中(WriteBatch 的结构可以查看之前的文章),并更新一次(更新为 last_sequence + count + 1)

WriteBatchInternal::InsertInto 最终会调用 WriteBatch::Iterate 方法,将 WriteBatch 中的 key/value 全部插入到 MemTable 中。

Status WriteBatch::Iterate(Handler* handler) const {
  Slice input(rep_);

  input.remove_prefix(kHeader);
  Slice key, value;
  int found = 0;
  while (!input.empty()) {
    found++;
    char tag = input[0];
    input.remove_prefix(1);
    switch (tag) {
      case kTypeValue:
        if (GetLengthPrefixedSlice(&input, &key) &&
            GetLengthPrefixedSlice(&input, &value)) {
          // 插入 memTable 时,会调用 MemTableInserter::Put(key, value)
          handler->Put(key, value);
        }
        break;
      case kTypeDeletion:
        if (GetLengthPrefixedSlice(&input, &key)) {
          handler->Delete(key);
        }
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }
  return Status::OK();
}

void Put(const Slice& key, const Slice& value) override {
  mem_->Add(sequence_, kTypeValue, key, value);
  sequence_++;
}
void Delete(const Slice& key) override {
  mem_->Add(sequence_, kTypeDeletion, key, Slice());
  sequence_++;
}

可以看到 WriteBatch::Iterate 确实会遍历 WriteBatch 中的所有 key/value,然后插入 MemTable。需要注意的是,不管是 Put 还是 Delete,都会增加 sequence,这与有些博客说的 WriteBatch 中的数据公用一个序列号的说法不一致

为了方便查找,LevelDB 封装了一个 LookupKey,里面包含需要查找的 user key 和一个序列号,以及 LookupKey 的长度

LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) {
  size_t usize = user_key.size();
  size_t needed = usize + 13;  // A conservative estimate
  char* dst;
  if (needed <= sizeof(space_)) { // char space_[200] 是为了避免 key 太短了
    dst = space_;
  } else {
    dst = new char[needed];
  }
  start_ = dst;
  dst = EncodeVarint32(dst, usize + 8); // 写入 size
  kstart_ = dst; // 记录 user key 开始的位置
  std::memcpy(dst, user_key.data(), usize);
  dst += usize;
  EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
  dst += 8;
  end_ = dst; // 记录 LookupKey 结束的位置
}

查找的时候,会将 sequence 设置为当前最新的 sequence,由于 SkipList 中查找时会将第一个大于等于查找 key 的元素返回,再结合 key 在 SkipList 中的排列顺序,可以发现如果 key 存在于 SkipList 中,就会返回最新版本的 key。

DBImpl::Get

  SequenceNumber snapshot;
  if (options.snapshot != nullptr) {
    snapshot =
        static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
  } else {
    snapshot = versions_->LastSequence();
  }
  {
    mutex_.Unlock();
    // First look in the memtable, then in the immutable memtable (if any).
    // 先从 memtable 中查找,其次是 immMemtable. 最后从 sst 中查找
    LookupKey lkey(key, snapshot);
    if (mem->Get(lkey, value, &s)) { // 传入的是当前最新的 key
      // Done
    } else if (imm != nullptr && imm->Get(lkey, value, &s)) {
      // Done
    } else {
      s = current->Get(options, lkey, value, &stats);
      have_stat_update = true;
    }
    mutex_.Lock();
  }
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
  Slice memkey = key.memtable_key(); // memKey 是 internal_key_size + user key + tag 部分
  Table::Iterator iter(&table_);
  iter.Seek(memkey.data()); // 查找第一个大于等于 memkey 的元素
  if (iter.Valid()) {
    const char* entry = iter.key();
    uint32_t key_length;
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    // 再次查看是否是我们需要的 key,因为 key 有可能不存在
    if (comparator_.comparator.user_comparator()->Compare(
            Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
      // Correct user key
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      switch (static_cast<ValueType>(tag & 0xff)) {
        case kTypeValue: {
          Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
          value->assign(v.data(), v.size());
          return true;
        }
        case kTypeDeletion: // key 已经被删除了,所以返回 NotFound
          *s = Status::NotFound(Slice());
          return true;
      }
    }
  }
  return false;
}

可以看到,找到元素后需要再比较一次,因为 SkipList 返回的是第一个大于等于 key 的元素。如果 key 不存在,返回的元素就不是当前元素了,所以需要再比较一次