Leveldb 源码详解系列之四: 迭代器设计与实现

迭代器的设计和实现是 leveldb 的精华之一. 前几篇文章都多少提到了迭代器的使用, 本篇让我们深入一下迭代器的设计实现, 也为接下来的几篇剖析打下基础.

迭代器接口类为 leveldb::Iterator, 位于 include/leveldb/iterator.htable/iterator.cc. (实现位于 table 目录, 是因为接下来要介绍的 sstable 是迭代器重度用户.)

迭代器接口定义比较简洁, 主要方法为指向合法性判断, 前后移动, 定位(开头/末尾/任意), 提取数据项 key/value 等等.

唯一的数据成员为清理函数列表头节点.

具体如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class LEVELDB_EXPORT Iterator {
 public:
  Iterator();

  // 禁用复制构造
  Iterator(const Iterator&) = delete;
  // 禁用赋值构造
  Iterator& operator=(const Iterator&) = delete;

  virtual ~Iterator();

  // 一个迭代器要么指向 key/value 对, 要么指向非法位置.
  // 当且仅当第一种情况才为 valid.
  virtual bool Valid() const = 0;

  // 将迭代器移动到数据源的第一个 key/value 对.
  // 当前仅当数据源不空时, 调用完该方法再调用 Valid() 为 true.
  virtual void SeekToFirst() = 0;

  // 将迭代器移动到数据源的最后一个 key/value 对.
  // 当前仅当数据源不空时, 调用完该方法再调用 Valid() 为 true.
  virtual void SeekToLast() = 0;

  // 将迭代器指向移动到数据源 target 位置或之后的第一个 key.
  // 当且仅当移动后的位置存在数据项时, 调用 Valid() 才为 true.
  virtual void Seek(const Slice& target) = 0;

 
  // 将迭代器移动到数据源下一个数据项.
  // 当且仅当迭代器未指向数据源最后一个数据项时, 调用完该方法后调用 Valid() 结果为 true.
  // 注意: 调用该方法前提是迭代器当前指向必须 valid.
  virtual void Next() = 0;

  // 将迭代器移动到数据源前一个数据项.
  // 当且仅当迭代器未指向数据源第一个数据项时, 调用完该方法后调用 Valid() 结果为 true.
  // 注意: 调用该方法前提是迭代器当前指向必须 valid.  
  virtual void Prev() = 0;

  // 返回当前迭代器指向的数据项的 key, Slice 类型, 如果使用迭代器进行修改则会反映到
  // 已返回的 key 上面.
  // 注意: 调用该方法前提是迭代器当前执行必须 valid.
  virtual Slice key() const = 0;

  // 返回当前迭代器指向的数据项的 value, Slice 类型, 如果使用迭代器进行修改则会反映到
  // 已返回的 value 上面.
  // 注意: 调用该方法前提是迭代器当前执行必须 valid.
  virtual Slice value() const = 0;

  // 发生错误返回之; 否则返回 ok.
  virtual Status status() const = 0;

  // 我们允许调用方注册一个带两个参数的回调函数, 当迭代器析构时该函数会被自动调用.
  using CleanupFunction = void (*)(void* arg1, void* arg2);
  // 我们允许客户端注册 CleanupFunction 类型的回调函数, 在迭代器被销毁的时候会调用它们(可以注册多个). 
  // 注意, 跟前面的方法不同, RegisterCleanup 不是抽象的, 客户端不应该覆写他们. 
  void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2);

 private:
  // 清理函数被维护在一个单向链表上, 其中头节点被 inlined 到迭代器中.
  // 该类用于保存用户注册的清理函数, 一个清理函数对应一个该类对象, 全部对象被维护在一个单向链表上. 
  struct CleanupNode {
    // 清理函数及其两个参数
    CleanupFunction function;
    void* arg1;
    void* arg2;
    // 下个清理函数
    CleanupNode* next;

    // 判断清理函数是否为空指针.
    bool IsEmpty() const { return function == nullptr; }
    // 运行调用方通过 Iterator::RegisterCleanup 注册的清理函数
    void Run() { assert(function != nullptr); (*function)(arg1, arg2); }
  };
  // 清理函数列表的头节点
  CleanupNode cleanup_head_;
}

