diff --git a/db/inlineskiplist.h b/db/inlineskiplist.h index 5e4ffbc7a..c31aa4aa5 100644 --- a/db/inlineskiplist.h +++ b/db/inlineskiplist.h @@ -44,6 +44,7 @@ #pragma once #include #include +#include #include #include "port/port.h" #include "util/allocator.h" @@ -53,6 +54,9 @@ namespace rocksdb { template class InlineSkipList { + public: + struct InsertHint; + private: struct Node; @@ -77,6 +81,19 @@ class InlineSkipList { // REQUIRES: no concurrent calls to INSERT void Insert(const char* key); + // Inserts a key allocated by AllocateKey with a hint. It can be used to + // optimize sequential inserts, or inserting a key close to the largest + // key inserted previously with the same hint. + // + // If hint points to nullptr, a new hint will be populated, which can be + // used in subsequent calls. + // + // REQUIRES: All keys inserted with the same hint must be consecutive in the + // skip-list, i.e. let [k1..k2] be the range of keys inserted with hint h, + // there shouldn't be a key k in the skip-list with k1 < k < k2, unless k is + // also inserted with the same hint. + void InsertWithHint(const char* key, InsertHint** hint); + // Like Insert, but external synchronization is not required. void InsertConcurrently(const char* key); @@ -86,6 +103,9 @@ class InlineSkipList { // Return estimated number of entries smaller than `key`. uint64_t EstimateCount(const char* key) const; + // Validate correctness of the skip-list. + void TEST_Validate() const; + // Iteration over the contents of a skip list class Iterator { public: @@ -134,7 +154,7 @@ class InlineSkipList { }; private: - enum MaxPossibleHeightEnum : uint16_t { kMaxPossibleHeight = 32 }; + static const uint16_t kMaxPossibleHeight = 32; const uint16_t kMaxHeight_; const uint16_t kBranching_; @@ -156,7 +176,7 @@ class InlineSkipList { // prev_height_ is the height of prev_[0]. prev_[0] can only be equal // to head when max_height_ and prev_height_ are both 1. Node** prev_; - std::atomic prev_height_; + std::atomic prev_height_; inline int GetMaxHeight() const { return max_height_.load(std::memory_order_relaxed); @@ -166,6 +186,13 @@ class InlineSkipList { Node* AllocateNode(size_t key_size, int height); + // Allocate a hint used by InsertWithHint(). + InsertHint* AllocateInsertHint(); + + // Extract the node from a key allocated by AllocateKey(), and populate + // height of the node. + Node* GetNodeForInsert(const char* key, int* height); + bool Equal(const char* a, const char* b) const { return (compare_(a, b) == 0); } @@ -188,6 +215,13 @@ class InlineSkipList { // level in [0..max_height_-1], if prev is non-null. Node* FindLessThan(const char* key, Node** prev = nullptr) const; + // Return the latest node with a key < key on bottom_level. Start searching + // from root node on the level below top_level. + // Fills prev[level] with pointer to previous node at "level" for every + // level in [bottom_level..top_level-1], if prev is non-null. + Node* FindLessThan(const char* key, Node** prev, Node* root, int top_level, + int bottom_level) const; + // Return the last node in the list. // Return head_ if list is empty. Node* FindLast() const; @@ -201,6 +235,10 @@ class InlineSkipList { void FindLevelSplice(const char* key, Node* before, Node* after, int level, Node** out_prev, Node** out_next); + // Check if we need to invalidate prev_ cache after inserting a node of + // given height. + void MaybeInvalidatePrev(int height); + // No copying allowed InlineSkipList(const InlineSkipList&); InlineSkipList& operator=(const InlineSkipList&); @@ -265,12 +303,31 @@ struct InlineSkipList::Node { next_[-n].store(x, std::memory_order_relaxed); } + // Insert node after prev on specific level. + void InsertAfter(Node* prev, int level) { + // NoBarrier_SetNext() suffices since we will add a barrier when + // we publish a pointer to "this" in prev. + NoBarrier_SetNext(level, prev->NoBarrier_Next(level)); + prev->SetNext(level, this); + } + private: // next_[0] is the lowest level link (level 0). Higher levels are // stored _earlier_, so level 1 is at next_[-1]. std::atomic next_[1]; }; +// +// +// Hint to insert position to speed-up inserts. See implementation of +// InsertWithHint() for more details. +template +struct InlineSkipList::InsertHint { + Node** prev; + uint8_t* prev_height; + int num_levels; +}; + template inline InlineSkipList::Iterator::Iterator( const InlineSkipList* list) { @@ -401,8 +458,17 @@ InlineSkipList::FindGreaterOrEqual(const char* key) const { template typename InlineSkipList::Node* InlineSkipList::FindLessThan(const char* key, Node** prev) const { - Node* x = head_; - int level = GetMaxHeight() - 1; + return FindLessThan(key, prev, head_, GetMaxHeight(), 0); +} + +template +typename InlineSkipList::Node* +InlineSkipList::FindLessThan(const char* key, Node** prev, + Node* root, int top_level, + int bottom_level) const { + assert(top_level > bottom_level); + int level = top_level - 1; + Node* x = root; // KeyIsAfter(key, last_not_after) is definitely false Node* last_not_after = nullptr; while (true) { @@ -416,10 +482,10 @@ InlineSkipList::FindLessThan(const char* key, Node** prev) const { if (prev != nullptr) { prev[level] = x; } - if (level == 0) { + if (level == bottom_level) { return x; } else { - // Switch to next list, reuse KeyIUsAfterNode() result + // Switch to next list, reuse KeyIsAfterNode() result last_not_after = next; level--; } @@ -528,6 +594,63 @@ InlineSkipList::AllocateNode(size_t key_size, int height) { return x; } +template +typename InlineSkipList::InsertHint* +InlineSkipList::AllocateInsertHint() { + InsertHint* hint = reinterpret_cast( + allocator_->AllocateAligned(sizeof(InsertHint))); + // Allocate an extra level on kMaxHeight_, to make boundary cases easier to + // handle. + hint->prev = reinterpret_cast( + allocator_->AllocateAligned(sizeof(Node*) * (kMaxHeight_ + 1))); + hint->prev_height = reinterpret_cast( + allocator_->AllocateAligned(sizeof(uint8_t*) * kMaxHeight_)); + for (int i = 0; i <= kMaxHeight_; i++) { + hint->prev[i] = head_; + } + hint->num_levels = 0; + return hint; +} + +template +typename InlineSkipList::Node* +InlineSkipList::GetNodeForInsert(const char* key, int* height) { + // Find the Node that we placed before the key in AllocateKey + Node* x = reinterpret_cast(const_cast(key)) - 1; + assert(height != nullptr); + *height = x->UnstashHeight(); + assert(*height >= 1 && *height <= kMaxHeight_); + + if (*height > GetMaxHeight()) { + // It is ok to mutate max_height_ without any synchronization + // with concurrent readers. A concurrent reader that observes + // the new value of max_height_ will see either the old value of + // new level pointers from head_ (nullptr), or a new value set in + // the loop below. In the former case the reader will + // immediately drop to the next level since nullptr sorts after all + // keys. In the latter case the reader will use the new node. + max_height_.store(*height, std::memory_order_relaxed); + } + + return x; +} + +template +void InlineSkipList::MaybeInvalidatePrev(int height) { + // We don't have a lock-free algorithm for updating prev_, but we do have + // the option of invalidating the entire sequential-insertion cache. + // prev_'s invariant is that prev_[i] (i > 0) is the predecessor of + // prev_[0] at that level. We're only going to violate that if height + // > 1 and key lands after prev_[height - 1] but before prev_[0]. + // Comparisons are pretty expensive, so an easier version is to just + // clear the cache if height > 1. We only write to prev_height_ if the + // nobody else has, to avoid invalidating the root of the skip list in + // all of the other CPU caches. + if (height > 1 && prev_height_.load(std::memory_order_relaxed) != 0) { + prev_height_.store(0, std::memory_order_relaxed); + } +} + template void InlineSkipList::Insert(const char* key) { // InsertConcurrently often can't maintain the prev_ invariants, so @@ -558,36 +681,135 @@ void InlineSkipList::Insert(const char* key) { // Our data structure does not allow duplicate insertion assert(prev_[0]->Next(0) == nullptr || !Equal(key, prev_[0]->Next(0)->Key())); - // Find the Node that we placed before the key in AllocateKey - Node* x = reinterpret_cast(const_cast(key)) - 1; - int height = x->UnstashHeight(); - assert(height >= 1 && height <= kMaxHeight_); - - if (height > GetMaxHeight()) { - for (int i = GetMaxHeight(); i < height; i++) { - prev_[i] = head_; - } - - // It is ok to mutate max_height_ without any synchronization - // with concurrent readers. A concurrent reader that observes - // the new value of max_height_ will see either the old value of - // new level pointers from head_ (nullptr), or a new value set in - // the loop below. In the former case the reader will - // immediately drop to the next level since nullptr sorts after all - // keys. In the latter case the reader will use the new node. - max_height_.store(height, std::memory_order_relaxed); - } + int height = 0; + Node* x = GetNodeForInsert(key, &height); for (int i = 0; i < height; i++) { - // NoBarrier_SetNext() suffices since we will add a barrier when - // we publish a pointer to "x" in prev[i]. - x->NoBarrier_SetNext(i, prev_[i]->NoBarrier_Next(i)); - prev_[i]->SetNext(i, x); + x->InsertAfter(prev_[i], i); } prev_[0] = x; prev_height_.store(height, std::memory_order_relaxed); } +// The goal here is to reduce the number of key comparisons, as it can be +// expensive. We maintain a hint which help us to find a insert position +// between or next to previously inserted keys with the same hint. +// Note that we require all keys inserted with the same hint are consecutive +// in the skip-list. +// +// The hint keeps a list of nodes previous inserted with the same hint: +// * The first level, prev[0], points to the largest key of them. +// * For 0 < i < num_levels, prev[i] is the previous node of prev[i-1] +// on level i, i.e. +// prev[i] < prev[i-1] <= prev[i]->Next(i) +// (prev[i-1] and prev[i]->Next(i) could be the same node.) +// In addition prev_height keeps the height of prev[i]. +// +// When inserting a new key, we look for the lowest level L where +// prev[L] < key < prev[L-1]. Let +// M = max(prev_height[i]..prev_height[num_levels-1]) +// For each level between in [L, M), the previous node of +// the new key must be one of prev[i]. For levels below L and above M +// we do normal skip-list search if needed. +// +// The optimization is suitable for stream of keys where new inserts are next +// to or close to the largest key ever inserted, e.g. sequential inserts. +template +void InlineSkipList::InsertWithHint(const char* key, + InsertHint** hint_ptr) { + int height = 0; + Node* x = GetNodeForInsert(key, &height); + + // InsertWithHint() is not compatible with prev_ optimization used by + // Insert(). + MaybeInvalidatePrev(height); + + assert(hint_ptr != nullptr); + InsertHint* hint = *hint_ptr; + if (hint == nullptr) { + // AllocateInsertHint will initialize hint with num_levels = 0 and + // prev[i] = head_ for all i. + hint = AllocateInsertHint(); + *hint_ptr = hint; + } + + // Look for the first level i < num_levels with prev[i] < key. + int level = 0; + for (; level < hint->num_levels; level++) { + if (KeyIsAfterNode(key, hint->prev[level])) { + assert(!KeyIsAfterNode(key, hint->prev[level]->Next(level))); + break; + } + } + Node* tmp_prev[kMaxPossibleHeight]; + if (level >= hint->num_levels) { + // The hint is not useful in this case. Fallback to full search. + FindLessThan(key, tmp_prev); + for (int i = 0; i < height; i++) { + assert(tmp_prev[i] == head_ || KeyIsAfterNode(key, tmp_prev[i])); + assert(!KeyIsAfterNode(key, tmp_prev[i]->Next(i))); + x->InsertAfter(tmp_prev[i], i); + } + } else { + // Search on levels below "level", using prev[level] as root. + if (level > 0) { + FindLessThan(key, tmp_prev, hint->prev[level], level, 0); + for (int i = 0; i < level && i < height; i++) { + assert(tmp_prev[i] == head_ || KeyIsAfterNode(key, tmp_prev[i])); + assert(!KeyIsAfterNode(key, tmp_prev[i]->Next(i))); + x->InsertAfter(tmp_prev[i], i); + } + } + // The current level where the new node is to insert into skip-list. + int current_level = level; + for (int i = level; i < hint->num_levels; i++) { + while (current_level < height && current_level < hint->prev_height[i]) { + // In this case, prev[i] is the previous node of key on current_level, + // since: + // * prev[i] < key; + // * no other nodes less than prev[level-1] has height greater than + // current_level, and prev[level-1] > key. + assert(KeyIsAfterNode(key, hint->prev[i])); + assert(!KeyIsAfterNode(key, hint->prev[i]->Next(current_level))); + x->InsertAfter(hint->prev[i], current_level); + current_level++; + } + } + // Full search on levels above current_level if needed. + if (current_level < height) { + FindLessThan(key, tmp_prev, head_, GetMaxHeight(), current_level); + for (int i = current_level; i < height; i++) { + assert(tmp_prev[i] == head_ || KeyIsAfterNode(key, tmp_prev[i])); + assert(!KeyIsAfterNode(key, tmp_prev[i]->Next(i))); + x->InsertAfter(tmp_prev[i], i); + } + } + } + // The last step is update the new node into the hint. + // * If "height" <= "level", prev[level] is still the previous node of + // prev[level-1] on level "level". Stop. + // * Otherwise, the new node becomes the new previous node of + // prev[level-1], or if level=0, the new node becomes the largest node + // inserted with the same hint. Replace prev[level] with the new node. + // * If prev[i] is replaced by another node, check if it can replace + // prev[i+1] using a similar rule, up till "num_levels" level. + Node* p = x; + uint8_t h = static_cast(height); + for (int i = level; i < hint->num_levels; i++) { + if (h <= i) { + p = nullptr; + break; + } + std::swap(p, hint->prev[i]); + std::swap(h, hint->prev_height[i]); + } + if (p != nullptr && h > hint->num_levels) { + hint->prev[hint->num_levels] = p; + hint->prev_height[hint->num_levels] = h; + hint->num_levels++; + } +} + template void InlineSkipList::FindLevelSplice(const char* key, Node* before, Node* after, int level, @@ -613,19 +835,7 @@ void InlineSkipList::InsertConcurrently(const char* key) { Node* x = reinterpret_cast(const_cast(key)) - 1; int height = x->UnstashHeight(); assert(height >= 1 && height <= kMaxHeight_); - - // We don't have a lock-free algorithm for updating prev_, but we do have - // the option of invalidating the entire sequential-insertion cache. - // prev_'s invariant is that prev_[i] (i > 0) is the predecessor of - // prev_[0] at that level. We're only going to violate that if height - // > 1 and key lands after prev_[height - 1] but before prev_[0]. - // Comparisons are pretty expensive, so an easier version is to just - // clear the cache if height > 1. We only write to prev_height_ if the - // nobody else has, to avoid invalidating the root of the skip list in - // all of the other CPU caches. - if (height > 1 && prev_height_.load(std::memory_order_relaxed) != 0) { - prev_height_.store(0, std::memory_order_relaxed); - } + MaybeInvalidatePrev(height); int max_height = max_height_.load(std::memory_order_relaxed); while (height > max_height) { @@ -673,4 +883,44 @@ bool InlineSkipList::Contains(const char* key) const { } } +template +void InlineSkipList::TEST_Validate() const { + // Interate over all levels at the same time, and verify nodes appear in + // the right order, and nodes appear in upper level also appear in lower + // levels. + Node* nodes[kMaxPossibleHeight]; + int max_height = GetMaxHeight(); + for (int i = 0; i < max_height; i++) { + nodes[i] = head_; + } + while (nodes[0] != nullptr) { + Node* l0_next = nodes[0]->Next(0); + if (l0_next == nullptr) { + break; + } + assert(nodes[0] == head_ || compare_(nodes[0]->Key(), l0_next->Key()) < 0); + nodes[0] = l0_next; + + int i = 1; + while (i < max_height) { + Node* next = nodes[i]->Next(i); + if (next == nullptr) { + break; + } + auto cmp = compare_(nodes[0]->Key(), next->Key()); + assert(cmp <= 0); + if (cmp == 0) { + assert(next == nodes[0]); + nodes[i] = next; + } else { + break; + } + i++; + } + } + for (int i = 1; i < max_height; i++) { + assert(nodes[i]->Next(i) == nullptr); + } +} + } // namespace rocksdb diff --git a/db/inlineskiplist_test.cc b/db/inlineskiplist_test.cc index a683b332b..658ca4364 100644 --- a/db/inlineskiplist_test.cc +++ b/db/inlineskiplist_test.cc @@ -9,6 +9,7 @@ #include "db/inlineskiplist.h" #include +#include #include "rocksdb/env.h" #include "util/concurrent_arena.h" #include "util/hash.h" @@ -42,7 +43,49 @@ struct TestComparator { } }; -class InlineSkipTest : public testing::Test {}; +typedef InlineSkipList TestInlineSkipList; + +class InlineSkipTest : public testing::Test { + public: + void Insert(TestInlineSkipList* list, Key key) { + char* buf = list->AllocateKey(sizeof(Key)); + memcpy(buf, &key, sizeof(Key)); + list->Insert(buf); + keys_.insert(key); + } + + void InsertWithHint(TestInlineSkipList* list, Key key, + TestInlineSkipList::InsertHint** hint) { + char* buf = list->AllocateKey(sizeof(Key)); + memcpy(buf, &key, sizeof(Key)); + list->InsertWithHint(buf, hint); + keys_.insert(key); + } + + void Validate(TestInlineSkipList* list) { + // Check keys exist. + for (Key key : keys_) { + ASSERT_TRUE(list->Contains(Encode(&key))); + } + // Iterate over the list, make sure keys appears in order and no extra + // keys exist. + TestInlineSkipList::Iterator iter(list); + ASSERT_FALSE(iter.Valid()); + Key zero = 0; + iter.Seek(Encode(&zero)); + for (Key key : keys_) { + ASSERT_TRUE(iter.Valid()); + ASSERT_EQ(key, Decode(iter.key())); + iter.Next(); + } + ASSERT_FALSE(iter.Valid()); + // Validate the list is well-formed. + list->TEST_Validate(); + } + + private: + std::set keys_; +}; TEST_F(InlineSkipTest, Empty) { Arena arena; @@ -153,6 +196,103 @@ TEST_F(InlineSkipTest, InsertAndLookup) { } } +TEST_F(InlineSkipTest, InsertWithHint_Sequential) { + const int N = 100000; + Arena arena; + TestComparator cmp; + TestInlineSkipList list(cmp, &arena); + TestInlineSkipList::InsertHint* hint = nullptr; + for (int i = 0; i < N; i++) { + Key key = i; + InsertWithHint(&list, key, &hint); + } + Validate(&list); +} + +TEST_F(InlineSkipTest, InsertWithHint_MultipleHints) { + const int N = 100000; + const int S = 100; + Random rnd(534); + Arena arena; + TestComparator cmp; + TestInlineSkipList list(cmp, &arena); + TestInlineSkipList::InsertHint* hints[S]; + Key last_key[S]; + for (int i = 0; i < S; i++) { + hints[i] = nullptr; + last_key[i] = 0; + } + for (int i = 0; i < N; i++) { + Key s = rnd.Uniform(S); + Key key = (s << 32) + (++last_key[s]); + InsertWithHint(&list, key, &hints[s]); + } + Validate(&list); +} + +TEST_F(InlineSkipTest, InsertWithHint_MultipleHintsRandom) { + const int N = 100000; + const int S = 100; + Random rnd(534); + Arena arena; + TestComparator cmp; + TestInlineSkipList list(cmp, &arena); + TestInlineSkipList::InsertHint* hints[S]; + for (int i = 0; i < S; i++) { + hints[i] = nullptr; + } + for (int i = 0; i < N; i++) { + Key s = rnd.Uniform(S); + Key key = (s << 32) + rnd.Next(); + InsertWithHint(&list, key, &hints[s]); + } + Validate(&list); +} + +TEST_F(InlineSkipTest, InsertWithHint_CompatibleWithInsertWithoutHint) { + const int N = 100000; + const int S1 = 100; + const int S2 = 100; + Random rnd(534); + Arena arena; + TestComparator cmp; + TestInlineSkipList list(cmp, &arena); + std::unordered_set used; + Key with_hint[S1]; + Key without_hint[S2]; + TestInlineSkipList::InsertHint* hints[S1]; + for (int i = 0; i < S1; i++) { + hints[i] = nullptr; + while (true) { + Key s = rnd.Next(); + if (used.insert(s).second) { + with_hint[i] = s; + break; + } + } + } + for (int i = 0; i < S2; i++) { + while (true) { + Key s = rnd.Next(); + if (used.insert(s).second) { + without_hint[i] = s; + break; + } + } + } + for (int i = 0; i < N; i++) { + Key s = rnd.Uniform(S1 + S2); + if (s < S1) { + Key key = (with_hint[s] << 32) + rnd.Next(); + InsertWithHint(&list, key, &hints[s]); + } else { + Key key = (without_hint[s - S1] << 32) + rnd.Next(); + Insert(&list, key); + } + } + Validate(&list); +} + // We want to make sure that with a single writer and multiple // concurrent readers (with no synchronization other than when a // reader's iterator is created), the reader always observes all the