InlineSkipList - part 2/3

Summary:
This diff is 2/3 in a sequence that introduces a skip list optimized
for a key that is a freshly-allocated const char*.  The change is broken
into pieces to make it easier to review.  This piece removes the Key
template type, introduces the AllocateKey interface, and changes the
unit test from using uint64_t as the Key type to using pointers to an 8
byte blob.

Test Plan: unit test

Reviewers: igor, sdong

Reviewed By: sdong

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D51285
main
Nathan Bronson 9 years ago
parent 78812ec6bf
commit 5201729545
  1. 126
      db/inlineskiplist.h
  2. 74
      db/inlineskiplist_test.cc

@ -38,7 +38,7 @@
namespace rocksdb {
template <typename Key, class Comparator>
template <class Comparator>
class InlineSkipList {
private:
struct Node;
@ -52,15 +52,18 @@ class InlineSkipList {
int32_t max_height = 12,
int32_t branching_factor = 4);
// Allocates a key that can be passed to Insert.
char* AllocateKey(size_t key_size);
// Insert key into the list.
// REQUIRES: nothing that compares equal to key is currently in the list.
void Insert(const Key& key);
void Insert(const char* key);
// Returns true iff an entry that compares equal to key is in the list.
bool Contains(const Key& key) const;
bool Contains(const char* key) const;
// Return estimated number of entries smaller than `key`.
uint64_t EstimateCount(const Key& key) const;
uint64_t EstimateCount(const char* key) const;
// Iteration over the contents of a skip list
class Iterator {
@ -79,7 +82,7 @@ class InlineSkipList {
// Returns the key at the current position.
// REQUIRES: Valid()
const Key& key() const;
const char* key() const;
// Advances to the next position.
// REQUIRES: Valid()
@ -90,7 +93,7 @@ class InlineSkipList {
void Prev();
// Advance to the first entry with a key >= target
void Seek(const Key& target);
void Seek(const char* target);
// Position at the first entry in list.
// Final state of iterator is Valid() iff list is not empty.
@ -132,22 +135,25 @@ class InlineSkipList {
return max_height_.load(std::memory_order_relaxed);
}
Node* NewNode(const Key& key, int height);
Node* NewNode(const char* key, int height);
int RandomHeight();
bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); }
bool Equal(const char* a, const char* b) const {
return (compare_(a, b) == 0);
}
// Return true if key is greater than the data stored in "n"
bool KeyIsAfterNode(const Key& key, Node* n) const;
bool KeyIsAfterNode(const char* key, Node* n) const;
// Returns the earliest node with a key >= key.
// Return nullptr if there is no such node.
Node* FindGreaterOrEqual(const Key& key) const;
Node* FindGreaterOrEqual(const char* key) const;
// Return the latest node with a key < key.
// Return head_ if there is no such node.
// Fills prev[level] with pointer to previous node at "level" for every
// level in [0..max_height_-1], if prev is non-null.
Node* FindLessThan(const Key& key, Node** prev = nullptr) const;
Node* FindLessThan(const char* key, Node** prev = nullptr) const;
// Return the last node in the list.
// Return head_ if list is empty.
@ -159,11 +165,11 @@ class InlineSkipList {
};
// Implementation details follow
template <typename Key, class Comparator>
struct InlineSkipList<Key, Comparator>::Node {
explicit Node(const Key& k) : key(k) {}
template <class Comparator>
struct InlineSkipList<Comparator>::Node {
explicit Node(const char* k) : key(k) {}
Key const key;
const char* const key;
// Accessors/mutators for links. Wrapped in methods so we can
// add the appropriate barriers as necessary.
@ -195,46 +201,46 @@ struct InlineSkipList<Key, Comparator>::Node {
std::atomic<Node*> next_[1];
};
template <typename Key, class Comparator>
typename InlineSkipList<Key, Comparator>::Node*
InlineSkipList<Key, Comparator>::NewNode(const Key& key, int height) {
template <class Comparator>
typename InlineSkipList<Comparator>::Node* InlineSkipList<Comparator>::NewNode(
const char* key, int height) {
char* mem = allocator_->AllocateAligned(
sizeof(Node) + sizeof(std::atomic<Node*>) * (height - 1));
return new (mem) Node(key);
}
template <typename Key, class Comparator>
inline InlineSkipList<Key, Comparator>::Iterator::Iterator(
template <class Comparator>
inline InlineSkipList<Comparator>::Iterator::Iterator(
const InlineSkipList* list) {
SetList(list);
}
template <typename Key, class Comparator>
inline void InlineSkipList<Key, Comparator>::Iterator::SetList(
template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::SetList(
const InlineSkipList* list) {
list_ = list;
node_ = nullptr;
}
template <typename Key, class Comparator>
inline bool InlineSkipList<Key, Comparator>::Iterator::Valid() const {
template <class Comparator>
inline bool InlineSkipList<Comparator>::Iterator::Valid() const {
return node_ != nullptr;
}
template <typename Key, class Comparator>
inline const Key& InlineSkipList<Key, Comparator>::Iterator::key() const {
template <class Comparator>
inline const char* InlineSkipList<Comparator>::Iterator::key() const {
assert(Valid());
return node_->key;
}
template <typename Key, class Comparator>
inline void InlineSkipList<Key, Comparator>::Iterator::Next() {
template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::Next() {
assert(Valid());
node_ = node_->Next(0);
}
template <typename Key, class Comparator>
inline void InlineSkipList<Key, Comparator>::Iterator::Prev() {
template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::Prev() {
// Instead of using explicit "prev" links, we just search for the
// last node that falls before key.
assert(Valid());
@ -244,26 +250,26 @@ inline void InlineSkipList<Key, Comparator>::Iterator::Prev() {
}
}
template <typename Key, class Comparator>
inline void InlineSkipList<Key, Comparator>::Iterator::Seek(const Key& target) {
template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::Seek(const char* target) {
node_ = list_->FindGreaterOrEqual(target);
}
template <typename Key, class Comparator>
inline void InlineSkipList<Key, Comparator>::Iterator::SeekToFirst() {
template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::SeekToFirst() {
node_ = list_->head_->Next(0);
}
template <typename Key, class Comparator>
inline void InlineSkipList<Key, Comparator>::Iterator::SeekToLast() {
template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::SeekToLast() {
node_ = list_->FindLast();
if (node_ == list_->head_) {
node_ = nullptr;
}
}
template <typename Key, class Comparator>
int InlineSkipList<Key, Comparator>::RandomHeight() {
template <class Comparator>
int InlineSkipList<Comparator>::RandomHeight() {
auto rnd = Random::GetTLSInstance();
// Increase height with probability 1 in kBranching
@ -276,16 +282,16 @@ int InlineSkipList<Key, Comparator>::RandomHeight() {
return height;
}
template <typename Key, class Comparator>
bool InlineSkipList<Key, Comparator>::KeyIsAfterNode(const Key& key,
template <class Comparator>
bool InlineSkipList<Comparator>::KeyIsAfterNode(const char* key,
Node* n) const {
// nullptr n is considered infinite
return (n != nullptr) && (compare_(n->key, key) < 0);
}
template <typename Key, class Comparator>
typename InlineSkipList<Key, Comparator>::Node*
InlineSkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key) const {
template <class Comparator>
typename InlineSkipList<Comparator>::Node*
InlineSkipList<Comparator>::FindGreaterOrEqual(const char* key) const {
// Note: It looks like we could reduce duplication by implementing
// this function as FindLessThan(key)->Next(0), but we wouldn't be able
// to exit early on equality and the result wouldn't even be correct.
@ -315,10 +321,9 @@ InlineSkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key) const {
}
}
template <typename Key, class Comparator>
typename InlineSkipList<Key, Comparator>::Node*
InlineSkipList<Key, Comparator>::FindLessThan(const Key& key,
Node** prev) const {
template <class Comparator>
typename InlineSkipList<Comparator>::Node*
InlineSkipList<Comparator>::FindLessThan(const char* key, Node** prev) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
// KeyIsAfter(key, last_not_after) is definitely false
@ -345,9 +350,9 @@ InlineSkipList<Key, Comparator>::FindLessThan(const Key& key,
}
}
template <typename Key, class Comparator>
typename InlineSkipList<Key, Comparator>::Node*
InlineSkipList<Key, Comparator>::FindLast() const {
template <class Comparator>
typename InlineSkipList<Comparator>::Node*
InlineSkipList<Comparator>::FindLast() const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
@ -365,8 +370,8 @@ InlineSkipList<Key, Comparator>::FindLast() const {
}
}
template <typename Key, class Comparator>
uint64_t InlineSkipList<Key, Comparator>::EstimateCount(const Key& key) const {
template <class Comparator>
uint64_t InlineSkipList<Comparator>::EstimateCount(const char* key) const {
uint64_t count = 0;
Node* x = head_;
@ -389,8 +394,8 @@ uint64_t InlineSkipList<Key, Comparator>::EstimateCount(const Key& key) const {
}
}
template <typename Key, class Comparator>
InlineSkipList<Key, Comparator>::InlineSkipList(const Comparator cmp,
template <class Comparator>
InlineSkipList<Comparator>::InlineSkipList(const Comparator cmp,
Allocator* allocator,
int32_t max_height,
int32_t branching_factor)
@ -417,8 +422,13 @@ InlineSkipList<Key, Comparator>::InlineSkipList(const Comparator cmp,
}
}
template <typename Key, class Comparator>
void InlineSkipList<Key, Comparator>::Insert(const Key& key) {
template <class Comparator>
char* InlineSkipList<Comparator>::AllocateKey(size_t key_size) {
return allocator_->Allocate(key_size);
}
template <class Comparator>
void InlineSkipList<Comparator>::Insert(const char* key) {
// fast path for sequential insertion
if (!KeyIsAfterNode(key, prev_[0]->NoBarrier_Next(0)) &&
(prev_[0] == head_ || KeyIsAfterNode(key, prev_[0]))) {
@ -469,8 +479,8 @@ void InlineSkipList<Key, Comparator>::Insert(const Key& key) {
prev_height_ = height;
}
template <typename Key, class Comparator>
bool InlineSkipList<Key, Comparator>::Contains(const Key& key) const {
template <class Comparator>
bool InlineSkipList<Comparator>::Contains(const char* key) const {
Node* x = FindGreaterOrEqual(key);
if (x != nullptr && Equal(key, x->key)) {
return true;

@ -17,13 +17,24 @@
namespace rocksdb {
// Our test skip list stores 8-byte unsigned integers
typedef uint64_t Key;
static const char* Encode(const uint64_t* key) {
return reinterpret_cast<const char*>(key);
}
static Key Decode(const char* key) {
Key rv;
memcpy(&rv, key, sizeof(Key));
return rv;
}
struct TestComparator {
int operator()(const Key& a, const Key& b) const {
if (a < b) {
int operator()(const char* a, const char* b) const {
if (Decode(a) < Decode(b)) {
return -1;
} else if (a > b) {
} else if (Decode(a) > Decode(b)) {
return +1;
} else {
return 0;
@ -36,14 +47,16 @@ class InlineSkipTest : public testing::Test {};
TEST_F(InlineSkipTest, Empty) {
Arena arena;
TestComparator cmp;
InlineSkipList<Key, TestComparator> list(cmp, &arena);
ASSERT_TRUE(!list.Contains(10));
InlineSkipList<TestComparator> list(cmp, &arena);
Key key = 10;
ASSERT_TRUE(!list.Contains(Encode(&key)));
InlineSkipList<Key, TestComparator>::Iterator iter(&list);
InlineSkipList<TestComparator>::Iterator iter(&list);
ASSERT_TRUE(!iter.Valid());
iter.SeekToFirst();
ASSERT_TRUE(!iter.Valid());
iter.Seek(100);
key = 100;
iter.Seek(Encode(&key));
ASSERT_TRUE(!iter.Valid());
iter.SeekToLast();
ASSERT_TRUE(!iter.Valid());
@ -56,16 +69,18 @@ TEST_F(InlineSkipTest, InsertAndLookup) {
std::set<Key> keys;
Arena arena;
TestComparator cmp;
InlineSkipList<Key, TestComparator> list(cmp, &arena);
InlineSkipList<TestComparator> list(cmp, &arena);
for (int i = 0; i < N; i++) {
Key key = rnd.Next() % R;
if (keys.insert(key).second) {
list.Insert(key);
char* buf = list.AllocateKey(sizeof(Key));
memcpy(buf, &key, sizeof(Key));
list.Insert(buf);
}
}
for (int i = 0; i < R; i++) {
if (list.Contains(i)) {
for (Key i = 0; i < R; i++) {
if (list.Contains(Encode(&i))) {
ASSERT_EQ(keys.count(i), 1U);
} else {
ASSERT_EQ(keys.count(i), 0U);
@ -74,26 +89,27 @@ TEST_F(InlineSkipTest, InsertAndLookup) {
// Simple iterator tests
{
InlineSkipList<Key, TestComparator>::Iterator iter(&list);
InlineSkipList<TestComparator>::Iterator iter(&list);
ASSERT_TRUE(!iter.Valid());
iter.Seek(0);
uint64_t zero = 0;
iter.Seek(Encode(&zero));
ASSERT_TRUE(iter.Valid());
ASSERT_EQ(*(keys.begin()), iter.key());
ASSERT_EQ(*(keys.begin()), Decode(iter.key()));
iter.SeekToFirst();
ASSERT_TRUE(iter.Valid());
ASSERT_EQ(*(keys.begin()), iter.key());
ASSERT_EQ(*(keys.begin()), Decode(iter.key()));
iter.SeekToLast();
ASSERT_TRUE(iter.Valid());
ASSERT_EQ(*(keys.rbegin()), iter.key());
ASSERT_EQ(*(keys.rbegin()), Decode(iter.key()));
}
// Forward iteration test
for (int i = 0; i < R; i++) {
InlineSkipList<Key, TestComparator>::Iterator iter(&list);
iter.Seek(i);
for (Key i = 0; i < R; i++) {
InlineSkipList<TestComparator>::Iterator iter(&list);
iter.Seek(Encode(&i));
// Compare against model iterator
std::set<Key>::iterator model_iter = keys.lower_bound(i);
@ -103,7 +119,7 @@ TEST_F(InlineSkipTest, InsertAndLookup) {
break;
} else {
ASSERT_TRUE(iter.Valid());
ASSERT_EQ(*model_iter, iter.key());
ASSERT_EQ(*model_iter, Decode(iter.key()));
++model_iter;
iter.Next();
}
@ -112,14 +128,14 @@ TEST_F(InlineSkipTest, InsertAndLookup) {
// Backward iteration test
{
InlineSkipList<Key, TestComparator>::Iterator iter(&list);
InlineSkipList<TestComparator>::Iterator iter(&list);
iter.SeekToLast();
// Compare against model iterator
for (std::set<Key>::reverse_iterator model_iter = keys.rbegin();
model_iter != keys.rend(); ++model_iter) {
ASSERT_TRUE(iter.Valid());
ASSERT_EQ(*model_iter, iter.key());
ASSERT_EQ(*model_iter, Decode(iter.key()));
iter.Prev();
}
ASSERT_TRUE(!iter.Valid());
@ -210,7 +226,7 @@ class ConcurrentTest {
// InlineSkipList is not protected by mu_. We just use a single writer
// thread to modify it.
InlineSkipList<Key, TestComparator> list_;
InlineSkipList<TestComparator> list_;
public:
ConcurrentTest() : list_(TestComparator(), &arena_) {}
@ -220,7 +236,9 @@ class ConcurrentTest {
const uint32_t k = rnd->Next() % K;
const int g = current_.Get(k) + 1;
const Key new_key = MakeKey(k, g);
list_.Insert(new_key);
char* buf = list_.AllocateKey(sizeof(Key));
memcpy(buf, &new_key, sizeof(Key));
list_.Insert(buf);
current_.Set(k, g);
}
@ -232,14 +250,14 @@ class ConcurrentTest {
}
Key pos = RandomTarget(rnd);
InlineSkipList<Key, TestComparator>::Iterator iter(&list_);
iter.Seek(pos);
InlineSkipList<TestComparator>::Iterator iter(&list_);
iter.Seek(Encode(&pos));
while (true) {
Key current;
if (!iter.Valid()) {
current = MakeKey(K, 0);
} else {
current = iter.key();
current = Decode(iter.key());
ASSERT_TRUE(IsValidKey(current)) << current;
}
ASSERT_LE(pos, current) << "should not go backwards";
@ -276,7 +294,7 @@ class ConcurrentTest {
Key new_target = RandomTarget(rnd);
if (new_target > pos) {
pos = new_target;
iter.Seek(new_target);
iter.Seek(Encode(&new_target));
}
}
}

Loading…
Cancel
Save