Iterator 本身实现了三个方法, 分别是构造方法, 析构方法, 以及清理函数注册方法. 下面是非抽象方法的实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 构造方法, 初始化唯一数据成员
Iterator::Iterator() {
  cleanup_head_.function = nullptr;
  cleanup_head_.next = nullptr;
}

Iterator::~Iterator() {
  // 析构时调用已注册的清理函数
  if (!cleanup_head_.IsEmpty()) {
    // 线性的, 如果在该迭代器上注册的清理函数太多了应该会影响性能, 但总要做释放操作, 时间总归省不了.
    cleanup_head_.Run();
    for (CleanupNode* node = cleanup_head_.next; node != nullptr; ) {
      node->Run();
      CleanupNode* next_node = node->next;
      delete node;
      node = next_node;
    }
  }
}

// 将用户定制的清理函数挂到单向链表上, 待迭代器销毁时挨个调用(见 ~Iterator()). 
void Iterator::RegisterCleanup(CleanupFunction func, void* arg1, void* arg2) {
  assert(func != nullptr);
  CleanupNode* node;
  if (cleanup_head_.IsEmpty()) {
    node = &cleanup_head_;
  } else {
    node = new CleanupNode();
    // 新节点插到 head 后面
    node->next = cleanup_head_.next;
    cleanup_head_.next = node;
  }
  node->function = func;
  node->arg1 = arg1;
  node->arg2 = arg2;
}

下面以 sstable 的 block 为例示意一下迭代器的实现. 关键部分就是将 block 作为迭代器数据源, 基于 block 构造和查询原理实现迭代器的前后移动, 定位等操作. 里面涉及了一些成员看不懂也不用管, 在介绍 sstable 时会解释.

具体类为 leveldb::Block::Iter, 代码位于 table/block.cc 文件:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
class Block::Iter : public Iterator {
 private:
  // 迭代时使用的比较器
  const Comparator* const comparator_;
  // 指向 block 的指针
  const char* const data_;
  // block 的 restart 数组
  // (每个元素 32 位固定长度, 保存着每个 restart 在 block 里的偏移量)
  // 在 block 里的起始偏移量
  uint32_t const restarts_;
  // restart 数组元素个数(每个元素都是 uint32_t 类型)
  uint32_t const num_restarts_;

  // current_ 表示当前数据项在 data_ 里的偏移量, 
  // 如果迭代器无效则该值大于等于 restarts_ 
  // 即 restart array 在 block 的起始偏移量
  // (restart array 位于 block 后部, 数据项在 block 前半部分)
  uint32_t current_;
  // restart block 的索引值, current_ 指向的数据项落在该 block
  uint32_t restart_index_;
  // current_ 所指数据项的 key
  std::string key_; 
  // current_ 所指数据项的 value
  Slice value_; 
  // 当前迭代器对应的状态
  Status status_;

  inline int Compare(const Slice& a, const Slice& b) const {
    return comparator_->Compare(a, b);
  }

  // 返回 current_ 所指数据项的下一个数据项的偏移量. 
  // 根据 Block 布局我们可以知道, value 位于每个数据项最后, 
  // 所以 value 之后第一个字节即为下一个数据项起始位置. 
  inline uint32_t NextEntryOffset() const {
    return (value_.data() + value_.size()) - data_;
  }

  // 返回索引值为 index 的 restart 在 block 中的起始偏移量
  uint32_t GetRestartPoint(uint32_t index) {
    assert(index < num_restarts_);
    return DecodeFixed32(data_ + restarts_ + index * sizeof(uint32_t));
  }

  // 将迭代器移动到索引值为 index 的 restart 对应的偏移量位置.
  // 注意, 此方法只调整了 current_ 对应的 value_, 此时两者不再保持一致; 
  // current_ 与 key_ 仍然保持一致性. 
  void SeekToRestartPoint(uint32_t index) {
    key_.clear();
    restart_index_ = index;
    // current_ 和 key_ 指向后续会被 ParseNextKey() 校正.
    // ParseNextKey() 从 value_ 末尾开始, 所以这里需要设置好, 为何从 value_ 
    // 末尾开始呢? 根据 Block 布局我们可以知道, value 位于每个数据项最后, 
    // 所以 value 之后第一个字节即为下一个数据项起始位置.
    uint32_t offset = GetRestartPoint(index);
    // 将 value 数据起始地址设置为 offset 对应的 restart 起始位置, 
    // value_ 这么设置是为了方便 ParseNextKey().
    value_ = Slice(data_ + offset, 0);
  }

