面试时被问到 skiplist 怎么做并发控制,答的不是很好,所以来学习一下。

SkipList

代码文件在 memtable/skiplist.h ,这个版本的 skiplist 支持多读单写的无锁并发,如果需要多写的并发安全,则需要外部进行互斥,目前这个版本的跳表似乎已经被弃置。

结构

重要的结构是 class SkipListstruct SkipList::Nodeclass SkipList::Iterator

class SkipList

class SkipList 成员变量如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
const uint16_t kMaxHeight_;
const uint16_t kBranching_;
const uint32_t kScaledInverseBranching_;

// Immutable after construction
Comparator const compare_;
Allocator* const allocator_; // Allocator used for allocations of nodes

Node* const head_;

// Modified only by Insert(). Read racily by readers, but stale
// values are ok.
RelaxedAtomic<int> max_height_; // Height of the entire list

// Used for optimizing sequential insert patterns. Tricky. prev_[i] for
// i up to max_height_ is the predecessor of prev_[0] and prev_height_
// is the height of prev_[0]. prev_[0] can only be equal to head before
// insertion, in which case max_height_ and prev_height_ are 1.
// prev_ 用于优化顺序插入模式。
// 对于 up to max_height_ 的 i,prev_[i] 是 prev_[0] 的前驱。
// prev_height_ 是 prev_[0] 的高度。
// prev_[0] 只能在插入之前等于 head_,在这种情况下 max_height_ 和 prev_height_ 都是 1。
int32_t prev_height_;
Node** prev_;

非常好理解的成员变量,可能比较疑惑的是 prev_height_prev_ ,这两个是用来记录上一次插入的位置,用以优化顺序插入。

struct SkipList::Node

struct SkipList::Node 成员变量如下:

1
2
3
4
5
 public:
Key const key;
private:
// Array of length equal to the node height. next_[0] is lowest level link.
AcqRelAtomic<Node*> next_[1];

key 就是用来存储的键,显然在这里 Key 应该是一个指向数据内存的指针。

接下来的 next_[1] 是一个常见的 struct hack,在支持柔性数组的 C99 中可以直接写成 next_[],但 C++ 并不支持柔性数组,所以写成这样。

当然这么进行内存布局的话,就要求这个结构体不能以数组在内存紧密相挨,因为它实际的内存大小大于编译器以为的。

看一下 NewNode 方法就知道其用意了。

1
2
3
4
5
6
7
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::NewNode(
const Key& key, int height) {
char* mem = allocator_->AllocateAligned(
sizeof(Node) + sizeof(AcqRelAtomic<Node*>) * (height - 1));
return new (mem) Node(key);
}

不过,从这个函数中可以留意到,AcqRelAtomic<Node *> 必须是一个简单的类型,能够支持这种操作。

那么它可以嘛?肯定是可以的。

AcqRelAtomic<Node *> 继承于 RelaxedAtomic<Node *>,其类型本身没有定义任何成员变量。

RelaxedAtomic<Node *> 内部只有一个 std::atomic<Node *> 的成员变量。

std::atomic<Node *> 类型中只有一个定义如下的成员变量:

1
2
3
4
5
6
7
8
9
     // Align 1/2/4/8/16-byte types to at least their size.
static constexpr int _S_min_alignment
= (sizeof(_Tp) & (sizeof(_Tp) - 1)) || sizeof(_Tp) > 16
? 0 : sizeof(_Tp);

static constexpr int _S_alignment
= _S_min_alignment > alignof(_Tp) ? _S_min_alignment : alignof(_Tp);

alignas(_S_alignment) _Tp _M_i _GLIBCXX20_INIT(_Tp());

对应到这里,也就是内部只有一个指针变量,同时是内存对齐的。可以供随意摆弄。

class SkipList::Iterator

class SkipList::Iterator 成员变量如下:

1
2
const SkipList* list_;
Node* node_;

很简单。

方法

因为跳表实现本身不难,比较有趣的是其并发实现,和部分方法的实现。

Next, Prev

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
template <typename Key, class Comparator>
inline void SkipList<Key, Comparator>::Iterator::Next() {
assert(Valid());
node_ = node_->Next(0);
}

template <typename Key, class Comparator>
inline void SkipList<Key, Comparator>::Iterator::Prev() {
// Instead of using explicit "prev" links, we just search for the
// last node that falls before key.
assert(Valid());
node_ = list_->FindLessThan(node_->key);
if (node_ == list_->head_) {
node_ = nullptr;
}
}

对比迭代器这两个方法,可以发现,迭代器如果反向迭代,那么只会重新搜索,而不会像正向迭代那样直接修改指针指向下一个节点。

这是因为实现中 SkipList::Node 没有存储反向的指针,只存储了正向的指针。

我想这可能跟并发的实现相关,因为如果存储两个指针的话,在插入一个节点的时候,每层需要修改四个指针,新节点尚好,但在新节点进行发布 (release) 的时候,是没办法同时修改前后两个节点的两个指针的,这可能会让读者读到中间状态而困惑。

比如: prev <-> node <-> next

node 把自己的指针的 prev 和 next 已经做好了修改,然后修改 prev 和 next 其中之一,假设先修改 prev 再修改 next;那么在修改 prev 后,修改 next 前,读者可能会读到 prev->next != next->prev;

