DogDu's blog

Rocksdb:skiplist

面试时被问到 skiplist 怎么做并发控制,答的不是很好,所以来学习一下。

SkipList

代码文件在 memtable/skiplist.h ,这个版本的 skiplist 支持多读单写的无锁并发,如果需要多写的并发安全,则需要外部进行互斥,目前这个版本的跳表似乎已经被弃置。

结构

重要的结构是 class SkipListstruct SkipList::Nodeclass SkipList::Iterator

class SkipList

class SkipList 成员变量如下:

const uint16_t kMaxHeight_;
const uint16_t kBranching_;
const uint32_t kScaledInverseBranching_;

// Immutable after construction
Comparator const compare_;
Allocator* const allocator_;  // Allocator used for allocations of nodes
Node* const head_;            // Modified only by Insert().

// Read racily by readers, but stale values are ok.
RelaxedAtomic<int> max_height_;  // Height of the entire list

// Used for optimizing sequential insert patterns. Tricky.
// prev_[i] for i up to max_height_ is the predecessor of prev_[0] and
// prev_height_ is the height of prev_[0].
// prev_[0] can only be equal to head_ before insertion, in which case
// max_height_ and prev_height_ are 1.
// prev_ 用于优化顺序插入模式。
// 对于 up to max_height_ 的 i,prev_[i] 是 prev_[0] 的前驱。
// prev_height_ 是 prev_[0] 的高度。
// prev_[0] 只能在插入之前等于 head_,在这种情况下 max_height_ 和
// prev_height_ 都是 1。
int32_t prev_height_;
Node** prev_;

非常好理解的成员变量,可能比较疑惑的是 prev_height_prev_ ,这两个是用来记录上一次插入的位置,用以优化顺序插入。

struct SkipList::Node

struct SkipList::Node 成员变量如下:

public:
  Key const key;

private:
  // Array of length equal to the node height.
  // next_[0] is lowest level link.
  AcqRelAtomic<Node*> next_[1];

key 就是用来存储的键,显然在这里 Key 应该是一个指向数据内存的指针。

接下来的 next_[1] 是一个常见的 struct hack,在支持柔性数组的 C99 中可以直接写成 next_[],但 C++ 并不支持柔性数组,所以写成这样。

当然这么进行内存布局的话,就要求这个结构体不能以数组在内存紧密相挨,因为它实际的内存大小大于编译器以为的。

看一下 NewNode 方法就知道其用意了。

template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::NewNode(const Key& key, int height) {
  char* mem = allocator_->AllocateAligned(
      sizeof(Node) + sizeof(AcqRelAtomic<Node*>) * (height - 1));
  return new (mem) Node(key);
}

不过,从这个函数中可以留意到,AcqRelAtomic<Node *> 必须是一个简单的类型,能够支持这种操作。

那么它可以嘛?肯定是可以的。

AcqRelAtomic<Node *> 继承于 RelaxedAtomic<Node *>,其类型本身没有定义任何成员变量。

RelaxedAtomic<Node *> 内部只有一个 std::atomic<Node *> 的成员变量。

std::atomic<Node *> 类型中只有一个定义如下的成员变量:

// Align 1/2/4/8/16-byte types to at least their size.      static constexpr int _S_min_alignment = (sizeof(_Tp) & (sizeof(_Tp) - 1)) || sizeof(_Tp) > 16 ? 0 : sizeof(_Tp);       static constexpr int _S_alignment        = _S_min_alignment > alignof(_Tp) ? _S_min_alignment : alignof(_Tp);       alignas(_S_alignment) _Tp _M_i _GLIBCXX20_INIT(_Tp());

对应到这里,也就是内部只有一个指针变量,同时是内存对齐的。可以供随意摆弄。

class SkipList::Iterator

class SkipList::Iterator 成员变量如下:

const SkipList* list_;
Node* node_;

很简单。

方法

因为跳表实现本身不难,比较有趣的是其并发实现,和部分方法的实现。

Next, Prev

template <typename Key, class Comparator>
inline void SkipList<Key, Comparator>::Iterator::Next() {
  assert(Valid());
  node_ = node_->Next(0);
}