 public:
  Iter(const Comparator* comparator,
       const char* data,
       uint32_t restarts,
       uint32_t num_restarts)
      : comparator_(comparator),
        data_(data),
        restarts_(restarts),
        num_restarts_(num_restarts),
        current_(restarts_),
        restart_index_(num_restarts_) {
    assert(num_restarts_ > 0);
  }

  virtual bool Valid() const { return current_ < restarts_; }
  virtual Status status() const { return status_; }
  virtual Slice key() const {
    assert(Valid());
    return key_;
  }
  virtual Slice value() const {
    assert(Valid());
    return value_;
  }

  virtual void Next() {
    // 向后移动前提是当前指向合法
    assert(Valid());
    ParseNextKey();
  }

  // 将 current 指向当前数据项前一个数据项. 
  // 如果 current 指向的已经是 block 第 0 个数据项, 则无须移动了; 
  virtual void Prev() {
    // 向前移动前提是当前指向合法
    assert(Valid());

    // 倒着扫描, 直到 current_ 之前的一个 restart point.
    // current_ 大于等于所处 restart 段起始地址, 下面要做的
    // 是寻找 current_ 之前的一个 restart point. 
    // 把 current_ 当前取值作为原点.
    const uint32_t original = current_;
    // 下面循环干一件事, 定位 current_ 前一个数据项, 具体分两种情况:
    // - 如果 current_ 大于所处 restart 段起始地址, 不进行循环, 
    //   到下面去直接定位 current_ 前一个数据项即可.
    // - 如果 current_ 等于所处 restart 段起始地址, 
    //    - 如果当前 restart 不是 block 的首个 restart, 
    //      则 current_ 前一个数据项肯定位于前一个 restart 最后一个位置
    //    - 如果当前 restart 是 block 的首个 restart, 
    //      则 current_ 就是 block 首个数据项, 所以没有所谓前一个数据项了
    // - 没有其它情况. 
    // 循环能够执行的唯一条件就是相等
    while (GetRestartPoint(restart_index_) >= original) { 
      // 倒到开头的 restart point 了, 没法再向前倒了, 也就是没有 pre 了.
      if (restart_index_ == 0) { 
        // current_ 置为同 restarts_, 
        // 即使得它位于 block 首个 restart 的首个数据项处.
        current_ = restarts_;
        // 将 restart_index_ 置为 restart point 个数,
        // 这个索引是越界的.
        restart_index_ = num_restarts_;
        return;
      }
      // 倒车, 请注意.
      restart_index_--;
    }

    // 粗粒度移动, 即先将 current_ 移动到指定 restart 分段
    SeekToRestartPoint(restart_index_);
    do {
      // 细粒度移动, 将 current_ 移动到 original (current_ 移动之前的值)的前一个数据项
    } while (ParseNextKey() && NextEntryOffset() < original);
  }

