面试时被问到 skiplist 怎么做并发控制,答的不是很好,所以来学习一下。
SkipList
代码文件在 memtable/skiplist.h ,这个版本的 skiplist 支持多读单写的无锁并发,如果需要多写的并发安全,则需要外部进行互斥,目前这个版本的跳表似乎已经被弃置。
结构
重要的结构是 class SkipList 、struct SkipList::Node 和 class SkipList::Iterator
class SkipList
class SkipList 成员变量如下:
const uint16_t kMaxHeight_;
const uint16_t kBranching_;
const uint32_t kScaledInverseBranching_;
// Immutable after construction
Comparator const compare_;
Allocator* const allocator_; // Allocator used for allocations of nodes
Node* const head_; // Modified only by Insert().
// Read racily by readers, but stale values are ok.
RelaxedAtomic<int> max_height_; // Height of the entire list
// Used for optimizing sequential insert patterns. Tricky.
// prev_[i] for i up to max_height_ is the predecessor of prev_[0] and
// prev_height_ is the height of prev_[0].
// prev_[0] can only be equal to head_ before insertion, in which case
// max_height_ and prev_height_ are 1.
// prev_ 用于优化顺序插入模式。
// 对于 up to max_height_ 的 i,prev_[i] 是 prev_[0] 的前驱。
// prev_height_ 是 prev_[0] 的高度。
// prev_[0] 只能在插入之前等于 head_,在这种情况下 max_height_ 和
// prev_height_ 都是 1。
int32_t prev_height_;
Node** prev_;
非常好理解的成员变量,可能比较疑惑的是 prev_height_ 和 prev_ ,这两个是用来记录上一次插入的位置,用以优化顺序插入。
struct SkipList::Node
struct SkipList::Node 成员变量如下:
public:
Key const key;
private:
// Array of length equal to the node height.
// next_[0] is lowest level link.
AcqRelAtomic<Node*> next_[1];
key 就是用来存储的键,显然在这里 Key 应该是一个指向数据内存的指针。
接下来的 next_[1] 是一个常见的 struct hack,在支持柔性数组的 C99 中可以直接写成 next_[],但 C++ 并不支持柔性数组,所以写成这样。
当然这么进行内存布局的话,就要求这个结构体不能以数组在内存紧密相挨,因为它实际的内存大小大于编译器以为的。
看一下 NewNode 方法就知道其用意了。
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::NewNode(const Key& key, int height) {
char* mem = allocator_->AllocateAligned(
sizeof(Node) + sizeof(AcqRelAtomic<Node*>) * (height - 1));
return new (mem) Node(key);
}
不过,从这个函数中可以留意到,AcqRelAtomic<Node *> 必须是一个简单的类型,能够支持这种操作。
那么它可以嘛?肯定是可以的。
AcqRelAtomic<Node *> 继承于 RelaxedAtomic<Node *>,其类型本身没有定义任何成员变量。
RelaxedAtomic<Node *> 内部只有一个 std::atomic<Node *> 的成员变量。
std::atomic<Node *> 类型中只有一个定义如下的成员变量:
// Align 1/2/4/8/16-byte types to at least their size. static constexpr int _S_min_alignment = (sizeof(_Tp) & (sizeof(_Tp) - 1)) || sizeof(_Tp) > 16 ? 0 : sizeof(_Tp); static constexpr int _S_alignment = _S_min_alignment > alignof(_Tp) ? _S_min_alignment : alignof(_Tp); alignas(_S_alignment) _Tp _M_i _GLIBCXX20_INIT(_Tp());
对应到这里,也就是内部只有一个指针变量,同时是内存对齐的。可以供随意摆弄。
class SkipList::Iterator
class SkipList::Iterator 成员变量如下:
const SkipList* list_;
Node* node_;
很简单。
方法
因为跳表实现本身不难,比较有趣的是其并发实现,和部分方法的实现。
Next, Prev
template <typename Key, class Comparator>
inline void SkipList<Key, Comparator>::Iterator::Next() {
assert(Valid());
node_ = node_->Next(0);
}
template <typename Key, class Comparator>
inline void SkipList<Key, Comparator>::Iterator::Prev() {
// Instead of using explicit "prev" links, we just search for the
// last node that falls before key.
assert(Valid());
node_ = list_->FindLessThan(node_->key);
if (node_ == list_->head_) {
node_ = nullptr;
}
}
对比迭代器这两个方法,可以发现,迭代器如果反向迭代,那么只会重新搜索,而不会像正向迭代那样直接修改指针指向下一个节点。
这是因为实现中 SkipList::Node 没有存储反向的指针,只存储了正向的指针。
我想这可能跟并发的实现相关,因为如果存储两个指针的话,在插入一个节点的时候,每层需要修改四个指针,新节点尚好,但在新节点进行发布 (release) 的时候,是没办法同时修改前后两个节点的两个指针的,这可能会让读者读到中间状态而困惑。
比如: prev <-> node <-> next
node 把自己的指针的 prev 和 next 已经做好了修改,然后修改 prev 和 next 其中之一,假设先修改 prev 再修改 next;那么在修改 prev 后,修改 next 前,读者可能会读到 prev->next != next->prev;
FindLessThan
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindLessThan(const Key& key, Node** prev) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
// KeyIsAfter(key, last_not_after) is definitely false
Node* last_not_after = nullptr;
while (true) {
assert(x != nullptr);
Node* next = x->Next(level);
// 断言1:表示排序
assert(x == head_ || next == nullptr || KeyIsAfterNode(next->key, x));
// 断言2:没有超过
assert(x == head_ || KeyIsAfterNode(key, x));
if (next != last_not_after && KeyIsAfterNode(key, next)) {
// Keep searching in this list
x = next;
} else {
if (prev != nullptr) {
prev[level] = x;
}
if (level == 0) {
return x;
} else {
// Switch to next list, reuse KeyIUsAfterNode() result
last_not_after = next;
level--;
}
}
}
}
FindLessThan 用于查找最后一个 key < key。同时会填充 prev 参数表示各层级的 prev 节点。
在这里每一次循环代表层内步进一次(if (next != last_not_after && KeyIsAfterNode(*key*, next))),或者层间步进一次(else)
写的让我比较疑惑,如果让我写,我肯定会写成一个双重循环,外层代表层级循环,内存代表层内循环。
就像下面这样:
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindLessThan(const Key& key, Node** prev) const {
Node* x = head_;
Node* last_not_after = nullptr;
for (int level = GetMaxHeight() - 1; level >= 0; --level) {
Node* next = x->Next(level);
while (next != last_not_after && KeyIsAfterNode(key, next)) {
x = next;
next = x->Next(level);
}
if (prev != nullptr) {
prev[level] = x;
}
last_not_after = next;
}
return x;
}
没有选择使用双重循环可能有效率上的考量,可能可以减少一定的分支判断,也可能只是编写者个人习惯。
Insert
template <class Comparator>
bool InlineSkipList<Comparator>::Insert(const char* key) {
return Insert<false>(key, seq_splice_, false);
}
template <class Comparator>
template <bool UseCAS>
bool InlineSkipList<Comparator>::Insert(const char* key, Splice* splice,
bool allow_partial_splice_fix) {
Node* x = reinterpret_cast<Node*>(const_cast<char*>(key)) - 1;
const DecodedKey key_decoded = compare_.decode_key(key);
int height = x->UnstashHeight();
assert(height >= 1 && height <= kMaxHeight_);
int max_height = max_height_.LoadRelaxed();
while (height > max_height) {
// compare-and-swap(expected=max_height, desired=height)。
// 失败时会把传入的 max_height 更新为当前的原子值,因此下一轮用的是最新的
// max_height。
if (max_height_.CasWeakRelaxed(max_height, height)) {
// successfully updated it
max_height = height;
break;
}
// else retry, possibly exiting the loop because somebody else
// increased it
}
assert(max_height <= kMaxPossibleHeight);
int recompute_height = 0;
if (splice->height_ < max_height) {
// Either splice has never been used or max_height has grown since
// last use. We could potentially fix it in the latter case, but
// that is tricky.
//
// 要么这个 splice 从未被使用过,要么自上次使用以来 max_height 已经增大。
// 后者理论上可以修复,但实现起来比较棘手。
splice->prev_[max_height] = head_;
splice->next_[max_height] = nullptr;
splice->height_ = max_height;
recompute_height = max_height;
} else {
// Splice is a valid proper-height splice that brackets some
// key, but does it bracket this one? We need to validate it and
// recompute a portion of the splice (levels 0..recompute_height-1)
// that is a superset of all levels that don't bracket the new key.
// Several choices are reasonable, because we have to balance the work
// saved against the extra comparisons required to validate the Splice.
//
// One strategy is just to recompute all of orig_splice_height if the
// bottom level isn't bracketing. This pessimistically assumes that
// we will either get a perfect Splice hit (increasing sequential
// inserts) or have no locality.
//
// Another strategy is to walk up the Splice's levels until we find
// a level that brackets the key. This strategy lets the Splice
// hint help for other cases: it turns insertion from O(log N) into
// O(log D), where D is the number of nodes in between the key that
// produced the Splice and the current insert (insertion is aided
// whether the new key is before or after the splice). If you have
// a way of using a prefix of the key to map directly to the closest
// Splice out of O(sqrt(N)) Splices and we make it so that splices
// can also be used as hints during read, then we end up with Oshman's
// and Shavit's SkipTrie, which has O(log log N) lookup and insertion
// (compare to O(log N) for skip list).
//
// We control the pessimistic strategy with allow_partial_splice_fix.
// A good strategy is probably to be pessimistic for seq_splice_,
// optimistic if the caller actually went to the work of providing
// a Splice.
//
// Splice 用来括住一个 key 时,只可能变得宽松,不可能变得紧迫。因为 skiplist
// 不支持删除操作。 Splice 是一个有效且高度合适的 splice,它能括住某个 key,
// 但它能括住当前这个 key 吗?我们需要对其进行校验,
// 并重新计算 splice 的一部分(level 0 到 recompute_height-1),
// 这部分应覆盖所有不能括住新 key 的层级。
// 有几种合理的选择,因为我们需要在节省的工作量与
// 为校验 Splice 额外进行的比较之间取得平衡。
//
// 一种策略是,如果最底层不能括住,就重新计算所有 orig_splice_height。
// 这是一种悲观的假设,认为要么我们会得到完美的 Splice 命中(递增的
// 顺序插入),要么就完全没有局部性。
//
// 另一种策略是沿着 Splice 的层级向上走,直到找到一个能括住该 key 的层级。
// 这种策略让 Splice 提示在其他情况下也有帮助:它将插入的复杂度
// 从 O(log N) 降低到 O(log D),其中 D 是产生 Splice 的 key 与当前插入位置
// 之间的节点数(无论是新 key 在 splice 之前还是之后,插入都会得到帮助)。
// 如果你有一种方法,可以通过 key 的前缀直接映射到 O(sqrt(N)) 个 Splices
// 中最接近的一个,并且让 splices 也能在读操作时作为提示使用,
// 那么我们就得到了 Oshman 和 Shavit 的 SkipTrie,它具有 O(log log N) 的
// 查找和插入复杂度(相比之下,跳表是 O(log N))。
//
// 我们通过 allow_partial_splice_fix 来控制这种悲观策略。
// 一个好的策略可能是:对于 seq_splice_ 使用悲观策略,
// 如果调用者确实费了力气提供了一个 Splice,则使用乐观策略。
while (recompute_height < max_height) {
if (splice->prev_[recompute_height]->Next(recompute_height) !=
splice->next_[recompute_height]) {
++recompute_height;
} else if (splice->prev_[recompute_height] != head_ &&
!KeyIsAfterNode(key_decoded,
splice->prev_[recompute_height])) {
if (allow_partial_splice_fix) {
Node* bad = splice->prev_[recompute_height];
while (splice->prev_[recompute_height] == bad) {
++recompute_height;
}
} else {
recompute_height = max_height;
}
} else if (KeyIsAfterNode(key_decoded, splice->next_[recompute_height])) {
if (allow_partial_splice_fix) {
Node* bad = splice->next_[recompute_height];
while (splice->next_[recompute_height] == bad) {
++recompute_height;
}
} else {
recompute_height = max_height;
}
} else {
break;
}
}
}
assert(recompute_height <= max_height);
if (recompute_height > 0) {
RecomputeSpliceLevels(key_decoded, splice, recompute_height);
}
bool splice_is_valid = true;
if (UseCAS) {
for (int i = 0; i < height; ++i) {
while (true) {
// Checking for duplicate keys on the level 0 is sufficient
if (UNLIKELY(i == 0 && splice->next_[i] != nullptr &&
compare_(splice->next_[i]->Key(), key_decoded) <= 0)) {
return false;
}
if (UNLIKELY(i == 0 && splice->prev_[i] != head_ &&
compare_(splice->prev_[i]->Key(), key_decoded) >= 0)) {
return false;
}
assert(splice->next_[i] == nullptr ||
compare_(x->Key(), splice->next_[i]->Key()) < 0);
assert(splice->prev_[i] == head_ ||
compare_(splice->prev_[i]->Key(), x->Key()) < 0);
x->NoBarrier_SetNext(i, splice->next_[i]);
if (splice->prev_[i]->CASNext(i, splice->next_[i], x)) {
break;
}
// CAS 失败了,我们需要重新计算 prev 和 next。
// 重新搜索时尝试使用不同的层级大概率没有帮助,因为在 prev[i] 和 next[i]
// 之间插入了大量节点的情况不太可能发生。也没有必要把 next[i]
// 当作“之后”的提示,因为我们知道它已经过期了。
FindSpliceForLevel<false>(key_decoded, splice->prev_[i], nullptr, i,
&splice->prev_[i], &splice->next_[i]);
if (i > 0) {
splice_is_valid = false;
}
}
}
} else {
for (int i = 0; i < height; ++i) {
if (i >= recompute_height &&
splice->prev_[i]->Next(i) != splice->next_[i]) {
FindSpliceForLevel<false>(key_decoded, splice->prev_[i], nullptr, i,
&splice->prev_[i], &splice->next_[i]);
}
// Checking for duplicate keys on the level 0 is sufficient
if (UNLIKELY(i == 0 && splice->next_[i] != nullptr &&
compare_(splice->next_[i]->Key(), key_decoded) <= 0)) {
return false;
}
if (UNLIKELY(i == 0 && splice->prev_[i] != head_ &&
compare_(splice->prev_[i]->Key(), key_decoded) >= 0)) {
return false;
}
assert(splice->next_[i] == nullptr ||
compare_(x->Key(), splice->next_[i]->Key()) < 0);
assert(splice->prev_[i] == head_ ||
compare_(splice->prev_[i]->Key(), x->Key()) < 0);
assert(splice->prev_[i]->Next(i) == splice->next_[i]);
x->NoBarrier_SetNext(i, splice->next_[i]);
splice->prev_[i]->SetNext(i, x);
}
}
if (splice_is_valid) {
for (int i = 0; i < height; ++i) {
splice->prev_[i] = x;
}
assert(splice->prev_[splice->height_] == head_);
assert(splice->next_[splice->height_] == nullptr);
for (int i = 0; i < splice->height_; ++i) {
assert(splice->next_[i] == nullptr ||
compare_(key, splice->next_[i]->Key()) < 0);
assert(splice->prev_[i] == head_ ||
compare_(splice->prev_[i]->Key(), key) <= 0);
assert(splice->prev_[i + 1] == splice->prev_[i] ||
splice->prev_[i + 1] == head_ ||
compare_(splice->prev_[i + 1]->Key(), splice->prev_[i]->Key()) <
0);
assert(splice->next_[i + 1] == splice->next_[i] ||
splice->next_[i + 1] == nullptr ||
compare_(splice->next_[i]->Key(), splice->next_[i + 1]->Key()) <
0);
}
} else {
splice->height_ = 0;
}
return true;
}
insert 这个函数大概分为两个部分,前半部分的 splice 校验及修正,后半部分的正式插入。
先看前半部分。
int max_height = max_height_.LoadRelaxed(); while (height > max_height) { // compare-and-swap(expected=max_height, desired=height)。 // 失败时会把传入的 max_height 更新为当前的原子值,因此下一轮用的是最新的 // max_height。 if (max_height_.CasWeakRelaxed(max_height, height)) { // successfully updated it max_height = height; break; } // else retry, possibly exiting the loop because somebody else // increased it } assert(max_height <= kMaxPossibleHeight);
如果需要,则先对 max_height_ 进行修改,这是第一步。
接下来是比较难理解的是 splice 矫正,这分为两个部分,一个是检查并得到从第几层开始修正,另一个则是正式的矫正。
int recompute_height = 0; if (splice->height_ < max_height) { // Either splice has never been used or max_height has grown since // last use. We could potentially fix it in the latter case, but // that is tricky. // // 要么这个 splice 从未被使用过,要么自上次使用以来 max_height 已经增大。 // 后者理论上可以修复,但实现起来比较棘手。 splice->prev_[max_height] = head_; splice->next_[max_height] = nullptr; splice->height_ = max_height; recompute_height = max_height; }
这里处理的是 splice 的 height 较低的情况,因为这个情况较难处理,所以直接放弃进行优化,直接在下面进行矫正时,从最顶层进行。
else { // Splice is a valid proper-height splice that brackets some // key, but does it bracket this one? We need to validate it and // recompute a portion of the splice (levels 0..recompute_height-1) // that is a superset of all levels that don't bracket the new key. // Several choices are reasonable, because we have to balance the work // saved against the extra comparisons required to validate the Splice. // // One strategy is just to recompute all of orig_splice_height if the // bottom level isn't bracketing. This pessimistically assumes that // we will either get a perfect Splice hit (increasing sequential // inserts) or have no locality. // // Another strategy is to walk up the Splice's levels until we find // a level that brackets the key. This strategy lets the Splice // hint help for other cases: it turns insertion from O(log N) into // O(log D), where D is the number of nodes in between the key that // produced the Splice and the current insert (insertion is aided // whether the new key is before or after the splice). If you have // a way of using a prefix of the key to map directly to the closest // Splice out of O(sqrt(N)) Splices and we make it so that splices // can also be used as hints during read, then we end up with Oshman's // and Shavit's SkipTrie, which has O(log log N) lookup and insertion // (compare to O(log N) for skip list). // // We control the pessimistic strategy with allow_partial_splice_fix. // A good strategy is probably to be pessimistic for seq_splice_, // optimistic if the caller actually went to the work of providing // a Splice. // // Splice 用来括住一个 key 时,只可能变得宽松,不可能变得紧迫。因为 skiplist // 不支持删除操作。 Splice 是一个有效且高度合适的 splice,它能括住某个 key, // 但它能括住当前这个 key 吗?我们需要对其进行校验, // 并重新计算 splice 的一部分(level 0 到 recompute_height-1), // 这部分应覆盖所有不能括住新 key 的层级。 // 有几种合理的选择,因为我们需要在节省的工作量与 // 为校验 Splice 额外进行的比较之间做权衡。 // // 一种策略是在底层(level 0)不能括住时,直接重新计算 // 原 splice 的全部高度。这种做法是悲观的,假设我们要么能 // 获得一次完美的 Splice 命中(增强顺序插入),要么没有局部性。 // // 另一种策略是沿着 Splice 的层级向上走,直到找到一个 // 能括住该 key 的层级。该策略使 Splice 的提示在其他情况下也有帮助: // 它将插入从 O(log N) 变为 O(log D),其中 D 是由产生该 Splice 的 key // 到当前插入之间的节点数量(无论新 key 在 splice 之前还是之后, // 插入都能得到帮助)。如果你能用 key 的前缀把它直接映射到 // O(sqrt(N)) 个 Splice 中最近的那个,并且我们让 splice 在读路径中 // 也能作为提示使用,那么我们就得到了 Oshman 和 Shavit 的 SkipTrie, // 其查找与插入为 O(log log N)(相比跳表为 O(log N))。 // // 我们用 allow_partial_splice_fix 来控制这种悲观策略。 // 一个可能较好的策略是:对 seq_splice_ 采取悲观策略; // 如果调用方确实提供了一个 Splice,则采取乐观策略。 while (recompute_height < max_height) { if (splice->prev_[recompute_height]->Next(recompute_height) != splice->next_[recompute_height]) { // splice isn't tight at this level, there must have been some inserts // to this // location that didn't update the splice. We might only be a little // stale, but if // the splice is very stale it would be O(N) to fix it. We haven't used // up any of // our budget of comparisons, so always move up even if we are // pessimistic about // our chances of success. // // 因为我们每次进行操作可以认为有一个成本上限,为了减少这个成本,不能在较低层直接进行紧缩操作。 // 在这一层 splice 并不紧密,说明在这个位置发生过一些插入但没有更新 // splice。 我们可能只是一点点过期,但如果 splice // 非常过期,修复它的代价将是 O(N)。 // 我们还没有用掉比较操作的预算,所以即使对成功几率持悲观态度,也要总是向上提升一层继续检查。 // 因为向上检查可以减少时间复杂度, ++recompute_height; } else if (splice->prev_[recompute_height] != head_ && !KeyIsAfterNode(key_decoded, splice->prev_[recompute_height])) { // key is from before splice if (allow_partial_splice_fix) { // skip all levels with the same node without more comparisons Node* bad = splice->prev_[recompute_height]; while (splice->prev_[recompute_height] == bad) { ++recompute_height; } } else { // we're pessimistic, recompute everything recompute_height = max_height; } } else if (KeyIsAfterNode(key_decoded, splice->next_[recompute_height])) { // key is from after splice if (allow_partial_splice_fix) { Node* bad = splice->next_[recompute_height]; while (splice->next_[recompute_height] == bad) { ++recompute_height; } } else { recompute_height = max_height; } } else { // this level brackets the key, we won! break; } } }
这里进行的操作是得到需要修正的所有层级,如果某一层完全套住了 key,那么就可以终止循环,因为 splice 遵从约束 prev[i+1].key <= prev[i].key < next[i].key <=next[i+1].key。
循环中有一组 if else。
第一个 if (splice->prev_[recompute_height]->Next(recompute_height) !=splice->next_[recompute_height]) 表示 splice 中间插入了其他值,可能是 splice 过期了,所以需要进行修复,直接 ++recompute_height。
第二个 else if (splice->prev_[recompute_height] != head_ &&!KeyIsAfterNode(key_decoded,splice->prev_[recompute_height])) 表示 key 在 splice 左边,需要修复。
第三个与第二个逻辑一致,表示 key 在 splice 右边,需要修复。
第四个则表示 key 完全落入 splice 中间,则可以推出循环,不需要继续修复。
assert(recompute_height <= max_height); if (recompute_height > 0) { RecomputeSpliceLevels(key_decoded, splice, recompute_height); }
通过调用 RecomputeSpliceLevels 即可对 splice 进行修复,其逻辑与上面的描述一致。
其实这里修复是要求 splice 的 recompute_height 层以及之下都能紧迫的套住 key。
而插入在这之后发生。
bool splice_is_valid = true; if (UseCAS) { for (int i = 0; i < height; ++i) { while (true) { // Checking for duplicate keys on the level 0 is sufficient if (UNLIKELY(i == 0 && splice->next_[i] != nullptr && compare_(splice->next_[i]->Key(), key_decoded) <= 0)) { // duplicate key return false; } if (UNLIKELY(i == 0 && splice->prev_[i] != head_ && compare_(splice->prev_[i]->Key(), key_decoded) >= 0)) { // duplicate key return false; } assert(splice->next_[i] == nullptr || compare_(x->Key(), splice->next_[i]->Key()) < 0); assert(splice->prev_[i] == head_ || compare_(splice->prev_[i]->Key(), x->Key()) < 0); x->NoBarrier_SetNext(i, splice->next_[i]); if (splice->prev_[i]->CASNext(i, splice->next_[i], x)) { // success break; } // CAS failed, we need to recompute prev and next. It is unlikely // to be helpful to try to use a different level as we redo the // search, because it should be unlikely that lots of nodes have // been inserted between prev[i] and next[i]. No point in using // next[i] as the after hint, because we know it is stale. // // CAS 失败了,我们需要重新计算 prev 和 next。 // 重新搜索时尝试使用不同的层级大概率没有帮助,因为在 prev[i] 和 next[i] // 之间插入了大量节点的情况不太可能发生。 也没有必要把 next[i] // 当作“之后”的提示,因为我们知道它已经过期了。 FindSpliceForLevel<false>(key_decoded, splice->prev_[i], nullptr, i, &splice->prev_[i], &splice->next_[i]); // Since we've narrowed the bracket for level i, we might have // violated the Splice constraint between i and i-1. Make sure // we recompute the whole thing next time. // 由于我们在第 i 层把括住区间收紧了,可能打破了 i 与 i-1 层之间的 // Splice 约束。 确保下次把整个东西重新计算一遍。 // Splice 约束是指:prev_[i+1] <= prev_[i] < next_[i] <= next_[i+1]. if (i > 0) { splice_is_valid = false; } } } }
先看一下支持并发插入的部分,这个部分假设 splice 是紧迫地套住 key 左右的,不过仍然会进行检查。
其实不难理解,指针修改从下到上可以尽快发现是否存在重复键,因为重复键需要返回失败,而已经修改的指针没办法安全的还原,所以必须从下到上修改。
于是第一步就是进行 key 的检查,是否存在重复键。
虽然先对插入的节点修改指针,最后使用 CAS 原子修改 prev 节点的指针指向。
如果 CAS 失败,说明可能存在其他线程并发的进行了插入,所以需要重新调用 FindSpliceForLevel 对 splice 进行重新紧迫,并进行下一次尝试。
splice_is_valid 用来指示是否 splice 是否在插入时进行了修改,因为如果插入时进行修改,可能会导致 splice 的约束遭到了破坏。
非并发插入部分的代码和并发插入部分的逻辑基本一致。
可以看到 splice 在插入过程中的作用,通过锁定范围,可以用来检查并发的冲突,非常的巧妙。
if (splice_is_valid) { for (int i = 0; i < height; ++i) { splice->prev_[i] = x; } assert(splice->prev_[splice->height_] == head_); assert(splice->next_[splice->height_] == nullptr); for (int i = 0; i < splice->height_; ++i) { assert(splice->next_[i] == nullptr || compare_(key, splice->next_[i]->Key()) < 0); assert(splice->prev_[i] == head_ || compare_(splice->prev_[i]->Key(), key) <= 0); assert(splice->prev_[i + 1] == splice->prev_[i] || splice->prev_[i + 1] == head_ || compare_(splice->prev_[i + 1]->Key(), splice->prev_[i]->Key()) < 0); assert(splice->next_[i + 1] == splice->next_[i] || splice->next_[i + 1] == nullptr || compare_(splice->next_[i]->Key(), splice->next_[i + 1]->Key()) < 0); } } else { splice->height_ = 0; }
最后是收尾工作,对 splice 进行检查和保存当此结果。
如果 splice_is_valid == true,先修改 splice 的 prev,修改为指向插入的节点,保存当次结果,最后是进行一些检查,如果符合升序和非空。
如果 splice_is_valid == false,修改 splice 的 height 为 0 相当于标记了 splice 为一个无用值,在插入前会对 splice 进行检查 if (splice->height_ < max_height) 也有这个原因。
memtable 的方法参数太多了,看得有点头大,等看完了再做个总结。