template <typename Key, class Comparator>
inline void SkipList<Key, Comparator>::Iterator::Prev() {
  // Instead of using explicit "prev" links, we just search for the
  // last node that falls before key.
  assert(Valid());
  node_ = list_->FindLessThan(node_->key);
  if (node_ == list_->head_) {
    node_ = nullptr;
  }
}

对比迭代器这两个方法,可以发现,迭代器如果反向迭代,那么只会重新搜索,而不会像正向迭代那样直接修改指针指向下一个节点。

这是因为实现中 SkipList::Node 没有存储反向的指针,只存储了正向的指针。

我想这可能跟并发的实现相关,因为如果存储两个指针的话,在插入一个节点的时候,每层需要修改四个指针,新节点尚好,但在新节点进行发布 (release) 的时候,是没办法同时修改前后两个节点的两个指针的,这可能会让读者读到中间状态而困惑。

比如: prev <-> node <-> next

node 把自己的指针的 prev 和 next 已经做好了修改,然后修改 prev 和 next 其中之一,假设先修改 prev 再修改 next;那么在修改 prev 后,修改 next 前,读者可能会读到 prev->next != next->prev;

FindLessThan

template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindLessThan(const Key& key, Node** prev) const {
  Node* x = head_;
  int level = GetMaxHeight() - 1;
  // KeyIsAfter(key, last_not_after) is definitely false
  Node* last_not_after = nullptr;
  while (true) {
    assert(x != nullptr);
    Node* next = x->Next(level);
    // 断言1:表示排序
    assert(x == head_ || next == nullptr || KeyIsAfterNode(next->key, x));
    // 断言2:没有超过
    assert(x == head_ || KeyIsAfterNode(key, x));
    if (next != last_not_after && KeyIsAfterNode(key, next)) {
      // Keep searching in this list
      x = next;
    } else {
      if (prev != nullptr) {
        prev[level] = x;
      }
      if (level == 0) {
        return x;
      } else {
        // Switch to next list, reuse KeyIUsAfterNode() result
        last_not_after = next;
        level--;
      }
    }
  }
}

FindLessThan 用于查找最后一个 key < key。同时会填充 prev 参数表示各层级的 prev 节点。

在这里每一次循环代表层内步进一次(if (next != last_not_after && KeyIsAfterNode(*key*, next))),或者层间步进一次(else)

写的让我比较疑惑,如果让我写,我肯定会写成一个双重循环,外层代表层级循环,内存代表层内循环。

就像下面这样:

template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindLessThan(const Key& key, Node** prev) const {
  Node* x = head_;
  Node* last_not_after = nullptr;
  for (int level = GetMaxHeight() - 1; level >= 0; --level) {
    Node* next = x->Next(level);
    while (next != last_not_after && KeyIsAfterNode(key, next)) {
      x = next;
      next = x->Next(level);
    }

    if (prev != nullptr) {
      prev[level] = x;
    }

    last_not_after = next;
  }
  return x;
}

没有选择使用双重循环可能有效率上的考量,可能可以减少一定的分支判断,也可能只是编写者个人习惯。

Insert

template <class Comparator>
bool InlineSkipList<Comparator>::Insert(const char* key) {
  return Insert<false>(key, seq_splice_, false);
}