  // 寻找 block 中第一个 key 大于等于 target 的数据项. 
  // 先通过二分法在 restart 段级定位查找目标段, 存在
  // key < target 且是最后一个 restart 段; 
  // 然后在目标段进行线性查找找到第一个 key 大约等于 target 的数据项.
  // 如果存在则 current_ 指向该目标数据项; 否则 current_ 指向
  // 一个非法数据项. 
  // 调用者需要检查返回结果以确认是否找到了. 
  virtual void Seek(const Slice& target) {
    // 在 restart array 中进行二分查找, 找到最后一个
    // 存在 key 小于 target 的 restart, 注意是小于, 
    // 这决定了后面二分查找时比较逻辑. 
    uint32_t left = 0;
    uint32_t right = num_restarts_ - 1;
    while (left < right) {
      uint32_t mid = (left + right + 1) / 2;
      uint32_t region_offset = GetRestartPoint(mid);
      uint32_t shared, non_shared, value_length;
      const char* key_ptr = DecodeEntry(data_ + region_offset,
                                        data_ + restarts_,
                                        &shared, &non_shared, &value_length);
      // 注意, 在 block 中, 每个位于 restart 起始处的数据项的 key 肯定
      // 是没有做过前缀压缩的, 所以 shared 肯定为 0.
      // 同时要注意, 这是异常情况, 不属于循环不变式的成立的条件. 
      if (key_ptr == nullptr || (shared != 0)) {
        CorruptionError();
        return;
      }
      Slice mid_key(key_ptr, non_shared);
      // 因为 block 中 key 都是递增排列的, 所以每个 restart 
      // 段位于 restart 首位置的那个 key 肯定是所在段最小的. 
      // 如果最后存在这样的 restart, 肯定是由 left 指向:
      // - 因为 left 右移的条件是远小于 target
      // - 因为 right 左移的条件是大于等于 target
      if (Compare(mid_key, target) < 0) {
        // Key at "mid" is smaller than "target".  Therefore all
        // blocks before "mid" are uninteresting.
        left = mid;
      } else {
        // Key at "mid" is >= "target".  Therefore all blocks at or
        // after "mid" are uninteresting.
        right = mid - 1;
      }
    }

    // 即使初始 left == right 也会走到这

    // 定位到目标 restart 段, 为下面线性查找打基础
    SeekToRestartPoint(left);
    // 定位到 left 指向的 restart 段以后, 挨个 key 进行比较,
    // 寻找第一个大于等于 target 的 key
    while (true) {
      if (!ParseNextKey()) {
        return;
      }
      if (Compare(key_, target) >= 0) {
        return;
      }
    }
  }

  // 将迭代器移动到 block 第一个数据项
  virtual void SeekToFirst() {
    // 定位到第一个 restart 段
    SeekToRestartPoint(0);
    // 在之前基础上定位到第一个数据项
    ParseNextKey();
  }

  // 将迭代器移动到 block 最后一个数据项
  virtual void SeekToLast() {
    // 定位到最后一个 restart 段
    SeekToRestartPoint(num_restarts_ - 1);
    // 在之前基础上定位到最后一个数据项
    while (ParseNextKey() && NextEntryOffset() < restarts_) {
      // Keep skipping
    }
  }

 private:
  // 如果出错, 则将各个成员置为非法值
  void CorruptionError() {
    current_ = restarts_;
    restart_index_ = num_restarts_;
    status_ = Status::Corruption("bad entry in block");
    key_.clear();
    value_.clear();
  }

  // 将 current_, key_, value_ 指向下一个数据项的
  // 起始偏移量、key 部分、value 部分, 同时保持
  // restart_index_ 与 current_ 新位置所处 restart 段一致. 
  bool ParseNextKey() {
    // current_ 指向接下来的数据项
    current_ = NextEntryOffset(); 
    const char* p = data_ + current_;
    // 数据部分之后紧接着就是 restart
    const char* limit = data_ + restarts_; 
    if (p >= limit) {
      // 数据部分到头了, 返回 false
      current_ = restarts_;
      restart_index_ = num_restarts_;
      return false;
    }

    // 解码下个数据项
    uint32_t shared, non_shared, value_length;
    p = DecodeEntry(p, limit, &shared, &non_shared, &value_length);
    // key_ 保存的还是前一个数据项的 key, 
    // 而大小必然不小于接下来的数据项的 key 与其公共前缀部分大小
    if (p == nullptr || key_.size() < shared) {
      CorruptionError();
      return false;
    } else {
      // 保留 key_ 与 key 公共前缀部分
      key_.resize(shared); 
      // 将当前数据项 key 的非公共部分追加进来, 得到一个完整的 key
      key_.append(p, non_shared); 
      // 当前数据项的 value 部分. 到此为止, 
      // current_、key_、value_ 都已指向同一个数据项. 
      value_ = Slice(p + non_shared, value_length); 
      
      // 因为 current_ 已经指向新的数据项, 所以它所处的 restart 段可能也递增了, 
      // 那就破坏了与 restart_index_ 的一致性, 所以需要调整. 
      // 下面循环干的事情, 就是要保持 restart_index_ 与 current_ 一致, 
      // 即让 restart_index_ 指向 current_ 所处 restart 在数组中的索引值. 
      // restart_index_ 如果指向最后一个 restart, 那么 current_ 
      // 此时肯定也在最后一个 restart 段. 
      while (restart_index_ + 1 < num_restarts_ &&
             GetRestartPoint(restart_index_ + 1) < current_) { 
        ++restart_index_;
      }
      return true;
    }
  }
};