FindLessThan

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindLessThan(const Key& key, Node** prev) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
// KeyIsAfter(key, last_not_after) is definitely false
Node* last_not_after = nullptr;
while (true) {
assert(x != nullptr);
Node* next = x->Next(level);
// 断言1:表示排序
assert(x == head_ || next == nullptr || KeyIsAfterNode(next->key, x));
// 断言2:没有超过
assert(x == head_ || KeyIsAfterNode(key, x));
if (next != last_not_after && KeyIsAfterNode(key, next)) {
// Keep searching in this list
x = next;
} else {
if (prev != nullptr) {
prev[level] = x;
}
if (level == 0) {
return x;
} else {
// Switch to next list, reuse KeyIUsAfterNode() result
last_not_after = next;
level--;
}
}
}
}

FindLessThan 用于查找最后一个 key < key。同时会填充 prev 参数表示各层级的 prev 节点。

在这里每一次循环代表层内步进一次(if (next != last_not_after && KeyIsAfterNode(*key*, next))),或者层间步进一次(else)

写的让我比较疑惑,如果让我写,我肯定会写成一个双重循环,外层代表层级循环,内存代表层内循环。

就像下面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindLessThan(const Key& key, Node** prev) const {
Node* x = head_;
Node* last_not_after = nullptr;
for (int level = GetMaxHeight() - 1; level >= 0; --level) {
Node* next = x->Next(level);
while (next != last_not_after && KeyIsAfterNode(key, next)) {
x = next;
next = x->Next(level);
}

if (prev != nullptr) {
prev[level] = x;
}

last_not_after = next;
}
return x;
}

没有选择使用双重循环可能有效率上的考量,可能可以减少一定的分支判断,也可能只是编写者个人习惯。

Insert

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
// fast path for sequential insertion
if (!KeyIsAfterNode(key, prev_[0]->NoBarrier_Next(0)) &&
(prev_[0] == head_ || KeyIsAfterNode(key, prev_[0]))) {
assert(prev_[0] != head_ || (prev_height_ == 1 && GetMaxHeight() == 1));

// Outside of this method prev_[1..max_height_] is the predecessor
// of prev_[0], and prev_height_ refers to prev_[0]. Inside Insert
// prev_[0..max_height - 1] is the predecessor of key. Switch from
// the external state to the internal
// 在此方法外部,prev_[1..max_height_] 是 prev_[0] 的前身
// prev_height_ 指的是 prev_[0]。在方法内部插入
// prev_[0..max_height - 1] 是 key 的前身。从
// 外部状态切换到内部状态
for (int i = 1; i < prev_height_; i++) {
prev_[i] = prev_[0];
}
} else {
// TODO(opt): we could use a NoBarrier predecessor search as an
// optimization for architectures where memory_order_acquire needs
// a synchronization instruction. Doesn't matter on x86

// TODO(opt): 我们可以使用 NoBarrier 前导搜索作为
// 针对 memory_order_acquire 需要
// 同步指令的架构的优化。在 x86 上无所谓
FindLessThan(key, prev_);
}

// Our data structure does not allow duplicate insertion
assert(prev_[0]->Next(0) == nullptr || !Equal(key, prev_[0]->Next(0)->key));

int height = RandomHeight();
if (height > GetMaxHeight()) {
for (int i = GetMaxHeight(); i < height; i++) {
prev_[i] = head_;
}
// fprintf(stderr, "Change height from %d to %d\n", max_height_, height);

// It is ok to mutate max_height_ without any synchronization
// with concurrent readers. A concurrent reader that observes
// the new value of max_height_ will see either the old value of
// new level pointers from head_ (nullptr), or a new value set in
// the loop below. In the former case the reader will
// immediately drop to the next level since nullptr sorts after all
// keys. In the latter case the reader will use the new node.
// 无需任何同步即可修改 max_height_,
// 并发读取器也是如此。观察 max_height_ 的新值的并发读取器将看到以下两种值:
// 来自 head_ 的新层级指针的旧值(nullptr),或在下面的循环中设置的新值。
// 在前一种情况下,读取器将
// 立即下降到下一层级,因为 nullptr 在所有键之后排序。
// 在后一种情况下,读取器将使用新节点。
max_height_.StoreRelaxed(height);
}

Node* x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
x->NoBarrier_SetNext(i, prev_[i]->NoBarrier_Next(i));
prev_[i]->SetNext(i, x);
}
prev_[0] = x;
prev_height_ = height;
}

这个版本的 SkipList 插入实现的很简单,因为插入只需要考虑单个写入,不需要考虑多个写入的情况,所以只要不妨碍读者即可,其中 max_height_ 的更新是原子的,即使读者不论读到更大值还是修改前的小值都不会导致错误。

后面插入的部分显得理所当然,没有任何需要特别说明的地方。

可能需要留意的是对 prev_ 的使用,其含义是上一次插入的节点,在函数最后 prev_[0] = x; 中有所体现。

作用是如果这次插入的节点正好在上次插入节点的后面,则不必使用 FindLessThan 来获取需要插入的位置,直接复用上次的结果即可。

InlineSkipList

代码在 memtable/inlineskiplist.h 这是目前 rocksdb 正在使用的跳表实现。支持多写多读的无锁跳表,但也提供了单写安全而多写不安全的接口。其名字中的 Inline 内联,体现在结构布局上的变化。

结构

重要结构是 class InlineSkipList, struct InlineSkipList::Node, struct InlineSkipList::Spliceclass InlineSkipList::Iterator

class InlineSkipList

class InlineSkipList 的成员变量如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
const uint16_t kMaxHeight_;
const uint16_t kBranching_;
const uint32_t kScaledInverseBranching_;

Allocator* const allocator_; // Allocator used for allocations of nodes
// Immutable after construction
Comparator const compare_;
Node* const head_;

