From 52017295454db1be731e3dcddde943ad761c73d9 Mon Sep 17 00:00:00 2001 From: Nathan Bronson Date: Tue, 24 Nov 2015 13:29:50 -0800 Subject: [PATCH] InlineSkipList - part 2/3 Summary: This diff is 2/3 in a sequence that introduces a skip list optimized for a key that is a freshly-allocated const char*. The change is broken into pieces to make it easier to review. This piece removes the Key template type, introduces the AllocateKey interface, and changes the unit test from using uint64_t as the Key type to using pointers to an 8 byte blob. Test Plan: unit test Reviewers: igor, sdong Reviewed By: sdong Subscribers: dhruba Differential Revision: https://reviews.facebook.net/D51285 --- db/inlineskiplist.h | 134 ++++++++++++++++++++------------------ db/inlineskiplist_test.cc | 74 +++++++++++++-------- 2 files changed, 118 insertions(+), 90 deletions(-) diff --git a/db/inlineskiplist.h b/db/inlineskiplist.h index dad87b373..e326640b3 100644 --- a/db/inlineskiplist.h +++ b/db/inlineskiplist.h @@ -38,7 +38,7 @@ namespace rocksdb { -template +template class InlineSkipList { private: struct Node; @@ -52,15 +52,18 @@ class InlineSkipList { int32_t max_height = 12, int32_t branching_factor = 4); + // Allocates a key that can be passed to Insert. + char* AllocateKey(size_t key_size); + // Insert key into the list. // REQUIRES: nothing that compares equal to key is currently in the list. - void Insert(const Key& key); + void Insert(const char* key); // Returns true iff an entry that compares equal to key is in the list. - bool Contains(const Key& key) const; + bool Contains(const char* key) const; // Return estimated number of entries smaller than `key`. - uint64_t EstimateCount(const Key& key) const; + uint64_t EstimateCount(const char* key) const; // Iteration over the contents of a skip list class Iterator { @@ -79,7 +82,7 @@ class InlineSkipList { // Returns the key at the current position. // REQUIRES: Valid() - const Key& key() const; + const char* key() const; // Advances to the next position. // REQUIRES: Valid() @@ -90,7 +93,7 @@ class InlineSkipList { void Prev(); // Advance to the first entry with a key >= target - void Seek(const Key& target); + void Seek(const char* target); // Position at the first entry in list. // Final state of iterator is Valid() iff list is not empty. @@ -132,22 +135,25 @@ class InlineSkipList { return max_height_.load(std::memory_order_relaxed); } - Node* NewNode(const Key& key, int height); + Node* NewNode(const char* key, int height); int RandomHeight(); - bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); } + + bool Equal(const char* a, const char* b) const { + return (compare_(a, b) == 0); + } // Return true if key is greater than the data stored in "n" - bool KeyIsAfterNode(const Key& key, Node* n) const; + bool KeyIsAfterNode(const char* key, Node* n) const; // Returns the earliest node with a key >= key. // Return nullptr if there is no such node. - Node* FindGreaterOrEqual(const Key& key) const; + Node* FindGreaterOrEqual(const char* key) const; // Return the latest node with a key < key. // Return head_ if there is no such node. // Fills prev[level] with pointer to previous node at "level" for every // level in [0..max_height_-1], if prev is non-null. - Node* FindLessThan(const Key& key, Node** prev = nullptr) const; + Node* FindLessThan(const char* key, Node** prev = nullptr) const; // Return the last node in the list. // Return head_ if list is empty. @@ -159,11 +165,11 @@ class InlineSkipList { }; // Implementation details follow -template -struct InlineSkipList::Node { - explicit Node(const Key& k) : key(k) {} +template +struct InlineSkipList::Node { + explicit Node(const char* k) : key(k) {} - Key const key; + const char* const key; // Accessors/mutators for links. Wrapped in methods so we can // add the appropriate barriers as necessary. @@ -195,46 +201,46 @@ struct InlineSkipList::Node { std::atomic next_[1]; }; -template -typename InlineSkipList::Node* -InlineSkipList::NewNode(const Key& key, int height) { +template +typename InlineSkipList::Node* InlineSkipList::NewNode( + const char* key, int height) { char* mem = allocator_->AllocateAligned( sizeof(Node) + sizeof(std::atomic) * (height - 1)); return new (mem) Node(key); } -template -inline InlineSkipList::Iterator::Iterator( +template +inline InlineSkipList::Iterator::Iterator( const InlineSkipList* list) { SetList(list); } -template -inline void InlineSkipList::Iterator::SetList( +template +inline void InlineSkipList::Iterator::SetList( const InlineSkipList* list) { list_ = list; node_ = nullptr; } -template -inline bool InlineSkipList::Iterator::Valid() const { +template +inline bool InlineSkipList::Iterator::Valid() const { return node_ != nullptr; } -template -inline const Key& InlineSkipList::Iterator::key() const { +template +inline const char* InlineSkipList::Iterator::key() const { assert(Valid()); return node_->key; } -template -inline void InlineSkipList::Iterator::Next() { +template +inline void InlineSkipList::Iterator::Next() { assert(Valid()); node_ = node_->Next(0); } -template -inline void InlineSkipList::Iterator::Prev() { +template +inline void InlineSkipList::Iterator::Prev() { // Instead of using explicit "prev" links, we just search for the // last node that falls before key. assert(Valid()); @@ -244,26 +250,26 @@ inline void InlineSkipList::Iterator::Prev() { } } -template -inline void InlineSkipList::Iterator::Seek(const Key& target) { +template +inline void InlineSkipList::Iterator::Seek(const char* target) { node_ = list_->FindGreaterOrEqual(target); } -template -inline void InlineSkipList::Iterator::SeekToFirst() { +template +inline void InlineSkipList::Iterator::SeekToFirst() { node_ = list_->head_->Next(0); } -template -inline void InlineSkipList::Iterator::SeekToLast() { +template +inline void InlineSkipList::Iterator::SeekToLast() { node_ = list_->FindLast(); if (node_ == list_->head_) { node_ = nullptr; } } -template -int InlineSkipList::RandomHeight() { +template +int InlineSkipList::RandomHeight() { auto rnd = Random::GetTLSInstance(); // Increase height with probability 1 in kBranching @@ -276,16 +282,16 @@ int InlineSkipList::RandomHeight() { return height; } -template -bool InlineSkipList::KeyIsAfterNode(const Key& key, - Node* n) const { +template +bool InlineSkipList::KeyIsAfterNode(const char* key, + Node* n) const { // nullptr n is considered infinite return (n != nullptr) && (compare_(n->key, key) < 0); } -template -typename InlineSkipList::Node* -InlineSkipList::FindGreaterOrEqual(const Key& key) const { +template +typename InlineSkipList::Node* +InlineSkipList::FindGreaterOrEqual(const char* key) const { // Note: It looks like we could reduce duplication by implementing // this function as FindLessThan(key)->Next(0), but we wouldn't be able // to exit early on equality and the result wouldn't even be correct. @@ -315,10 +321,9 @@ InlineSkipList::FindGreaterOrEqual(const Key& key) const { } } -template -typename InlineSkipList::Node* -InlineSkipList::FindLessThan(const Key& key, - Node** prev) const { +template +typename InlineSkipList::Node* +InlineSkipList::FindLessThan(const char* key, Node** prev) const { Node* x = head_; int level = GetMaxHeight() - 1; // KeyIsAfter(key, last_not_after) is definitely false @@ -345,9 +350,9 @@ InlineSkipList::FindLessThan(const Key& key, } } -template -typename InlineSkipList::Node* -InlineSkipList::FindLast() const { +template +typename InlineSkipList::Node* +InlineSkipList::FindLast() const { Node* x = head_; int level = GetMaxHeight() - 1; while (true) { @@ -365,8 +370,8 @@ InlineSkipList::FindLast() const { } } -template -uint64_t InlineSkipList::EstimateCount(const Key& key) const { +template +uint64_t InlineSkipList::EstimateCount(const char* key) const { uint64_t count = 0; Node* x = head_; @@ -389,11 +394,11 @@ uint64_t InlineSkipList::EstimateCount(const Key& key) const { } } -template -InlineSkipList::InlineSkipList(const Comparator cmp, - Allocator* allocator, - int32_t max_height, - int32_t branching_factor) +template +InlineSkipList::InlineSkipList(const Comparator cmp, + Allocator* allocator, + int32_t max_height, + int32_t branching_factor) : kMaxHeight_(max_height), kBranching_(branching_factor), kScaledInverseBranching_((Random::kMaxNext + 1) / kBranching_), @@ -417,8 +422,13 @@ InlineSkipList::InlineSkipList(const Comparator cmp, } } -template -void InlineSkipList::Insert(const Key& key) { +template +char* InlineSkipList::AllocateKey(size_t key_size) { + return allocator_->Allocate(key_size); +} + +template +void InlineSkipList::Insert(const char* key) { // fast path for sequential insertion if (!KeyIsAfterNode(key, prev_[0]->NoBarrier_Next(0)) && (prev_[0] == head_ || KeyIsAfterNode(key, prev_[0]))) { @@ -469,8 +479,8 @@ void InlineSkipList::Insert(const Key& key) { prev_height_ = height; } -template -bool InlineSkipList::Contains(const Key& key) const { +template +bool InlineSkipList::Contains(const char* key) const { Node* x = FindGreaterOrEqual(key); if (x != nullptr && Equal(key, x->key)) { return true; diff --git a/db/inlineskiplist_test.cc b/db/inlineskiplist_test.cc index 39fa7b4df..70fd97a88 100644 --- a/db/inlineskiplist_test.cc +++ b/db/inlineskiplist_test.cc @@ -17,13 +17,24 @@ namespace rocksdb { +// Our test skip list stores 8-byte unsigned integers typedef uint64_t Key; +static const char* Encode(const uint64_t* key) { + return reinterpret_cast(key); +} + +static Key Decode(const char* key) { + Key rv; + memcpy(&rv, key, sizeof(Key)); + return rv; +} + struct TestComparator { - int operator()(const Key& a, const Key& b) const { - if (a < b) { + int operator()(const char* a, const char* b) const { + if (Decode(a) < Decode(b)) { return -1; - } else if (a > b) { + } else if (Decode(a) > Decode(b)) { return +1; } else { return 0; @@ -36,14 +47,16 @@ class InlineSkipTest : public testing::Test {}; TEST_F(InlineSkipTest, Empty) { Arena arena; TestComparator cmp; - InlineSkipList list(cmp, &arena); - ASSERT_TRUE(!list.Contains(10)); + InlineSkipList list(cmp, &arena); + Key key = 10; + ASSERT_TRUE(!list.Contains(Encode(&key))); - InlineSkipList::Iterator iter(&list); + InlineSkipList::Iterator iter(&list); ASSERT_TRUE(!iter.Valid()); iter.SeekToFirst(); ASSERT_TRUE(!iter.Valid()); - iter.Seek(100); + key = 100; + iter.Seek(Encode(&key)); ASSERT_TRUE(!iter.Valid()); iter.SeekToLast(); ASSERT_TRUE(!iter.Valid()); @@ -56,16 +69,18 @@ TEST_F(InlineSkipTest, InsertAndLookup) { std::set keys; Arena arena; TestComparator cmp; - InlineSkipList list(cmp, &arena); + InlineSkipList list(cmp, &arena); for (int i = 0; i < N; i++) { Key key = rnd.Next() % R; if (keys.insert(key).second) { - list.Insert(key); + char* buf = list.AllocateKey(sizeof(Key)); + memcpy(buf, &key, sizeof(Key)); + list.Insert(buf); } } - for (int i = 0; i < R; i++) { - if (list.Contains(i)) { + for (Key i = 0; i < R; i++) { + if (list.Contains(Encode(&i))) { ASSERT_EQ(keys.count(i), 1U); } else { ASSERT_EQ(keys.count(i), 0U); @@ -74,26 +89,27 @@ TEST_F(InlineSkipTest, InsertAndLookup) { // Simple iterator tests { - InlineSkipList::Iterator iter(&list); + InlineSkipList::Iterator iter(&list); ASSERT_TRUE(!iter.Valid()); - iter.Seek(0); + uint64_t zero = 0; + iter.Seek(Encode(&zero)); ASSERT_TRUE(iter.Valid()); - ASSERT_EQ(*(keys.begin()), iter.key()); + ASSERT_EQ(*(keys.begin()), Decode(iter.key())); iter.SeekToFirst(); ASSERT_TRUE(iter.Valid()); - ASSERT_EQ(*(keys.begin()), iter.key()); + ASSERT_EQ(*(keys.begin()), Decode(iter.key())); iter.SeekToLast(); ASSERT_TRUE(iter.Valid()); - ASSERT_EQ(*(keys.rbegin()), iter.key()); + ASSERT_EQ(*(keys.rbegin()), Decode(iter.key())); } // Forward iteration test - for (int i = 0; i < R; i++) { - InlineSkipList::Iterator iter(&list); - iter.Seek(i); + for (Key i = 0; i < R; i++) { + InlineSkipList::Iterator iter(&list); + iter.Seek(Encode(&i)); // Compare against model iterator std::set::iterator model_iter = keys.lower_bound(i); @@ -103,7 +119,7 @@ TEST_F(InlineSkipTest, InsertAndLookup) { break; } else { ASSERT_TRUE(iter.Valid()); - ASSERT_EQ(*model_iter, iter.key()); + ASSERT_EQ(*model_iter, Decode(iter.key())); ++model_iter; iter.Next(); } @@ -112,14 +128,14 @@ TEST_F(InlineSkipTest, InsertAndLookup) { // Backward iteration test { - InlineSkipList::Iterator iter(&list); + InlineSkipList::Iterator iter(&list); iter.SeekToLast(); // Compare against model iterator for (std::set::reverse_iterator model_iter = keys.rbegin(); model_iter != keys.rend(); ++model_iter) { ASSERT_TRUE(iter.Valid()); - ASSERT_EQ(*model_iter, iter.key()); + ASSERT_EQ(*model_iter, Decode(iter.key())); iter.Prev(); } ASSERT_TRUE(!iter.Valid()); @@ -210,7 +226,7 @@ class ConcurrentTest { // InlineSkipList is not protected by mu_. We just use a single writer // thread to modify it. - InlineSkipList list_; + InlineSkipList list_; public: ConcurrentTest() : list_(TestComparator(), &arena_) {} @@ -220,7 +236,9 @@ class ConcurrentTest { const uint32_t k = rnd->Next() % K; const int g = current_.Get(k) + 1; const Key new_key = MakeKey(k, g); - list_.Insert(new_key); + char* buf = list_.AllocateKey(sizeof(Key)); + memcpy(buf, &new_key, sizeof(Key)); + list_.Insert(buf); current_.Set(k, g); } @@ -232,14 +250,14 @@ class ConcurrentTest { } Key pos = RandomTarget(rnd); - InlineSkipList::Iterator iter(&list_); - iter.Seek(pos); + InlineSkipList::Iterator iter(&list_); + iter.Seek(Encode(&pos)); while (true) { Key current; if (!iter.Valid()) { current = MakeKey(K, 0); } else { - current = iter.key(); + current = Decode(iter.key()); ASSERT_TRUE(IsValidKey(current)) << current; } ASSERT_LE(pos, current) << "should not go backwards"; @@ -276,7 +294,7 @@ class ConcurrentTest { Key new_target = RandomTarget(rnd); if (new_target > pos) { pos = new_target; - iter.Seek(new_target); + iter.Seek(Encode(&new_target)); } } }