// 根据用户定制的 comparator 构造该 block 的一个迭代器
Iterator* Block::NewIterator(const Comparator* cmp) {
  // block 尾部 4 字节为 restart 个数, 最少 4 字节
  if (size_ < sizeof(uint32_t)) { 
    return NewErrorIterator(Status::Corruption("bad block contents"));
  }
  const uint32_t num_restarts = NumRestarts();
  if (num_restarts == 0) {
    return NewEmptyIterator();
  } else {
    // 将 block 作为数据源, 构造迭代器
    return new Iter(cmp, data_, restart_offset_, num_restarts);
  }
}

从上面代码可以看出, 跟底层数据源相关的成员和方法都是 private 的, 迭代器接口方法实现都在 public 部分. 实现迭代器时依赖依赖两个重要的 private 方法, 一个是 SeekToRestartPoint() 一个是 ParseNextKey() 分别用于在迭代时实现先粗粒度后细粒度的目标定位.

双层迭代器, 对应的类为 class leveldb::<unnamed>::TwoLevelIterator, 位于 table/two_level_iterator.cc 文件. 它的父类为 leveldb::Iterator, 所以表现出来的性质是一样的.

该类设计比较巧妙, 这主要是由 sstable 文件结构决定的. 具体地, 要想在 sstable 文件中找到某个 key/value 对, 肯定先要找到它所属的 data Block, 而要找到 data Block 就要先在 index block 找到其对应的 BlockHandle. 双层迭代器就是这个寻找过程的实现.

该类包含两个迭代器封装:

  • 一个是 index_iter_, 它指向 index block 数据项. 针对每个 data block 都有一个对应的 entry 包含在 index block 中, entry 包含一个 key/value 对, 其中:
    • key 为大于等于对应 data block 最后(也是最大的, 因为排序过了)一个 key 同时小于接下来的 data block 的第一个 key 的(比较拗口)字符串;
    • value 是指向一个对应 data block 的 BlockHandle.
  • 另一个是 data_iter_, 它指向 data block 包含的数据项. 至于这个 data block 是否与 index_iter_ 所指数据项对应 data block 一致, 那要看实际情况, 不过即使不一致也无碍.

示意图如下:

/images/leveldb-annotations-4-iterator/1-two_level_iterator.png

这两个迭代器, 可以把 index_iter 看作钟表的时针, 指向具体小时, 可以把 data_iter_ 看作更精细的分针, 指向当前小时的具体分钟. 两个指针一起配合精确定位到我们要查询的数据. 这么说其实就能大体上猜出来, 迭代器前后移动, 定位等等这些方法是如何实现的了, 简单说就是先移动 index_iter_ 再移动 data_iter_. 以 Seek() 方法举例来说:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// 根据 target 将 index_iter 和 data_iter 移动到对应位置
void TwoLevelIterator::Seek(const Slice& target) {
  // 因为 index block 每个数据项的 key 是对应 data block 中最大的那个 key, 
  // 所以 index block 数据项也是有序的, 不过比较"宏观" . 
  
  // 先找到目标 data block
  index_iter_.Seek(target);
  // 根据 index_iter_ 设置 data_iter_
  InitDataBlock();
  // 然后在目标 data block 找到目标数据项
  if (data_iter_.iter() != nullptr) data_iter_.Seek(target); 
  // data_iter_.iter() 为空则直接向前移动找到第一个不为空的
  // data block 的第一个数据项.
  SkipEmptyDataBlocksForward(); 
}

可以看出, 双层迭代器设计具有分形的思想, 迭代器是由迭代器构成的.

其它方法实现原理类似, 不再赘述.

–End–