template <class Comparator>
template <bool UseCAS>
bool InlineSkipList<Comparator>::Insert(const char* key, Splice* splice,
                                        bool allow_partial_splice_fix) {
  Node* x = reinterpret_cast<Node*>(const_cast<char*>(key)) - 1;
  const DecodedKey key_decoded = compare_.decode_key(key);
  int height = x->UnstashHeight();
  assert(height >= 1 && height <= kMaxHeight_);

  int max_height = max_height_.LoadRelaxed();
  while (height > max_height) {
    // compare-and-swap(expected=max_height, desired=height)。
    // 失败时会把传入的 max_height 更新为当前的原子值,因此下一轮用的是最新的
    // max_height。
    if (max_height_.CasWeakRelaxed(max_height, height)) {
      // successfully updated it
      max_height = height;
      break;
    }
    // else retry, possibly exiting the loop because somebody else
    // increased it
  }
  assert(max_height <= kMaxPossibleHeight);

  int recompute_height = 0;
  if (splice->height_ < max_height) {
    // Either splice has never been used or max_height has grown since
    // last use.  We could potentially fix it in the latter case, but
    // that is tricky.
    //
    // 要么这个 splice 从未被使用过,要么自上次使用以来 max_height 已经增大。
    // 后者理论上可以修复,但实现起来比较棘手。
    splice->prev_[max_height] = head_;
    splice->next_[max_height] = nullptr;
    splice->height_ = max_height;
    recompute_height = max_height;
  } else {
    // Splice is a valid proper-height splice that brackets some
    // key, but does it bracket this one?  We need to validate it and
    // recompute a portion of the splice (levels 0..recompute_height-1)
    // that is a superset of all levels that don't bracket the new key.
    // Several choices are reasonable, because we have to balance the work
    // saved against the extra comparisons required to validate the Splice.
    //
    // One strategy is just to recompute all of orig_splice_height if the
    // bottom level isn't bracketing.  This pessimistically assumes that
    // we will either get a perfect Splice hit (increasing sequential
    // inserts) or have no locality.
    //
    // Another strategy is to walk up the Splice's levels until we find
    // a level that brackets the key.  This strategy lets the Splice
    // hint help for other cases: it turns insertion from O(log N) into
    // O(log D), where D is the number of nodes in between the key that
    // produced the Splice and the current insert (insertion is aided
    // whether the new key is before or after the splice).  If you have
    // a way of using a prefix of the key to map directly to the closest
    // Splice out of O(sqrt(N)) Splices and we make it so that splices
    // can also be used as hints during read, then we end up with Oshman's
    // and Shavit's SkipTrie, which has O(log log N) lookup and insertion
    // (compare to O(log N) for skip list).
    //
    // We control the pessimistic strategy with allow_partial_splice_fix.
    // A good strategy is probably to be pessimistic for seq_splice_,
    // optimistic if the caller actually went to the work of providing
    // a Splice.
    //
    // Splice 用来括住一个 key 时,只可能变得宽松,不可能变得紧迫。因为 skiplist
    // 不支持删除操作。 Splice 是一个有效且高度合适的 splice,它能括住某个 key,
    // 但它能括住当前这个 key 吗?我们需要对其进行校验,
    // 并重新计算 splice 的一部分(level 0 到 recompute_height-1),
    // 这部分应覆盖所有不能括住新 key 的层级。
    // 有几种合理的选择,因为我们需要在节省的工作量与
    // 为校验 Splice 额外进行的比较之间取得平衡。
    //
    // 一种策略是,如果最底层不能括住,就重新计算所有 orig_splice_height。
    // 这是一种悲观的假设,认为要么我们会得到完美的 Splice 命中(递增的
    // 顺序插入),要么就完全没有局部性。
    //
    // 另一种策略是沿着 Splice 的层级向上走,直到找到一个能括住该 key 的层级。
    // 这种策略让 Splice 提示在其他情况下也有帮助:它将插入的复杂度
    // 从 O(log N) 降低到 O(log D),其中 D 是产生 Splice 的 key 与当前插入位置
    // 之间的节点数(无论是新 key 在 splice 之前还是之后,插入都会得到帮助)。
    // 如果你有一种方法,可以通过 key 的前缀直接映射到 O(sqrt(N)) 个 Splices
    // 中最接近的一个,并且让 splices 也能在读操作时作为提示使用,
    // 那么我们就得到了 Oshman 和 Shavit 的 SkipTrie,它具有 O(log log N) 的
    // 查找和插入复杂度(相比之下,跳表是 O(log N))。
    //
    // 我们通过 allow_partial_splice_fix 来控制这种悲观策略。
    // 一个好的策略可能是:对于 seq_splice_ 使用悲观策略,
    // 如果调用者确实费了力气提供了一个 Splice,则使用乐观策略。
    while (recompute_height < max_height) {
      if (splice->prev_[recompute_height]->Next(recompute_height) !=
          splice->next_[recompute_height]) {
        ++recompute_height;
      } else if (splice->prev_[recompute_height] != head_ &&
                 !KeyIsAfterNode(key_decoded,
                                 splice->prev_[recompute_height])) {
        if (allow_partial_splice_fix) {
          Node* bad = splice->prev_[recompute_height];
          while (splice->prev_[recompute_height] == bad) {
            ++recompute_height;
          }
        } else {
          recompute_height = max_height;
        }
      } else if (KeyIsAfterNode(key_decoded, splice->next_[recompute_height])) {
        if (allow_partial_splice_fix) {
          Node* bad = splice->next_[recompute_height];
          while (splice->next_[recompute_height] == bad) {
            ++recompute_height;
          }
        } else {
          recompute_height = max_height;
        }
      } else {
        break;
      }
    }
  }
  assert(recompute_height <= max_height);
  if (recompute_height > 0) {
    RecomputeSpliceLevels(key_decoded, splice, recompute_height);
  }

  bool splice_is_valid = true;
  if (UseCAS) {
    for (int i = 0; i < height; ++i) {
      while (true) {
        // Checking for duplicate keys on the level 0 is sufficient
        if (UNLIKELY(i == 0 && splice->next_[i] != nullptr &&
                     compare_(splice->next_[i]->Key(), key_decoded) <= 0)) {
          return false;
        }
        if (UNLIKELY(i == 0 && splice->prev_[i] != head_ &&
                     compare_(splice->prev_[i]->Key(), key_decoded) >= 0)) {
          return false;
        }
        assert(splice->next_[i] == nullptr ||
               compare_(x->Key(), splice->next_[i]->Key()) < 0);
        assert(splice->prev_[i] == head_ ||
               compare_(splice->prev_[i]->Key(), x->Key()) < 0);
        x->NoBarrier_SetNext(i, splice->next_[i]);
        if (splice->prev_[i]->CASNext(i, splice->next_[i], x)) {
          break;
        }
        // CAS 失败了,我们需要重新计算 prev 和 next。
        // 重新搜索时尝试使用不同的层级大概率没有帮助,因为在 prev[i] 和 next[i]
        // 之间插入了大量节点的情况不太可能发生。也没有必要把 next[i]
        // 当作“之后”的提示,因为我们知道它已经过期了。
        FindSpliceForLevel<false>(key_decoded, splice->prev_[i], nullptr, i,
                                  &splice->prev_[i], &splice->next_[i]);
        if (i > 0) {
          splice_is_valid = false;
        }
      }
    }
  } else {
    for (int i = 0; i < height; ++i) {
      if (i >= recompute_height &&
          splice->prev_[i]->Next(i) != splice->next_[i]) {
        FindSpliceForLevel<false>(key_decoded, splice->prev_[i], nullptr, i,
                                  &splice->prev_[i], &splice->next_[i]);
      }
      // Checking for duplicate keys on the level 0 is sufficient
      if (UNLIKELY(i == 0 && splice->next_[i] != nullptr &&
                   compare_(splice->next_[i]->Key(), key_decoded) <= 0)) {
        return false;
      }
      if (UNLIKELY(i == 0 && splice->prev_[i] != head_ &&
                   compare_(splice->prev_[i]->Key(), key_decoded) >= 0)) {
        return false;
      }
      assert(splice->next_[i] == nullptr ||
             compare_(x->Key(), splice->next_[i]->Key()) < 0);
      assert(splice->prev_[i] == head_ ||
             compare_(splice->prev_[i]->Key(), x->Key()) < 0);
      assert(splice->prev_[i]->Next(i) == splice->next_[i]);
      x->NoBarrier_SetNext(i, splice->next_[i]);
      splice->prev_[i]->SetNext(i, x);
    }
  }
  if (splice_is_valid) {
    for (int i = 0; i < height; ++i) {
      splice->prev_[i] = x;
    }
    assert(splice->prev_[splice->height_] == head_);
    assert(splice->next_[splice->height_] == nullptr);
    for (int i = 0; i < splice->height_; ++i) {
      assert(splice->next_[i] == nullptr ||
             compare_(key, splice->next_[i]->Key()) < 0);
      assert(splice->prev_[i] == head_ ||
             compare_(splice->prev_[i]->Key(), key) <= 0);
      assert(splice->prev_[i + 1] == splice->prev_[i] ||
             splice->prev_[i + 1] == head_ ||
             compare_(splice->prev_[i + 1]->Key(), splice->prev_[i]->Key()) <
                 0);
      assert(splice->next_[i + 1] == splice->next_[i] ||
             splice->next_[i + 1] == nullptr ||
             compare_(splice->next_[i]->Key(), splice->next_[i + 1]->Key()) <
                 0);
    }
  } else {
    splice->height_ = 0;
  }
  return true;
}