// Maximum height of any node in the list (or in the process of being added).
// Modified only by Insert(). Relaxed reads are always OK because starting
// from higher levels only helps efficiency, not correctness.
RelaxedAtomic<int> max_height_;

// seq_splice_ is a Splice used for insertions in the non-concurrent
// case. It caches the prev and next found during the most recent
// non-concurrent insertion.
//
// seq_splice_ 是一个 Splice 用于非并发插入。
// 它缓存了最近一次非并发插入的 prev 和 next。
Splice* seq_splice_;

可以看到和 SkipList 基本一致,除了 prev_ 换成了 seq_splice_,这也体现这两者之间的某种关系。

struct InlineSkipList::Node

struct InlineSkipList::Node 的成员变量如下:

1
2
3
4
private:
// next_[0] is the lowest level link (level 0). Higher levels are
// stored _earlier_, so level 1 is at next_[-1].
AcqRelAtomic<Node*> next_[1];

是的,只有一个 AcqRelAtomic<Node*> next_[1] 其他什么都没有,这就是名字 Inline 的来源。它把 key 和 next_ 合并在了一个节点上,而不是通过指针指向 key,这样减少了一个 key 指针的存储。

这个数组的 <= 0 的部分仍然作为 next_ 指针,但是 >= 1 的部分变成了 key 的部分。

类似于:[...&next_[2]], &next_[1]][&next_[0]][....key....]

next_[0] 作为了一个分界线。

另一方面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Stores the height of the node in the memory location normally used for
// next_[0]. This is used for passing data from AllocateKey to Insert.
void StashHeight(const int height) {
static_assert(sizeof(int) <= sizeof(next_[0]));
memcpy(static_cast<void*>(&next_[0]), &height, sizeof(int));
}

// Retrieves the value passed to StashHeight. Undefined after a call
// to SetNext or NoBarrier_SetNext.
int UnstashHeight() const {
int rv;
memcpy(&rv, &next_[0], sizeof(int));
return rv;
}

在这里 Node 的使用方式实现申请分配一个节点,然后把 key 域给上层进行写入,写入之后进行插入。其中高度在分配节点的时候就已经确定了,所以 next_[0] 会被暂时借用,用来存储节点高度,直到插入的时候,插入时拿到这个高度就可以覆盖掉了,另外 int 是 4 字节肯定是小于等于一个指针的字节数。因为节点本身不必存储高度信息,这个信息在进行搜索的时候会隐型获取。

