// Immutable after construction Comparator const compare_; Allocator* const allocator_; // Allocator used for allocations of nodes
Node* const head_;
// Modified only by Insert(). Read racily by readers, but stale // values are ok. RelaxedAtomic<int> max_height_; // Height of the entire list
// Used for optimizing sequential insert patterns. Tricky. prev_[i] for // i up to max_height_ is the predecessor of prev_[0] and prev_height_ // is the height of prev_[0]. prev_[0] can only be equal to head before // insertion, in which case max_height_ and prev_height_ are 1. // prev_ 用于优化顺序插入模式。 // 对于 up to max_height_ 的 i,prev_[i] 是 prev_[0] 的前驱。 // prev_height_ 是 prev_[0] 的高度。 // prev_[0] 只能在插入之前等于 head_,在这种情况下 max_height_ 和 prev_height_ 都是 1。 int32_t prev_height_; Node** prev_;
template <typename Key, classComparator> inlinevoid SkipList<Key, Comparator>::Iterator::Prev() { // Instead of using explicit "prev" links, we just search for the // last node that falls before key. assert(Valid()); node_ = list_->FindLessThan(node_->key); if (node_ == list_->head_) { node_ = nullptr; } }
// Outside of this method prev_[1..max_height_] is the predecessor // of prev_[0], and prev_height_ refers to prev_[0]. Inside Insert // prev_[0..max_height - 1] is the predecessor of key. Switch from // the external state to the internal // 在此方法外部,prev_[1..max_height_] 是 prev_[0] 的前身 // prev_height_ 指的是 prev_[0]。在方法内部插入 // prev_[0..max_height - 1] 是 key 的前身。从 // 外部状态切换到内部状态 for (int i = 1; i < prev_height_; i++) { prev_[i] = prev_[0]; } } else { // TODO(opt): we could use a NoBarrier predecessor search as an // optimization for architectures where memory_order_acquire needs // a synchronization instruction. Doesn't matter on x86
// Our data structure does not allow duplicate insertion assert(prev_[0]->Next(0) == nullptr || !Equal(key, prev_[0]->Next(0)->key));
int height = RandomHeight(); if (height > GetMaxHeight()) { for (int i = GetMaxHeight(); i < height; i++) { prev_[i] = head_; } // fprintf(stderr, "Change height from %d to %d\n", max_height_, height);
// It is ok to mutate max_height_ without any synchronization // with concurrent readers. A concurrent reader that observes // the new value of max_height_ will see either the old value of // new level pointers from head_ (nullptr), or a new value set in // the loop below. In the former case the reader will // immediately drop to the next level since nullptr sorts after all // keys. In the latter case the reader will use the new node. // 无需任何同步即可修改 max_height_, // 并发读取器也是如此。观察 max_height_ 的新值的并发读取器将看到以下两种值: // 来自 head_ 的新层级指针的旧值(nullptr),或在下面的循环中设置的新值。 // 在前一种情况下,读取器将 // 立即下降到下一层级,因为 nullptr 在所有键之后排序。 // 在后一种情况下,读取器将使用新节点。 max_height_.StoreRelaxed(height); }
Node* x = NewNode(key, height); for (int i = 0; i < height; i++) { // NoBarrier_SetNext() suffices since we will add a barrier when // we publish a pointer to "x" in prev[i]. x->NoBarrier_SetNext(i, prev_[i]->NoBarrier_Next(i)); prev_[i]->SetNext(i, x); } prev_[0] = x; prev_height_ = height; }
Allocator* const allocator_; // Allocator used for allocations of nodes // Immutable after construction Comparator const compare_; Node* const head_;
// Maximum height of any node in the list (or in the process of being added). // Modified only by Insert(). Relaxed reads are always OK because starting // from higher levels only helps efficiency, not correctness. RelaxedAtomic<int> max_height_;
// seq_splice_ is a Splice used for insertions in the non-concurrent // case. It caches the prev and next found during the most recent // non-concurrent insertion. // // seq_splice_ 是一个 Splice 用于非并发插入。 // 它缓存了最近一次非并发插入的 prev 和 next。 Splice* seq_splice_;
private: // next_[0] is the lowest level link (level 0). Higher levels are // stored _earlier_, so level 1 is at next_[-1]. AcqRelAtomic<Node*> next_[1];
// Stores the height of the node in the memory location normally used for // next_[0]. This is used for passing data from AllocateKey to Insert. voidStashHeight(constint height){ static_assert(sizeof(int) <= sizeof(next_[0])); memcpy(static_cast<void*>(&next_[0]), &height, sizeof(int)); }
// Retrieves the value passed to StashHeight. Undefined after a call // to SetNext or NoBarrier_SetNext. intUnstashHeight()const{ int rv; memcpy(&rv, &next_[0], sizeof(int)); return rv; }
// The invariant of a Splice is that prev_[i+1].key <= prev_[i].key < // next_[i].key <= next_[i+1].key for all i. That means that if a // key is bracketed by prev_[i] and next_[i] then it is bracketed by // all higher levels. It is _not_ required that prev_[i]->Next(i) == // next_[i] (it probably did at some point in the past, but intervening // or concurrent operations might have inserted nodes in between). // int height_ = 0; Node** prev_; Node** next_;
template <classComparator> inlinevoid InlineSkipList<Comparator>::Iterator::Prev() { // Instead of using explicit "prev" links, we just search for the // last node that falls before key. assert(Valid()); node_ = list_->FindLessThan(node_->Key(), nullptr); if (node_ == list_->head_) { node_ = nullptr; } }
// We start at the max level. // FOr each level, we look at all the nodes at the level, and // we randomly pick one of them. Then decrement the level // and reiterate the process. // eg: assume GetMaxHeight()=5, and there are #100 elements (nodes). // level 4 nodes: lvl_nodes={#1, #15, #67, #84}. Randomly pick #15. // We will consider all the nodes between #15 (inclusive) and #67 // (exclusive). #67 is called 'limit_node' here. // level 3 nodes: lvl_nodes={#15, #21, #45, #51}. Randomly choose // #51. #67 remains 'limit_node'. // [...] // level 0 nodes: lvl_nodes={#56,#57,#58,#59}. Randomly pick $57. // Return Node #57. // std::vector<Node*> lvl_nodes; Random* rnd = Random::GetTLSInstance(); int level = GetMaxHeight() - 1;
while (level >= 0) { lvl_nodes.clear(); scan_node = x; while (scan_node != limit_node) { lvl_nodes.push_back(scan_node); scan_node = scan_node->Next(level); } uint32_t rnd_idx = rnd->Next() % lvl_nodes.size(); x = lvl_nodes[rnd_idx]; if (rnd_idx + 1 < lvl_nodes.size()) { limit_node = lvl_nodes[rnd_idx + 1]; } level--; } // There is a special case where x could still be the head_ // (note that the head_ contains no key). return x == head_ && head_ != nullptr ? head_->Next(0) : x; }
int max_height = max_height_.LoadRelaxed(); while (height > max_height) { // compare-and-swap(expected=max_height, desired=height)。 // 失败时会把传入的 max_height 更新为当前的原子值,因此下一轮用的是最新的 // max_height。 if (max_height_.CasWeakRelaxed(max_height, height)) { // successfully updated it max_height = height; break; } // else retry, possibly exiting the loop because somebody else // increased it } assert(max_height <= kMaxPossibleHeight);
int recompute_height = 0; if (splice->height_ < max_height) { // Either splice has never been used or max_height has grown since // last use. We could potentially fix it in the latter case, but // that is tricky. // // 要么这个 splice 从未被使用过,要么自上次使用以来 max_height 已经增大。 // 后者理论上可以修复,但实现起来比较棘手。 splice->prev_[max_height] = head_; splice->next_[max_height] = nullptr; splice->height_ = max_height; recompute_height = max_height; } else { // Splice is a valid proper-height splice that brackets some // key, but does it bracket this one? We need to validate it and // recompute a portion of the splice (levels 0..recompute_height-1) // that is a superset of all levels that don't bracket the new key. // Several choices are reasonable, because we have to balance the work // saved against the extra comparisons required to validate the Splice. // // One strategy is just to recompute all of orig_splice_height if the // bottom level isn't bracketing. This pessimistically assumes that // we will either get a perfect Splice hit (increasing sequential // inserts) or have no locality. // // Another strategy is to walk up the Splice's levels until we find // a level that brackets the key. This strategy lets the Splice // hint help for other cases: it turns insertion from O(log N) into // O(log D), where D is the number of nodes in between the key that // produced the Splice and the current insert (insertion is aided // whether the new key is before or after the splice). If you have // a way of using a prefix of the key to map directly to the closest // Splice out of O(sqrt(N)) Splices and we make it so that splices // can also be used as hints during read, then we end up with Oshman's // and Shavit's SkipTrie, which has O(log log N) lookup and insertion // (compare to O(log N) for skip list). // // We control the pessimistic strategy with allow_partial_splice_fix. // A good strategy is probably to be pessimistic for seq_splice_, // optimistic if the caller actually went to the work of providing // a Splice. // // Splice 用来括住一个 key 时,只可能变得宽松,不可能变得紧迫。因为 skiplist // 不支持删除操作。 Splice 是一个有效且高度合适的 splice,它能括住某个 key, // 但它能括住当前这个 key 吗?我们需要对其进行校验, // 并重新计算 splice 的一部分(level 0 到 recompute_height-1), // 这部分应覆盖所有不能括住新 key 的层级。 // 有几种合理的选择,因为我们需要在节省的工作量与 // 为校验 Splice 额外进行的比较之间做权衡。 // // 一种策略是在底层(level 0)不能括住时,直接重新计算 // 原 splice 的全部高度。这种做法是悲观的,假设我们要么能 // 获得一次完美的 Splice 命中(增强顺序插入),要么没有局部性。 // // 另一种策略是沿着 Splice 的层级向上走,直到找到一个 // 能括住该 key 的层级。该策略使 Splice 的提示在其他情况下也有帮助: // 它将插入从 O(log N) 变为 O(log D),其中 D 是由产生该 Splice 的 key // 到当前插入之间的节点数量(无论新 key 在 splice 之前还是之后, // 插入都能得到帮助)。如果你能用 key 的前缀把它直接映射到 // O(sqrt(N)) 个 Splice 中最近的那个,并且我们让 splice 在读路径中 // 也能作为提示使用,那么我们就得到了 Oshman 和 Shavit 的 SkipTrie, // 其查找与插入为 O(log log N)(相比跳表为 O(log N))。 // // 我们用 allow_partial_splice_fix 来控制这种悲观策略。 // 一个可能较好的策略是:对 seq_splice_ 采取悲观策略; // 如果调用方确实提供了一个 Splice,则采取乐观策略。 while (recompute_height < max_height) { if (splice->prev_[recompute_height]->Next(recompute_height) != splice->next_[recompute_height]) { // splice isn't tight at this level, there must have been some inserts // to this // location that didn't update the splice. We might only be a little // stale, but if // the splice is very stale it would be O(N) to fix it. We haven't used // up any of // our budget of comparisons, so always move up even if we are // pessimistic about // our chances of success. // // 因为我们每次进行操作可以认为有一个成本上限,为了减少这个成本,不能在较低层直接进行紧缩操作。 // 在这一层 splice 并不紧密,说明在这个位置发生过一些插入但没有更新 // splice。 我们可能只是一点点过期,但如果 splice // 非常过期,修复它的代价将是 O(N)。 // 我们还没有用掉比较操作的预算,所以即使对成功几率持悲观态度,也要总是向上提升一层继续检查。 // 因为向上检查可以减少时间复杂度, ++recompute_height; } elseif (splice->prev_[recompute_height] != head_ && !KeyIsAfterNode(key_decoded, splice->prev_[recompute_height])) { // key is from before splice if (allow_partial_splice_fix) { // skip all levels with the same node without more comparisons Node* bad = splice->prev_[recompute_height]; while (splice->prev_[recompute_height] == bad) { ++recompute_height; } } else { // we're pessimistic, recompute everything recompute_height = max_height; } } elseif (KeyIsAfterNode(key_decoded, splice->next_[recompute_height])) { // key is from after splice if (allow_partial_splice_fix) { Node* bad = splice->next_[recompute_height]; while (splice->next_[recompute_height] == bad) { ++recompute_height; } } else { recompute_height = max_height; } } else { // this level brackets the key, we won! break; } } } assert(recompute_height <= max_height); if (recompute_height > 0) { RecomputeSpliceLevels(key_decoded, splice, recompute_height); }
/*------------------start insert------------------------- */ // 现在 key 一定在 splice 之内。不过因为中间发生了插入,splice 可能变得松弛。 // 考虑这样的事情:因为 splice 不会松弛,那么不断重试即可完成并发安全的插入。 bool splice_is_valid = true; if (UseCAS) { for (int i = 0; i < height; ++i) { while (true) { // Checking for duplicate keys on the level 0 is sufficient if (UNLIKELY(i == 0 && splice->next_[i] != nullptr && compare_(splice->next_[i]->Key(), key_decoded) <= 0)) { // duplicate key returnfalse; } if (UNLIKELY(i == 0 && splice->prev_[i] != head_ && compare_(splice->prev_[i]->Key(), key_decoded) >= 0)) { // duplicate key returnfalse; } assert(splice->next_[i] == nullptr || compare_(x->Key(), splice->next_[i]->Key()) < 0); assert(splice->prev_[i] == head_ || compare_(splice->prev_[i]->Key(), x->Key()) < 0); x->NoBarrier_SetNext(i, splice->next_[i]); if (splice->prev_[i]->CASNext(i, splice->next_[i], x)) { // success break; } // CAS failed, we need to recompute prev and next. It is unlikely // to be helpful to try to use a different level as we redo the // search, because it should be unlikely that lots of nodes have // been inserted between prev[i] and next[i]. No point in using // next[i] as the after hint, because we know it is stale. // // CAS 失败了,我们需要重新计算 prev 和 next。 // 重新搜索时尝试使用不同的层级大概率没有帮助,因为在 prev[i] 和 next[i] // 之间插入了大量节点的情况不太可能发生。 也没有必要把 next[i] // 当作“之后”的提示,因为我们知道它已经过期了。 FindSpliceForLevel<false>(key_decoded, splice->prev_[i], nullptr, i, &splice->prev_[i], &splice->next_[i]);
// Since we've narrowed the bracket for level i, we might have // violated the Splice constraint between i and i-1. Make sure // we recompute the whole thing next time. // 由于我们在第 i 层把括住区间收紧了,可能打破了 i 与 i-1 层之间的 // Splice 约束。 确保下次把整个东西重新计算一遍。 // Splice 约束是指:prev_[i+1] <= prev_[i] < next_[i] <= next_[i+1]. if (i > 0) { splice_is_valid = false; } } } } else { for (int i = 0; i < height; ++i) { if (i >= recompute_height && splice->prev_[i]->Next(i) != splice->next_[i]) { FindSpliceForLevel<false>(key_decoded, splice->prev_[i], nullptr, i, &splice->prev_[i], &splice->next_[i]); } // Checking for duplicate keys on the level 0 is sufficient if (UNLIKELY(i == 0 && splice->next_[i] != nullptr && compare_(splice->next_[i]->Key(), key_decoded) <= 0)) { // duplicate key returnfalse; } if (UNLIKELY(i == 0 && splice->prev_[i] != head_ && compare_(splice->prev_[i]->Key(), key_decoded) >= 0)) { // duplicate key returnfalse; } assert(splice->next_[i] == nullptr || compare_(x->Key(), splice->next_[i]->Key()) < 0); assert(splice->prev_[i] == head_ || compare_(splice->prev_[i]->Key(), x->Key()) < 0); assert(splice->prev_[i]->Next(i) == splice->next_[i]); x->NoBarrier_SetNext(i, splice->next_[i]); splice->prev_[i]->SetNext(i, x); } } if (splice_is_valid) { for (int i = 0; i < height; ++i) { splice->prev_[i] = x; } assert(splice->prev_[splice->height_] == head_); assert(splice->next_[splice->height_] == nullptr); for (int i = 0; i < splice->height_; ++i) { assert(splice->next_[i] == nullptr || compare_(key, splice->next_[i]->Key()) < 0); assert(splice->prev_[i] == head_ || compare_(splice->prev_[i]->Key(), key) <= 0); assert(splice->prev_[i + 1] == splice->prev_[i] || splice->prev_[i + 1] == head_ || compare_(splice->prev_[i + 1]->Key(), splice->prev_[i]->Key()) < 0); assert(splice->next_[i + 1] == splice->next_[i] || splice->next_[i + 1] == nullptr || compare_(splice->next_[i]->Key(), splice->next_[i + 1]->Key()) < 0); } } else { splice->height_ = 0; } returntrue; }
insert 这个函数大概分为两个部分,前半部分的 splice 校验及修正,后半部分的正式插入。
先看前半部分。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
int max_height = max_height_.LoadRelaxed(); while (height > max_height) { // compare-and-swap(expected=max_height, desired=height)。 // 失败时会把传入的 max_height 更新为当前的原子值,因此下一轮用的是最新的 // max_height。 if (max_height_.CasWeakRelaxed(max_height, height)) { // successfully updated it max_height = height; break; } // else retry, possibly exiting the loop because somebody else // increased it } assert(max_height <= kMaxPossibleHeight);
int recompute_height = 0; if (splice->height_ < max_height) { // Either splice has never been used or max_height has grown since // last use. We could potentially fix it in the latter case, but // that is tricky. // // 要么这个 splice 从未被使用过,要么自上次使用以来 max_height 已经增大。 // 后者理论上可以修复,但实现起来比较棘手。 splice->prev_[max_height] = head_; splice->next_[max_height] = nullptr; splice->height_ = max_height; recompute_height = max_height; }
else { // Splice is a valid proper-height splice that brackets some // key, but does it bracket this one? We need to validate it and // recompute a portion of the splice (levels 0..recompute_height-1) // that is a superset of all levels that don't bracket the new key. // Several choices are reasonable, because we have to balance the work // saved against the extra comparisons required to validate the Splice. // // One strategy is just to recompute all of orig_splice_height if the // bottom level isn't bracketing. This pessimistically assumes that // we will either get a perfect Splice hit (increasing sequential // inserts) or have no locality. // // Another strategy is to walk up the Splice's levels until we find // a level that brackets the key. This strategy lets the Splice // hint help for other cases: it turns insertion from O(log N) into // O(log D), where D is the number of nodes in between the key that // produced the Splice and the current insert (insertion is aided // whether the new key is before or after the splice). If you have // a way of using a prefix of the key to map directly to the closest // Splice out of O(sqrt(N)) Splices and we make it so that splices // can also be used as hints during read, then we end up with Oshman's // and Shavit's SkipTrie, which has O(log log N) lookup and insertion // (compare to O(log N) for skip list). // // We control the pessimistic strategy with allow_partial_splice_fix. // A good strategy is probably to be pessimistic for seq_splice_, // optimistic if the caller actually went to the work of providing // a Splice. // // Splice 用来括住一个 key 时,只可能变得宽松,不可能变得紧迫。因为 skiplist // 不支持删除操作。 Splice 是一个有效且高度合适的 splice,它能括住某个 key, // 但它能括住当前这个 key 吗?我们需要对其进行校验, // 并重新计算 splice 的一部分(level 0 到 recompute_height-1), // 这部分应覆盖所有不能括住新 key 的层级。 // 有几种合理的选择,因为我们需要在节省的工作量与 // 为校验 Splice 额外进行的比较之间做权衡。 // // 一种策略是在底层(level 0)不能括住时,直接重新计算 // 原 splice 的全部高度。这种做法是悲观的,假设我们要么能 // 获得一次完美的 Splice 命中(增强顺序插入),要么没有局部性。 // // 另一种策略是沿着 Splice 的层级向上走,直到找到一个 // 能括住该 key 的层级。该策略使 Splice 的提示在其他情况下也有帮助: // 它将插入从 O(log N) 变为 O(log D),其中 D 是由产生该 Splice 的 key // 到当前插入之间的节点数量(无论新 key 在 splice 之前还是之后, // 插入都能得到帮助)。如果你能用 key 的前缀把它直接映射到 // O(sqrt(N)) 个 Splice 中最近的那个,并且我们让 splice 在读路径中 // 也能作为提示使用,那么我们就得到了 Oshman 和 Shavit 的 SkipTrie, // 其查找与插入为 O(log log N)(相比跳表为 O(log N))。 // // 我们用 allow_partial_splice_fix 来控制这种悲观策略。 // 一个可能较好的策略是:对 seq_splice_ 采取悲观策略; // 如果调用方确实提供了一个 Splice,则采取乐观策略。 while (recompute_height < max_height) { if (splice->prev_[recompute_height]->Next(recompute_height) != splice->next_[recompute_height]) { // splice isn't tight at this level, there must have been some inserts // to this // location that didn't update the splice. We might only be a little // stale, but if // the splice is very stale it would be O(N) to fix it. We haven't used // up any of // our budget of comparisons, so always move up even if we are // pessimistic about // our chances of success. // // 因为我们每次进行操作可以认为有一个成本上限,为了减少这个成本,不能在较低层直接进行紧缩操作。 // 在这一层 splice 并不紧密,说明在这个位置发生过一些插入但没有更新 // splice。 我们可能只是一点点过期,但如果 splice // 非常过期,修复它的代价将是 O(N)。 // 我们还没有用掉比较操作的预算,所以即使对成功几率持悲观态度,也要总是向上提升一层继续检查。 // 因为向上检查可以减少时间复杂度, ++recompute_height; } elseif (splice->prev_[recompute_height] != head_ && !KeyIsAfterNode(key_decoded, splice->prev_[recompute_height])) { // key is from before splice if (allow_partial_splice_fix) { // skip all levels with the same node without more comparisons Node* bad = splice->prev_[recompute_height]; while (splice->prev_[recompute_height] == bad) { ++recompute_height; } } else { // we're pessimistic, recompute everything recompute_height = max_height; } } elseif (KeyIsAfterNode(key_decoded, splice->next_[recompute_height])) { // key is from after splice if (allow_partial_splice_fix) { Node* bad = splice->next_[recompute_height]; while (splice->next_[recompute_height] == bad) { ++recompute_height; } } else { recompute_height = max_height; } } else { // this level brackets the key, we won! break; } } }
bool splice_is_valid = true; if (UseCAS) { for (int i = 0; i < height; ++i) { while (true) { // Checking for duplicate keys on the level 0 is sufficient if (UNLIKELY(i == 0 && splice->next_[i] != nullptr && compare_(splice->next_[i]->Key(), key_decoded) <= 0)) { // duplicate key returnfalse; } if (UNLIKELY(i == 0 && splice->prev_[i] != head_ && compare_(splice->prev_[i]->Key(), key_decoded) >= 0)) { // duplicate key returnfalse; } assert(splice->next_[i] == nullptr || compare_(x->Key(), splice->next_[i]->Key()) < 0); assert(splice->prev_[i] == head_ || compare_(splice->prev_[i]->Key(), x->Key()) < 0); x->NoBarrier_SetNext(i, splice->next_[i]); if (splice->prev_[i]->CASNext(i, splice->next_[i], x)) { // success break; } // CAS failed, we need to recompute prev and next. It is unlikely // to be helpful to try to use a different level as we redo the // search, because it should be unlikely that lots of nodes have // been inserted between prev[i] and next[i]. No point in using // next[i] as the after hint, because we know it is stale. // // CAS 失败了,我们需要重新计算 prev 和 next。 // 重新搜索时尝试使用不同的层级大概率没有帮助,因为在 prev[i] 和 next[i] // 之间插入了大量节点的情况不太可能发生。 也没有必要把 next[i] // 当作“之后”的提示,因为我们知道它已经过期了。 FindSpliceForLevel<false>(key_decoded, splice->prev_[i], nullptr, i, &splice->prev_[i], &splice->next_[i]);
// Since we've narrowed the bracket for level i, we might have // violated the Splice constraint between i and i-1. Make sure // we recompute the whole thing next time. // 由于我们在第 i 层把括住区间收紧了,可能打破了 i 与 i-1 层之间的 // Splice 约束。 确保下次把整个东西重新计算一遍。 // Splice 约束是指:prev_[i+1] <= prev_[i] < next_[i] <= next_[i+1]. if (i > 0) { splice_is_valid = false; } } } }