insert 这个函数大概分为两个部分,前半部分的 splice 校验及修正,后半部分的正式插入。

先看前半部分。

int max_height = max_height_.LoadRelaxed(); while (height > max_height) {   // compare-and-swap(expected=max_height, desired=height)。   // 失败时会把传入的 max_height 更新为当前的原子值,因此下一轮用的是最新的   // max_height。   if (max_height_.CasWeakRelaxed(max_height, height)) {     // successfully updated it     max_height = height;     break;   }   // else retry, possibly exiting the loop because somebody else   // increased it } assert(max_height <= kMaxPossibleHeight);

如果需要,则先对 max_height_ 进行修改,这是第一步。

接下来是比较难理解的是 splice 矫正,这分为两个部分,一个是检查并得到从第几层开始修正,另一个则是正式的矫正。

int recompute_height = 0; if (splice->height_ < max_height) {   // Either splice has never been used or max_height has grown since   // last use.  We could potentially fix it in the latter case, but   // that is tricky.   //   // 要么这个 splice 从未被使用过,要么自上次使用以来 max_height 已经增大。   // 后者理论上可以修复,但实现起来比较棘手。   splice->prev_[max_height] = head_;   splice->next_[max_height] = nullptr;   splice->height_ = max_height;   recompute_height = max_height; }

这里处理的是 splice 的 height 较低的情况,因为这个情况较难处理,所以直接放弃进行优化,直接在下面进行矫正时,从最顶层进行。

else {     // Splice is a valid proper-height splice that brackets some     // key, but does it bracket this one?  We need to validate it and     // recompute a portion of the splice (levels 0..recompute_height-1)     // that is a superset of all levels that don't bracket the new key.     // Several choices are reasonable, because we have to balance the work     // saved against the extra comparisons required to validate the Splice.     //     // One strategy is just to recompute all of orig_splice_height if the     // bottom level isn't bracketing.  This pessimistically assumes that     // we will either get a perfect Splice hit (increasing sequential     // inserts) or have no locality.     //     // Another strategy is to walk up the Splice's levels until we find     // a level that brackets the key.  This strategy lets the Splice     // hint help for other cases: it turns insertion from O(log N) into     // O(log D), where D is the number of nodes in between the key that     // produced the Splice and the current insert (insertion is aided     // whether the new key is before or after the splice).  If you have     // a way of using a prefix of the key to map directly to the closest     // Splice out of O(sqrt(N)) Splices and we make it so that splices     // can also be used as hints during read, then we end up with Oshman's     // and Shavit's SkipTrie, which has O(log log N) lookup and insertion     // (compare to O(log N) for skip list).     //     // We control the pessimistic strategy with allow_partial_splice_fix.     // A good strategy is probably to be pessimistic for seq_splice_,     // optimistic if the caller actually went to the work of providing     // a Splice.     //     // Splice 用来括住一个 key 时,只可能变得宽松,不可能变得紧迫。因为 skiplist     // 不支持删除操作。 Splice 是一个有效且高度合适的 splice,它能括住某个 key,     // 但它能括住当前这个 key 吗?我们需要对其进行校验,     // 并重新计算 splice 的一部分(level 0 到 recompute_height-1),     // 这部分应覆盖所有不能括住新 key 的层级。     // 有几种合理的选择,因为我们需要在节省的工作量与     // 为校验 Splice 额外进行的比较之间做权衡。     //     // 一种策略是在底层(level 0)不能括住时,直接重新计算     // 原 splice 的全部高度。这种做法是悲观的,假设我们要么能     // 获得一次完美的 Splice 命中(增强顺序插入),要么没有局部性。     //     // 另一种策略是沿着 Splice 的层级向上走,直到找到一个     // 能括住该 key 的层级。该策略使 Splice 的提示在其他情况下也有帮助:     // 它将插入从 O(log N) 变为 O(log D),其中 D 是由产生该 Splice 的 key     // 到当前插入之间的节点数量(无论新 key 在 splice 之前还是之后,     // 插入都能得到帮助)。如果你能用 key 的前缀把它直接映射到     // O(sqrt(N)) 个 Splice 中最近的那个,并且我们让 splice 在读路径中     // 也能作为提示使用,那么我们就得到了 Oshman 和 Shavit 的 SkipTrie,     // 其查找与插入为 O(log log N)(相比跳表为 O(log N))。     //     // 我们用 allow_partial_splice_fix 来控制这种悲观策略。     // 一个可能较好的策略是:对 seq_splice_ 采取悲观策略;     // 如果调用方确实提供了一个 Splice,则采取乐观策略。     while (recompute_height < max_height) {       if (splice->prev_[recompute_height]->Next(recompute_height) !=           splice->next_[recompute_height]) {         // splice isn't tight at this level, there must have been some inserts         // to this         // location that didn't update the splice.  We might only be a little         // stale, but if         // the splice is very stale it would be O(N) to fix it.  We haven't used         // up any of         // our budget of comparisons, so always move up even if we are         // pessimistic about         // our chances of success.         //         // 因为我们每次进行操作可以认为有一个成本上限,为了减少这个成本,不能在较低层直接进行紧缩操作。         // 在这一层 splice 并不紧密,说明在这个位置发生过一些插入但没有更新         // splice。 我们可能只是一点点过期,但如果 splice         // 非常过期,修复它的代价将是 O(N)。         // 我们还没有用掉比较操作的预算,所以即使对成功几率持悲观态度,也要总是向上提升一层继续检查。         // 因为向上检查可以减少时间复杂度,         ++recompute_height;       } else if (splice->prev_[recompute_height] != head_ &&                  !KeyIsAfterNode(key_decoded,                                  splice->prev_[recompute_height])) {         // key is from before splice         if (allow_partial_splice_fix) {           // skip all levels with the same node without more comparisons           Node* bad = splice->prev_[recompute_height];           while (splice->prev_[recompute_height] == bad) {             ++recompute_height;           }         } else {           // we're pessimistic, recompute everything           recompute_height = max_height;         }       } else if (KeyIsAfterNode(key_decoded, splice->next_[recompute_height])) {         // key is from after splice         if (allow_partial_splice_fix) {           Node* bad = splice->next_[recompute_height];           while (splice->next_[recompute_height] == bad) {             ++recompute_height;           }         } else {           recompute_height = max_height;         }       } else {         // this level brackets the key, we won!         break;       }     }   }

这里进行的操作是得到需要修正的所有层级,如果某一层完全套住了 key,那么就可以终止循环,因为 splice 遵从约束 prev[i+1].key <= prev[i].key < next[i].key <=next[i+1].key。

循环中有一组 if else。

第一个 if (splice->prev_[recompute_height]->Next(recompute_height) !=splice->next_[recompute_height]) 表示 splice 中间插入了其他值,可能是 splice 过期了,所以需要进行修复,直接 ++recompute_height。

第二个 else if (splice->prev_[recompute_height] != head_ &&!KeyIsAfterNode(key_decoded,splice->prev_[recompute_height])) 表示 key 在 splice 左边,需要修复。

第三个与第二个逻辑一致,表示 key 在 splice 右边,需要修复。

第四个则表示 key 完全落入 splice 中间,则可以推出循环,不需要继续修复。

assert(recompute_height <= max_height); if (recompute_height > 0) {   RecomputeSpliceLevels(key_decoded, splice, recompute_height); }

通过调用 RecomputeSpliceLevels 即可对 splice 进行修复,其逻辑与上面的描述一致。

其实这里修复是要求 splice 的 recompute_height 层以及之下都能紧迫的套住 key。

而插入在这之后发生。

bool splice_is_valid = true;   if (UseCAS) {     for (int i = 0; i < height; ++i) {       while (true) {         // Checking for duplicate keys on the level 0 is sufficient         if (UNLIKELY(i == 0 && splice->next_[i] != nullptr &&                      compare_(splice->next_[i]->Key(), key_decoded) <= 0)) {           // duplicate key           return false;         }         if (UNLIKELY(i == 0 && splice->prev_[i] != head_ &&                      compare_(splice->prev_[i]->Key(), key_decoded) >= 0)) {           // duplicate key           return false;         }         assert(splice->next_[i] == nullptr ||                compare_(x->Key(), splice->next_[i]->Key()) < 0);         assert(splice->prev_[i] == head_ ||                compare_(splice->prev_[i]->Key(), x->Key()) < 0);         x->NoBarrier_SetNext(i, splice->next_[i]);         if (splice->prev_[i]->CASNext(i, splice->next_[i], x)) {           // success           break;         }         // CAS failed, we need to recompute prev and next. It is unlikely         // to be helpful to try to use a different level as we redo the         // search, because it should be unlikely that lots of nodes have         // been inserted between prev[i] and next[i]. No point in using         // next[i] as the after hint, because we know it is stale.         //         // CAS 失败了,我们需要重新计算 prev 和 next。         // 重新搜索时尝试使用不同的层级大概率没有帮助,因为在 prev[i] 和 next[i]         // 之间插入了大量节点的情况不太可能发生。 也没有必要把 next[i]         // 当作“之后”的提示,因为我们知道它已经过期了。         FindSpliceForLevel<false>(key_decoded, splice->prev_[i], nullptr, i,                                   &splice->prev_[i], &splice->next_[i]);          // Since we've narrowed the bracket for level i, we might have         // violated the Splice constraint between i and i-1.  Make sure         // we recompute the whole thing next time.         // 由于我们在第 i 层把括住区间收紧了,可能打破了 i 与 i-1 层之间的         // Splice 约束。 确保下次把整个东西重新计算一遍。         // Splice 约束是指:prev_[i+1] <= prev_[i] < next_[i] <= next_[i+1].         if (i > 0) {           splice_is_valid = false;         }       }     }   }

先看一下支持并发插入的部分,这个部分假设 splice 是紧迫地套住 key 左右的,不过仍然会进行检查。

其实不难理解,指针修改从下到上可以尽快发现是否存在重复键,因为重复键需要返回失败,而已经修改的指针没办法安全的还原,所以必须从下到上修改。

于是第一步就是进行 key 的检查,是否存在重复键。

虽然先对插入的节点修改指针,最后使用 CAS 原子修改 prev 节点的指针指向。

如果 CAS 失败,说明可能存在其他线程并发的进行了插入,所以需要重新调用 FindSpliceForLevel 对 splice 进行重新紧迫,并进行下一次尝试。

splice_is_valid 用来指示是否 splice 是否在插入时进行了修改,因为如果插入时进行修改,可能会导致 splice 的约束遭到了破坏。

非并发插入部分的代码和并发插入部分的逻辑基本一致。

可以看到 splice 在插入过程中的作用,通过锁定范围,可以用来检查并发的冲突,非常的巧妙。

if (splice_is_valid) {   for (int i = 0; i < height; ++i) {     splice->prev_[i] = x;   }   assert(splice->prev_[splice->height_] == head_);   assert(splice->next_[splice->height_] == nullptr);   for (int i = 0; i < splice->height_; ++i) {     assert(splice->next_[i] == nullptr ||            compare_(key, splice->next_[i]->Key()) < 0);     assert(splice->prev_[i] == head_ ||            compare_(splice->prev_[i]->Key(), key) <= 0);     assert(splice->prev_[i + 1] == splice->prev_[i] ||            splice->prev_[i + 1] == head_ ||            compare_(splice->prev_[i + 1]->Key(), splice->prev_[i]->Key()) <                0);     assert(splice->next_[i + 1] == splice->next_[i] ||            splice->next_[i + 1] == nullptr ||            compare_(splice->next_[i]->Key(), splice->next_[i + 1]->Key()) <                0);   } } else {   splice->height_ = 0; }

最后是收尾工作,对 splice 进行检查和保存当此结果。

如果 splice_is_valid == true,先修改 splice 的 prev,修改为指向插入的节点,保存当次结果,最后是进行一些检查,如果符合升序和非空。

如果 splice_is_valid == false,修改 splice 的 height 为 0 相当于标记了 splice 为一个无用值,在插入前会对 splice 进行检查 if (splice->height_ < max_height) 也有这个原因。

memtable 的方法参数太多了,看得有点头大,等看完了再做个总结。