另外一提,在 rocksdb wiki 中提到,SkipList 平均每个节点会存储 1.33 个指针。(来源: MemTable · facebook/rocksdb Wiki

计算方法其实就是一个等比数列,因为其默认的概率参数为 1/4。

而每个节点在第零层都一定有一个指针,所以就是:1 + 1/4 + 1/16 + …. 的累和,也就是 1.33.

struct InlineSkipList::Splice

struct InlineSkipList::Splice 的成员变量如下:

1
2
3
4
5
6
7
8
9
10
// The invariant of a Splice is that prev_[i+1].key <= prev_[i].key <
// next_[i].key <= next_[i+1].key for all i. That means that if a
// key is bracketed by prev_[i] and next_[i] then it is bracketed by
// all higher levels. It is _not_ required that prev_[i]->Next(i) ==
// next_[i] (it probably did at some point in the past, but intervening
// or concurrent operations might have inserted nodes in between).
//
int height_ = 0;
Node** prev_;
Node** next_;

注意这里有一个约束:prev[i+1].key <= prev[i].key < next[i].key <=next[i+1].key

其作用会在 Insert 中有所体现。

class InlineSkipList::Iterator

class InlineSkipList::Iterator 的成员变量如下:

1
2
3
private:
const InlineSkipList* list_;
Node* node_;

与前面的 SkipList 一致。

方法

Next, Prev

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::Next() {
assert(Valid());
node_ = node_->Next(0);
}

template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::Prev() {
// Instead of using explicit "prev" links, we just search for the
// last node that falls before key.
assert(Valid());
node_ = list_->FindLessThan(node_->Key(), nullptr);
if (node_ == list_->head_) {
node_ = nullptr;
}
}

这部分代码和 SkipList 完全一致,说明 InlineSkipList 的反向迭代的效率也肯定是不如正向迭代的。

FindLessThan

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
template <class Comparator>
typename InlineSkipList<Comparator>::Node*
InlineSkipList<Comparator>::FindLessThan(const char* key,
Node** const out_of_order_node) const {
int level = GetMaxHeight() - 1;
assert(level >= 0);
Node* x = head_;
// KeyIsAfter(key, last_not_after) is definitely false
Node* last_not_after = nullptr;
const DecodedKey key_decoded = compare_.decode_key(key);
while (true) {
assert(x != nullptr);
Node* next = x->Next(level);
if (next != nullptr) {
PREFETCH(next->Next(level), 0, 1);
if (out_of_order_node && x != head_ &&
compare_(x->Key(), next->Key()) >= 0) {
*out_of_order_node = next;
return x;
}
}
assert(x == head_ || next == nullptr || KeyIsAfterNode(next->Key(), x));
assert(x == head_ || KeyIsAfterNode(key_decoded, x));
if (next != last_not_after && KeyIsAfterNode(key_decoded, next)) {
// Keep searching in this list
assert(next != nullptr);
x = next;
} else {
if (level == 0) {
return x;
} else {
// Switch to next list, reuse KeyIsAfterNode() result
last_not_after = next;
level--;
}
}
}
}

FindLessThan 这部分代码逻辑是和 SkipList 一致的,但是参数变了,同时不再填充 prev 参数,仅会填充失序的节点。同时增加了一个 PREFETCH 的功能,用于提高效率。

FindRandomEntry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
template <class Comparator>
typename InlineSkipList<Comparator>::Node*
InlineSkipList<Comparator>::FindRandomEntry() const {
// TODO(bjlemaire): consider adding PREFETCH calls.
Node *x = head_, *scan_node = nullptr, *limit_node = nullptr;

// We start at the max level.
// FOr each level, we look at all the nodes at the level, and
// we randomly pick one of them. Then decrement the level
// and reiterate the process.
// eg: assume GetMaxHeight()=5, and there are #100 elements (nodes).
// level 4 nodes: lvl_nodes={#1, #15, #67, #84}. Randomly pick #15.
// We will consider all the nodes between #15 (inclusive) and #67
// (exclusive). #67 is called 'limit_node' here.
// level 3 nodes: lvl_nodes={#15, #21, #45, #51}. Randomly choose
// #51. #67 remains 'limit_node'.
// [...]
// level 0 nodes: lvl_nodes={#56,#57,#58,#59}. Randomly pick $57.
// Return Node #57.
//
std::vector<Node*> lvl_nodes;
Random* rnd = Random::GetTLSInstance();
int level = GetMaxHeight() - 1;

while (level >= 0) {
lvl_nodes.clear();
scan_node = x;
while (scan_node != limit_node) {
lvl_nodes.push_back(scan_node);
scan_node = scan_node->Next(level);
}
uint32_t rnd_idx = rnd->Next() % lvl_nodes.size();
x = lvl_nodes[rnd_idx];
if (rnd_idx + 1 < lvl_nodes.size()) {
limit_node = lvl_nodes[rnd_idx + 1];
}
level--;
}
// There is a special case where x could still be the head_
// (note that the head_ contains no key).
return x == head_ && head_ != nullptr ? head_->Next(0) : x;
}

这个函数用来返回随机一个节点,基本思路是在每层中随机选取,并在下一层限定范围。

比如,在本层选定了 cur,那么下一层的范围就是 [cur[level - 1], cur->next[level - 1]]

在这个范围内进行随机选取,时间复杂度大致为 Log(n)。

FindSpliceForLevel, RecomputeSpliceLevels

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
template <class Comparator>
template <bool prefetch_before>
void InlineSkipList<Comparator>::FindSpliceForLevel(const DecodedKey& key,
Node* before, Node* after,
int level, Node** out_prev,
Node** out_next) {
while (true) {
Node* next = before->Next(level);
if (next != nullptr) {
PREFETCH(next->Next(level), 0, 1);
}
if (prefetch_before == true) {
if (next != nullptr && level > 0) {
PREFETCH(next->Next(level - 1), 0, 1);
}
}
assert(before == head_ || next == nullptr ||
KeyIsAfterNode(next->Key(), before));
assert(before == head_ || KeyIsAfterNode(key, before));
if (next == after || !KeyIsAfterNode(key, next)) {
// found it
*out_prev = before;
*out_next = next;
return;
}
before = next;
}
}

template <class Comparator>
void InlineSkipList<Comparator>::RecomputeSpliceLevels(const DecodedKey& key,
Splice* splice,
int recompute_level) {
assert(recompute_level > 0);
assert(recompute_level <= splice->height_);

for (int i = recompute_level - 1; i >= 0; --i) {
FindSpliceForLevel<true>(key, splice->prev_[i + 1], splice->next_[i + 1], i,
&splice->prev_[i], &splice->next_[i]);
}
}

这两个函数是 Insert 的辅助函数,用于调整 Splice。这里需要说一下 Splice 的作用。

Splice 作用是指明 SkipList 的一个范围,用各层的 prev 和 next 表示。

InlineSkipList 中的 seq_splice_ 和前面的 prev_ 一样,总是指向的是上一次插入的范围。

指明范围有两个作用,第一个是像 SkipList 一样复用,第二个是判断有无并发冲突,这一点在 Insert 哪里会有体现。

这里则是修复 Splice 的两个辅助函数。

FindSpliceForLevel 是给定一个范围,beforeafter 可以确定 key 落在第 level 层这个范围之内,然后紧缩,把范围确定在一个精准的范围,然后把这个精准的范围输出至参数 out_prevout_next

RecomputeSpliceLevels 是给定 splice 和 key,该函数会对 splice 进行修复约束(prev[i+1].key <= prev[i].key < next[i].key <=next[i+1].key)并对 key 进行紧迫。但是要求 key 必须落在 splice 的第 recompute_level 层的 prev 和 next 之中。

从上到下进行紧迫,这样可以提高效率,因为从上向下可以快速的缩小范围。

Insert

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
template <class Comparator>
template <bool UseCAS>
bool InlineSkipList<Comparator>::Insert(const char* key, Splice* splice,
bool allow_partial_splice_fix) {
Node* x = reinterpret_cast<Node*>(const_cast<char*>(key)) - 1;
const DecodedKey key_decoded = compare_.decode_key(key);
int height = x->UnstashHeight();
assert(height >= 1 && height <= kMaxHeight_);

int max_height = max_height_.LoadRelaxed();
while (height > max_height) {
// compare-and-swap(expected=max_height, desired=height)。
// 失败时会把传入的 max_height 更新为当前的原子值,因此下一轮用的是最新的
// max_height。
if (max_height_.CasWeakRelaxed(max_height, height)) {
// successfully updated it
max_height = height;
break;
}
// else retry, possibly exiting the loop because somebody else
// increased it
}
assert(max_height <= kMaxPossibleHeight);

int recompute_height = 0;
if (splice->height_ < max_height) {
// Either splice has never been used or max_height has grown since
// last use. We could potentially fix it in the latter case, but
// that is tricky.
//
// 要么这个 splice 从未被使用过,要么自上次使用以来 max_height 已经增大。
// 后者理论上可以修复,但实现起来比较棘手。
splice->prev_[max_height] = head_;
splice->next_[max_height] = nullptr;
splice->height_ = max_height;
recompute_height = max_height;
} else {
// Splice is a valid proper-height splice that brackets some
// key, but does it bracket this one? We need to validate it and
// recompute a portion of the splice (levels 0..recompute_height-1)
// that is a superset of all levels that don't bracket the new key.
// Several choices are reasonable, because we have to balance the work
// saved against the extra comparisons required to validate the Splice.
//
// One strategy is just to recompute all of orig_splice_height if the
// bottom level isn't bracketing. This pessimistically assumes that
// we will either get a perfect Splice hit (increasing sequential
// inserts) or have no locality.
//
// Another strategy is to walk up the Splice's levels until we find
// a level that brackets the key. This strategy lets the Splice
// hint help for other cases: it turns insertion from O(log N) into
// O(log D), where D is the number of nodes in between the key that
// produced the Splice and the current insert (insertion is aided
// whether the new key is before or after the splice). If you have
// a way of using a prefix of the key to map directly to the closest
// Splice out of O(sqrt(N)) Splices and we make it so that splices
// can also be used as hints during read, then we end up with Oshman's
// and Shavit's SkipTrie, which has O(log log N) lookup and insertion
// (compare to O(log N) for skip list).
//
// We control the pessimistic strategy with allow_partial_splice_fix.
// A good strategy is probably to be pessimistic for seq_splice_,
// optimistic if the caller actually went to the work of providing
// a Splice.
//
// Splice 用来括住一个 key 时,只可能变得宽松,不可能变得紧迫。因为 skiplist
// 不支持删除操作。 Splice 是一个有效且高度合适的 splice,它能括住某个 key,
// 但它能括住当前这个 key 吗?我们需要对其进行校验,
// 并重新计算 splice 的一部分(level 0 到 recompute_height-1),
// 这部分应覆盖所有不能括住新 key 的层级。
// 有几种合理的选择,因为我们需要在节省的工作量与
// 为校验 Splice 额外进行的比较之间做权衡。
//
// 一种策略是在底层(level 0)不能括住时,直接重新计算
// 原 splice 的全部高度。这种做法是悲观的,假设我们要么能
// 获得一次完美的 Splice 命中(增强顺序插入),要么没有局部性。
//
// 另一种策略是沿着 Splice 的层级向上走,直到找到一个
// 能括住该 key 的层级。该策略使 Splice 的提示在其他情况下也有帮助:
// 它将插入从 O(log N) 变为 O(log D),其中 D 是由产生该 Splice 的 key
// 到当前插入之间的节点数量(无论新 key 在 splice 之前还是之后,
// 插入都能得到帮助)。如果你能用 key 的前缀把它直接映射到
// O(sqrt(N)) 个 Splice 中最近的那个,并且我们让 splice 在读路径中
// 也能作为提示使用,那么我们就得到了 Oshman 和 Shavit 的 SkipTrie,
// 其查找与插入为 O(log log N)(相比跳表为 O(log N))。
//
// 我们用 allow_partial_splice_fix 来控制这种悲观策略。
// 一个可能较好的策略是:对 seq_splice_ 采取悲观策略;
// 如果调用方确实提供了一个 Splice,则采取乐观策略。
while (recompute_height < max_height) {
if (splice->prev_[recompute_height]->Next(recompute_height) !=
splice->next_[recompute_height]) {
// splice isn't tight at this level, there must have been some inserts
// to this
// location that didn't update the splice. We might only be a little
// stale, but if
// the splice is very stale it would be O(N) to fix it. We haven't used
// up any of
// our budget of comparisons, so always move up even if we are
// pessimistic about
// our chances of success.
//
// 因为我们每次进行操作可以认为有一个成本上限,为了减少这个成本,不能在较低层直接进行紧缩操作。
// 在这一层 splice 并不紧密,说明在这个位置发生过一些插入但没有更新
// splice。 我们可能只是一点点过期,但如果 splice
// 非常过期,修复它的代价将是 O(N)。
// 我们还没有用掉比较操作的预算,所以即使对成功几率持悲观态度,也要总是向上提升一层继续检查。
// 因为向上检查可以减少时间复杂度,
++recompute_height;
} else if (splice->prev_[recompute_height] != head_ &&
!KeyIsAfterNode(key_decoded,
splice->prev_[recompute_height])) {
// key is from before splice
if (allow_partial_splice_fix) {
// skip all levels with the same node without more comparisons
Node* bad = splice->prev_[recompute_height];
while (splice->prev_[recompute_height] == bad) {
++recompute_height;
}
} else {
// we're pessimistic, recompute everything
recompute_height = max_height;
}
} else if (KeyIsAfterNode(key_decoded, splice->next_[recompute_height])) {
// key is from after splice
if (allow_partial_splice_fix) {
Node* bad = splice->next_[recompute_height];
while (splice->next_[recompute_height] == bad) {
++recompute_height;
}
} else {
recompute_height = max_height;
}
} else {
// this level brackets the key, we won!
break;
}
}
}
assert(recompute_height <= max_height);
if (recompute_height > 0) {
RecomputeSpliceLevels(key_decoded, splice, recompute_height);
}

/*------------------start insert------------------------- */
// 现在 key 一定在 splice 之内。不过因为中间发生了插入,splice 可能变得松弛。
// 考虑这样的事情:因为 splice 不会松弛,那么不断重试即可完成并发安全的插入。
bool splice_is_valid = true;
if (UseCAS) {
for (int i = 0; i < height; ++i) {
while (true) {
// Checking for duplicate keys on the level 0 is sufficient
if (UNLIKELY(i == 0 && splice->next_[i] != nullptr &&
compare_(splice->next_[i]->Key(), key_decoded) <= 0)) {
// duplicate key
return false;
}
if (UNLIKELY(i == 0 && splice->prev_[i] != head_ &&
compare_(splice->prev_[i]->Key(), key_decoded) >= 0)) {
// duplicate key
return false;
}
assert(splice->next_[i] == nullptr ||
compare_(x->Key(), splice->next_[i]->Key()) < 0);
assert(splice->prev_[i] == head_ ||
compare_(splice->prev_[i]->Key(), x->Key()) < 0);
x->NoBarrier_SetNext(i, splice->next_[i]);
if (splice->prev_[i]->CASNext(i, splice->next_[i], x)) {
// success
break;
}
// CAS failed, we need to recompute prev and next. It is unlikely
// to be helpful to try to use a different level as we redo the
// search, because it should be unlikely that lots of nodes have
// been inserted between prev[i] and next[i]. No point in using
// next[i] as the after hint, because we know it is stale.
//
// CAS 失败了,我们需要重新计算 prev 和 next。
// 重新搜索时尝试使用不同的层级大概率没有帮助,因为在 prev[i] 和 next[i]
// 之间插入了大量节点的情况不太可能发生。 也没有必要把 next[i]
// 当作“之后”的提示,因为我们知道它已经过期了。
FindSpliceForLevel<false>(key_decoded, splice->prev_[i], nullptr, i,
&splice->prev_[i], &splice->next_[i]);

// Since we've narrowed the bracket for level i, we might have
// violated the Splice constraint between i and i-1. Make sure
// we recompute the whole thing next time.
// 由于我们在第 i 层把括住区间收紧了,可能打破了 i 与 i-1 层之间的
// Splice 约束。 确保下次把整个东西重新计算一遍。
// Splice 约束是指:prev_[i+1] <= prev_[i] < next_[i] <= next_[i+1].
if (i > 0) {
splice_is_valid = false;
}
}
}
} else {
for (int i = 0; i < height; ++i) {
if (i >= recompute_height &&
splice->prev_[i]->Next(i) != splice->next_[i]) {
FindSpliceForLevel<false>(key_decoded, splice->prev_[i], nullptr, i,
&splice->prev_[i], &splice->next_[i]);
}
// Checking for duplicate keys on the level 0 is sufficient
if (UNLIKELY(i == 0 && splice->next_[i] != nullptr &&
compare_(splice->next_[i]->Key(), key_decoded) <= 0)) {
// duplicate key
return false;
}
if (UNLIKELY(i == 0 && splice->prev_[i] != head_ &&
compare_(splice->prev_[i]->Key(), key_decoded) >= 0)) {
// duplicate key
return false;
}
assert(splice->next_[i] == nullptr ||
compare_(x->Key(), splice->next_[i]->Key()) < 0);
assert(splice->prev_[i] == head_ ||
compare_(splice->prev_[i]->Key(), x->Key()) < 0);
assert(splice->prev_[i]->Next(i) == splice->next_[i]);
x->NoBarrier_SetNext(i, splice->next_[i]);
splice->prev_[i]->SetNext(i, x);
}
}
if (splice_is_valid) {
for (int i = 0; i < height; ++i) {
splice->prev_[i] = x;
}
assert(splice->prev_[splice->height_] == head_);
assert(splice->next_[splice->height_] == nullptr);
for (int i = 0; i < splice->height_; ++i) {
assert(splice->next_[i] == nullptr ||
compare_(key, splice->next_[i]->Key()) < 0);
assert(splice->prev_[i] == head_ ||
compare_(splice->prev_[i]->Key(), key) <= 0);
assert(splice->prev_[i + 1] == splice->prev_[i] ||
splice->prev_[i + 1] == head_ ||
compare_(splice->prev_[i + 1]->Key(), splice->prev_[i]->Key()) <
0);
assert(splice->next_[i + 1] == splice->next_[i] ||
splice->next_[i + 1] == nullptr ||
compare_(splice->next_[i]->Key(), splice->next_[i + 1]->Key()) <
0);
}
} else {
splice->height_ = 0;
}
return true;
}

insert 这个函数大概分为两个部分,前半部分的 splice 校验及修正,后半部分的正式插入。

先看前半部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int max_height = max_height_.LoadRelaxed();
while (height > max_height) {
// compare-and-swap(expected=max_height, desired=height)。
// 失败时会把传入的 max_height 更新为当前的原子值,因此下一轮用的是最新的
// max_height。
if (max_height_.CasWeakRelaxed(max_height, height)) {
// successfully updated it
max_height = height;
break;
}
// else retry, possibly exiting the loop because somebody else
// increased it
}
assert(max_height <= kMaxPossibleHeight);

如果需要,则先对 max_height_ 进行修改,这是第一步。

接下来是比较难理解的是 splice 矫正,这分为两个部分,一个是检查并得到从第几层开始修正,另一个则是正式的矫正。

1
2
3
4
5
6
7
8
9
10
11
12
13
int recompute_height = 0;
if (splice->height_ < max_height) {
// Either splice has never been used or max_height has grown since
// last use. We could potentially fix it in the latter case, but
// that is tricky.
//
// 要么这个 splice 从未被使用过,要么自上次使用以来 max_height 已经增大。
// 后者理论上可以修复,但实现起来比较棘手。
splice->prev_[max_height] = head_;
splice->next_[max_height] = nullptr;
splice->height_ = max_height;
recompute_height = max_height;
}

这里处理的是 splice 的 height 较低的情况,因为这个情况较难处理,所以直接放弃进行优化,直接在下面进行矫正时,从最顶层进行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
else {
// Splice is a valid proper-height splice that brackets some
// key, but does it bracket this one? We need to validate it and
// recompute a portion of the splice (levels 0..recompute_height-1)
// that is a superset of all levels that don't bracket the new key.
// Several choices are reasonable, because we have to balance the work
// saved against the extra comparisons required to validate the Splice.
//
// One strategy is just to recompute all of orig_splice_height if the
// bottom level isn't bracketing. This pessimistically assumes that
// we will either get a perfect Splice hit (increasing sequential
// inserts) or have no locality.
//
// Another strategy is to walk up the Splice's levels until we find
// a level that brackets the key. This strategy lets the Splice
// hint help for other cases: it turns insertion from O(log N) into
// O(log D), where D is the number of nodes in between the key that
// produced the Splice and the current insert (insertion is aided
// whether the new key is before or after the splice). If you have
// a way of using a prefix of the key to map directly to the closest
// Splice out of O(sqrt(N)) Splices and we make it so that splices
// can also be used as hints during read, then we end up with Oshman's
// and Shavit's SkipTrie, which has O(log log N) lookup and insertion
// (compare to O(log N) for skip list).
//
// We control the pessimistic strategy with allow_partial_splice_fix.
// A good strategy is probably to be pessimistic for seq_splice_,
// optimistic if the caller actually went to the work of providing
// a Splice.
//
// Splice 用来括住一个 key 时,只可能变得宽松,不可能变得紧迫。因为 skiplist
// 不支持删除操作。 Splice 是一个有效且高度合适的 splice,它能括住某个 key,
// 但它能括住当前这个 key 吗?我们需要对其进行校验,
// 并重新计算 splice 的一部分(level 0 到 recompute_height-1),
// 这部分应覆盖所有不能括住新 key 的层级。
// 有几种合理的选择,因为我们需要在节省的工作量与
// 为校验 Splice 额外进行的比较之间做权衡。
//
// 一种策略是在底层(level 0)不能括住时,直接重新计算
// 原 splice 的全部高度。这种做法是悲观的,假设我们要么能
// 获得一次完美的 Splice 命中(增强顺序插入),要么没有局部性。
//
// 另一种策略是沿着 Splice 的层级向上走,直到找到一个
// 能括住该 key 的层级。该策略使 Splice 的提示在其他情况下也有帮助:
// 它将插入从 O(log N) 变为 O(log D),其中 D 是由产生该 Splice 的 key
// 到当前插入之间的节点数量(无论新 key 在 splice 之前还是之后,
// 插入都能得到帮助)。如果你能用 key 的前缀把它直接映射到
// O(sqrt(N)) 个 Splice 中最近的那个,并且我们让 splice 在读路径中
// 也能作为提示使用,那么我们就得到了 Oshman 和 Shavit 的 SkipTrie,
// 其查找与插入为 O(log log N)(相比跳表为 O(log N))。
//
// 我们用 allow_partial_splice_fix 来控制这种悲观策略。
// 一个可能较好的策略是:对 seq_splice_ 采取悲观策略;
// 如果调用方确实提供了一个 Splice,则采取乐观策略。
while (recompute_height < max_height) {
if (splice->prev_[recompute_height]->Next(recompute_height) !=
splice->next_[recompute_height]) {
// splice isn't tight at this level, there must have been some inserts
// to this
// location that didn't update the splice. We might only be a little
// stale, but if
// the splice is very stale it would be O(N) to fix it. We haven't used
// up any of
// our budget of comparisons, so always move up even if we are
// pessimistic about
// our chances of success.
//
// 因为我们每次进行操作可以认为有一个成本上限,为了减少这个成本,不能在较低层直接进行紧缩操作。
// 在这一层 splice 并不紧密,说明在这个位置发生过一些插入但没有更新
// splice。 我们可能只是一点点过期,但如果 splice
// 非常过期,修复它的代价将是 O(N)。
// 我们还没有用掉比较操作的预算,所以即使对成功几率持悲观态度,也要总是向上提升一层继续检查。
// 因为向上检查可以减少时间复杂度,
++recompute_height;
} else if (splice->prev_[recompute_height] != head_ &&
!KeyIsAfterNode(key_decoded,
splice->prev_[recompute_height])) {
// key is from before splice
if (allow_partial_splice_fix) {
// skip all levels with the same node without more comparisons
Node* bad = splice->prev_[recompute_height];
while (splice->prev_[recompute_height] == bad) {
++recompute_height;
}
} else {
// we're pessimistic, recompute everything
recompute_height = max_height;
}
} else if (KeyIsAfterNode(key_decoded, splice->next_[recompute_height])) {
// key is from after splice
if (allow_partial_splice_fix) {
Node* bad = splice->next_[recompute_height];
while (splice->next_[recompute_height] == bad) {
++recompute_height;
}
} else {
recompute_height = max_height;
}
} else {
// this level brackets the key, we won!
break;
}
}
}

这里进行的操作是得到需要修正的所有层级,如果某一层完全套住了 key,那么就可以终止循环,因为 splice 遵从约束 prev[i+1].key <= prev[i].key < next[i].key <=next[i+1].key。

循环中有一组 if else。

第一个 if (splice->prev_[recompute_height]->Next(recompute_height) !=splice->next_[recompute_height]) 表示 splice 中间插入了其他值,可能是 splice 过期了,所以需要进行修复,直接 ++recompute_height。

第二个 else if (splice->prev_[recompute_height] != head_ &&!KeyIsAfterNode(key_decoded,splice->prev_[recompute_height])) 表示 key 在 splice 左边,需要修复。

第三个与第二个逻辑一致,表示 key 在 splice 右边,需要修复。

第四个则表示 key 完全落入 splice 中间,则可以推出循环,不需要继续修复。

1
2
3
4
assert(recompute_height <= max_height);
if (recompute_height > 0) {
RecomputeSpliceLevels(key_decoded, splice, recompute_height);
}

通过调用 RecomputeSpliceLevels 即可对 splice 进行修复,其逻辑与上面的描述一致。

其实这里修复是要求 splice 的 recompute_height 层以及之下都能紧迫的套住 key。

而插入在这之后发生。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
bool splice_is_valid = true;
if (UseCAS) {
for (int i = 0; i < height; ++i) {
while (true) {
// Checking for duplicate keys on the level 0 is sufficient
if (UNLIKELY(i == 0 && splice->next_[i] != nullptr &&
compare_(splice->next_[i]->Key(), key_decoded) <= 0)) {
// duplicate key
return false;
}
if (UNLIKELY(i == 0 && splice->prev_[i] != head_ &&
compare_(splice->prev_[i]->Key(), key_decoded) >= 0)) {
// duplicate key
return false;
}
assert(splice->next_[i] == nullptr ||
compare_(x->Key(), splice->next_[i]->Key()) < 0);
assert(splice->prev_[i] == head_ ||
compare_(splice->prev_[i]->Key(), x->Key()) < 0);
x->NoBarrier_SetNext(i, splice->next_[i]);
if (splice->prev_[i]->CASNext(i, splice->next_[i], x)) {
// success
break;
}
// CAS failed, we need to recompute prev and next. It is unlikely
// to be helpful to try to use a different level as we redo the
// search, because it should be unlikely that lots of nodes have
// been inserted between prev[i] and next[i]. No point in using
// next[i] as the after hint, because we know it is stale.
//
// CAS 失败了,我们需要重新计算 prev 和 next。
// 重新搜索时尝试使用不同的层级大概率没有帮助,因为在 prev[i] 和 next[i]
// 之间插入了大量节点的情况不太可能发生。 也没有必要把 next[i]
// 当作“之后”的提示,因为我们知道它已经过期了。
FindSpliceForLevel<false>(key_decoded, splice->prev_[i], nullptr, i,
&splice->prev_[i], &splice->next_[i]);

// Since we've narrowed the bracket for level i, we might have
// violated the Splice constraint between i and i-1. Make sure
// we recompute the whole thing next time.
// 由于我们在第 i 层把括住区间收紧了,可能打破了 i 与 i-1 层之间的
// Splice 约束。 确保下次把整个东西重新计算一遍。
// Splice 约束是指:prev_[i+1] <= prev_[i] < next_[i] <= next_[i+1].
if (i > 0) {
splice_is_valid = false;
}
}
}
}

先看一下支持并发插入的部分,这个部分假设 splice 是紧迫地套住 key 左右的,不过仍然会进行检查。

其实不难理解,指针修改从下到上可以尽快发现是否存在重复键,因为重复键需要返回失败,而已经修改的指针没办法安全的还原,所以必须从下到上修改。

于是第一步就是进行 key 的检查,是否存在重复键。

虽然先对插入的节点修改指针,最后使用 CAS 原子修改 prev 节点的指针指向。

如果 CAS 失败,说明可能存在其他线程并发的进行了插入,所以需要重新调用 FindSpliceForLevel 对 splice 进行重新紧迫,并进行下一次尝试。

splice_is_valid 用来指示是否 splice 是否在插入时进行了修改,因为如果插入时进行修改,可能会导致 splice 的约束遭到了破坏。

非并发插入部分的代码和并发插入部分的逻辑基本一致。

可以看到 splice 在插入过程中的作用,通过锁定范围,可以用来检查并发的冲突,非常的巧妙。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
if (splice_is_valid) {
for (int i = 0; i < height; ++i) {
splice->prev_[i] = x;
}
assert(splice->prev_[splice->height_] == head_);
assert(splice->next_[splice->height_] == nullptr);
for (int i = 0; i < splice->height_; ++i) {
assert(splice->next_[i] == nullptr ||
compare_(key, splice->next_[i]->Key()) < 0);
assert(splice->prev_[i] == head_ ||
compare_(splice->prev_[i]->Key(), key) <= 0);
assert(splice->prev_[i + 1] == splice->prev_[i] ||
splice->prev_[i + 1] == head_ ||
compare_(splice->prev_[i + 1]->Key(), splice->prev_[i]->Key()) <
0);
assert(splice->next_[i + 1] == splice->next_[i] ||
splice->next_[i + 1] == nullptr ||
compare_(splice->next_[i]->Key(), splice->next_[i + 1]->Key()) <
0);
}
} else {
splice->height_ = 0;
}

最后是收尾工作,对 splice 进行检查和保存当此结果。

如果 splice_is_valid == true,先修改 splice 的 prev,修改为指向插入的节点,保存当次结果,最后是进行一些检查,如果符合升序和非空。

如果 splice_is_valid == false,修改 splice 的 height 为 0 相当于标记了 splice 为一个无用值,在插入前会对 splice 进行检查 if (splice->height_ < max_height) 也有这个原因。

memtable 的方法参数太多了,看得有点头大,等看完了再做个总结。