InlineSkiplist: don't decode keys unnecessarily during comparisons

Summary:
Summary
========
`InlineSkipList<>::Insert` takes the `key` parameter as a C-string. Then, it performs multiple comparisons with it requiring the `GetLengthPrefixedSlice()` to be spawn in `MemTable::KeyComparator::operator()(const char* prefix_len_key1, const char* prefix_len_key2)` on the same data over and over. The patch tries to optimize that.

Rough performance comparison
=====
Big keys, no compression.

```
$ ./db_bench --writes 20000000 --benchmarks="fillrandom" --compression_type none -key_size 256
(...)
fillrandom   :       4.222 micros/op 236836 ops/sec;   80.4 MB/s
```

```
$ ./db_bench --writes 20000000 --benchmarks="fillrandom" --compression_type none -key_size 256
(...)
fillrandom   :       4.064 micros/op 246059 ops/sec;   83.5 MB/s
```

TODO
======
In ~~a separated~~ this PR:
- [x] Go outside the write path. Maybe even eradicate the C-string-taking variant of `KeyIsAfterNode` entirely.
- [x] Try to cache the transformations applied by `KeyComparator` & friends in situations where we havy many comparisons with the same key.
Closes https://github.com/facebook/rocksdb/pull/3516

Differential Revision: D7059300

Pulled By: ajkr

fbshipit-source-id: 6f027dbb619a488129f79f79b5f7dbe566fb2dbb
main
Radoslaw Zarzynski 7 years ago committed by Facebook Github Bot
parent 1cbc96d236
commit 09b6bf828a
  1. 2
      db/memtable.cc
  2. 2
      db/memtable.h
  3. 12
      include/rocksdb/memtablerep.h
  4. 55
      memtable/inlineskiplist.h
  5. 16
      memtable/inlineskiplist_test.cc

@ -229,7 +229,7 @@ int MemTable::KeyComparator::operator()(const char* prefix_len_key1,
} }
int MemTable::KeyComparator::operator()(const char* prefix_len_key, int MemTable::KeyComparator::operator()(const char* prefix_len_key,
const Slice& key) const KeyComparator::DecodedType& key)
const { const {
// Internal keys are encoded as length-prefixed strings. // Internal keys are encoded as length-prefixed strings.
Slice a = GetLengthPrefixedSlice(prefix_len_key); Slice a = GetLengthPrefixedSlice(prefix_len_key);

@ -84,7 +84,7 @@ class MemTable {
virtual int operator()(const char* prefix_len_key1, virtual int operator()(const char* prefix_len_key1,
const char* prefix_len_key2) const override; const char* prefix_len_key2) const override;
virtual int operator()(const char* prefix_len_key, virtual int operator()(const char* prefix_len_key,
const Slice& key) const override; const DecodedType& key) const override;
}; };
// MemTables are reference counted. The initial reference count // MemTables are reference counted. The initial reference count

@ -39,24 +39,34 @@
#include <stdexcept> #include <stdexcept>
#include <stdint.h> #include <stdint.h>
#include <stdlib.h> #include <stdlib.h>
#include <rocksdb/slice.h>
namespace rocksdb { namespace rocksdb {
class Arena; class Arena;
class Allocator; class Allocator;
class LookupKey; class LookupKey;
class Slice;
class SliceTransform; class SliceTransform;
class Logger; class Logger;
typedef void* KeyHandle; typedef void* KeyHandle;
extern Slice GetLengthPrefixedSlice(const char* data);
class MemTableRep { class MemTableRep {
public: public:
// KeyComparator provides a means to compare keys, which are internal keys // KeyComparator provides a means to compare keys, which are internal keys
// concatenated with values. // concatenated with values.
class KeyComparator { class KeyComparator {
public: public:
typedef rocksdb::Slice DecodedType;
virtual DecodedType decode_key(const char* key) const {
// The format of key is frozen and can be terated as a part of the API
// contract. Refer to MemTable::Add for details.
return GetLengthPrefixedSlice(key);
}
// Compare a and b. Return a negative value if a is less than b, 0 if they // Compare a and b. Return a negative value if a is less than b, 0 if they
// are equal, and a positive value if a is greater than b // are equal, and a positive value if a is greater than b
virtual int operator()(const char* prefix_len_key1, virtual int operator()(const char* prefix_len_key1,

@ -45,9 +45,12 @@
#include <stdlib.h> #include <stdlib.h>
#include <algorithm> #include <algorithm>
#include <atomic> #include <atomic>
#include <type_traits>
#include "port/likely.h" #include "port/likely.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/slice.h"
#include "util/allocator.h" #include "util/allocator.h"
#include "util/coding.h"
#include "util/random.h" #include "util/random.h"
namespace rocksdb { namespace rocksdb {
@ -59,6 +62,9 @@ class InlineSkipList {
struct Splice; struct Splice;
public: public:
using DecodedKey = \
typename std::remove_reference<Comparator>::type::DecodedType;
static const uint16_t kMaxPossibleHeight = 32; static const uint16_t kMaxPossibleHeight = 32;
// Create a new InlineSkipList object that will use "cmp" for comparing // Create a new InlineSkipList object that will use "cmp" for comparing
@ -212,6 +218,7 @@ class InlineSkipList {
// Return true if key is greater than the data stored in "n". Null n // Return true if key is greater than the data stored in "n". Null n
// is considered infinite. n should not be head_. // is considered infinite. n should not be head_.
bool KeyIsAfterNode(const char* key, Node* n) const; bool KeyIsAfterNode(const char* key, Node* n) const;
bool KeyIsAfterNode(const DecodedKey& key, Node* n) const;
// Returns the earliest node with a key >= key. // Returns the earliest node with a key >= key.
// Return nullptr if there is no such node. // Return nullptr if there is no such node.
@ -241,12 +248,12 @@ class InlineSkipList {
// a node that is after the key. after should be nullptr if a good after // a node that is after the key. after should be nullptr if a good after
// node isn't conveniently available. // node isn't conveniently available.
template<bool prefetch_before> template<bool prefetch_before>
void FindSpliceForLevel(const char* key, Node* before, Node* after, int level, void FindSpliceForLevel(const DecodedKey& key, Node* before, Node* after, int level,
Node** out_prev, Node** out_next); Node** out_prev, Node** out_next);
// Recomputes Splice levels from highest_level (inclusive) down to // Recomputes Splice levels from highest_level (inclusive) down to
// lowest_level (inclusive). // lowest_level (inclusive).
void RecomputeSpliceLevels(const char* key, Splice* splice, void RecomputeSpliceLevels(const DecodedKey& key, Splice* splice,
int recompute_level); int recompute_level);
// No copying allowed // No copying allowed
@ -435,6 +442,14 @@ bool InlineSkipList<Comparator>::KeyIsAfterNode(const char* key,
return (n != nullptr) && (compare_(n->Key(), key) < 0); return (n != nullptr) && (compare_(n->Key(), key) < 0);
} }
template <class Comparator>
bool InlineSkipList<Comparator>::KeyIsAfterNode(const DecodedKey& key,
Node* n) const {
// nullptr n is considered infinite
assert(n != head_);
return (n != nullptr) && (compare_(n->Key(), key) < 0);
}
template <class Comparator> template <class Comparator>
typename InlineSkipList<Comparator>::Node* typename InlineSkipList<Comparator>::Node*
InlineSkipList<Comparator>::FindGreaterOrEqual(const char* key) const { InlineSkipList<Comparator>::FindGreaterOrEqual(const char* key) const {
@ -446,6 +461,7 @@ InlineSkipList<Comparator>::FindGreaterOrEqual(const char* key) const {
Node* x = head_; Node* x = head_;
int level = GetMaxHeight() - 1; int level = GetMaxHeight() - 1;
Node* last_bigger = nullptr; Node* last_bigger = nullptr;
const DecodedKey key_decoded = compare_.decode_key(key);
while (true) { while (true) {
Node* next = x->Next(level); Node* next = x->Next(level);
if (next != nullptr) { if (next != nullptr) {
@ -454,10 +470,10 @@ InlineSkipList<Comparator>::FindGreaterOrEqual(const char* key) const {
// Make sure the lists are sorted // Make sure the lists are sorted
assert(x == head_ || next == nullptr || KeyIsAfterNode(next->Key(), x)); assert(x == head_ || next == nullptr || KeyIsAfterNode(next->Key(), x));
// Make sure we haven't overshot during our search // Make sure we haven't overshot during our search
assert(x == head_ || KeyIsAfterNode(key, x)); assert(x == head_ || KeyIsAfterNode(key_decoded, x));
int cmp = (next == nullptr || next == last_bigger) int cmp = (next == nullptr || next == last_bigger)
? 1 ? 1
: compare_(next->Key(), key); : compare_(next->Key(), key_decoded);
if (cmp == 0 || (cmp > 0 && level == 0)) { if (cmp == 0 || (cmp > 0 && level == 0)) {
return next; return next;
} else if (cmp < 0) { } else if (cmp < 0) {
@ -487,6 +503,7 @@ InlineSkipList<Comparator>::FindLessThan(const char* key, Node** prev,
Node* x = root; Node* x = root;
// KeyIsAfter(key, last_not_after) is definitely false // KeyIsAfter(key, last_not_after) is definitely false
Node* last_not_after = nullptr; Node* last_not_after = nullptr;
const DecodedKey key_decoded = compare_.decode_key(key);
while (true) { while (true) {
assert(x != nullptr); assert(x != nullptr);
Node* next = x->Next(level); Node* next = x->Next(level);
@ -494,8 +511,8 @@ InlineSkipList<Comparator>::FindLessThan(const char* key, Node** prev,
PREFETCH(next->Next(level), 0, 1); PREFETCH(next->Next(level), 0, 1);
} }
assert(x == head_ || next == nullptr || KeyIsAfterNode(next->Key(), x)); assert(x == head_ || next == nullptr || KeyIsAfterNode(next->Key(), x));
assert(x == head_ || KeyIsAfterNode(key, x)); assert(x == head_ || KeyIsAfterNode(key_decoded, x));
if (next != last_not_after && KeyIsAfterNode(key, next)) { if (next != last_not_after && KeyIsAfterNode(key_decoded, next)) {
// Keep searching in this list // Keep searching in this list
assert(next != nullptr); assert(next != nullptr);
x = next; x = next;
@ -540,13 +557,14 @@ uint64_t InlineSkipList<Comparator>::EstimateCount(const char* key) const {
Node* x = head_; Node* x = head_;
int level = GetMaxHeight() - 1; int level = GetMaxHeight() - 1;
const DecodedKey key_decoded = compare_.decode_key(key);
while (true) { while (true) {
assert(x == head_ || compare_(x->Key(), key) < 0); assert(x == head_ || compare_(x->Key(), key_decoded) < 0);
Node* next = x->Next(level); Node* next = x->Next(level);
if (next != nullptr) { if (next != nullptr) {
PREFETCH(next->Next(level), 0, 1); PREFETCH(next->Next(level), 0, 1);
} }
if (next == nullptr || compare_(next->Key(), key) >= 0) { if (next == nullptr || compare_(next->Key(), key_decoded) >= 0) {
if (level == 0) { if (level == 0) {
return count; return count;
} else { } else {
@ -654,7 +672,7 @@ bool InlineSkipList<Comparator>::InsertWithHint(const char* key, void** hint) {
template <class Comparator> template <class Comparator>
template <bool prefetch_before> template <bool prefetch_before>
void InlineSkipList<Comparator>::FindSpliceForLevel(const char* key, void InlineSkipList<Comparator>::FindSpliceForLevel(const DecodedKey& key,
Node* before, Node* after, Node* before, Node* after,
int level, Node** out_prev, int level, Node** out_prev,
Node** out_next) { Node** out_next) {
@ -682,7 +700,7 @@ void InlineSkipList<Comparator>::FindSpliceForLevel(const char* key,
} }
template <class Comparator> template <class Comparator>
void InlineSkipList<Comparator>::RecomputeSpliceLevels(const char* key, void InlineSkipList<Comparator>::RecomputeSpliceLevels(const DecodedKey& key,
Splice* splice, Splice* splice,
int recompute_level) { int recompute_level) {
assert(recompute_level > 0); assert(recompute_level > 0);
@ -698,6 +716,7 @@ template <bool UseCAS>
bool InlineSkipList<Comparator>::Insert(const char* key, Splice* splice, bool InlineSkipList<Comparator>::Insert(const char* key, Splice* splice,
bool allow_partial_splice_fix) { bool allow_partial_splice_fix) {
Node* x = reinterpret_cast<Node*>(const_cast<char*>(key)) - 1; Node* x = reinterpret_cast<Node*>(const_cast<char*>(key)) - 1;
const DecodedKey key_decoded = compare_.decode_key(key);
int height = x->UnstashHeight(); int height = x->UnstashHeight();
assert(height >= 1 && height <= kMaxHeight_); assert(height >= 1 && height <= kMaxHeight_);
@ -765,7 +784,8 @@ bool InlineSkipList<Comparator>::Insert(const char* key, Splice* splice,
// our chances of success. // our chances of success.
++recompute_height; ++recompute_height;
} else if (splice->prev_[recompute_height] != head_ && } else if (splice->prev_[recompute_height] != head_ &&
!KeyIsAfterNode(key, splice->prev_[recompute_height])) { !KeyIsAfterNode(key_decoded,
splice->prev_[recompute_height])) {
// key is from before splice // key is from before splice
if (allow_partial_splice_fix) { if (allow_partial_splice_fix) {
// skip all levels with the same node without more comparisons // skip all levels with the same node without more comparisons
@ -777,7 +797,8 @@ bool InlineSkipList<Comparator>::Insert(const char* key, Splice* splice,
// we're pessimistic, recompute everything // we're pessimistic, recompute everything
recompute_height = max_height; recompute_height = max_height;
} }
} else if (KeyIsAfterNode(key, splice->next_[recompute_height])) { } else if (KeyIsAfterNode(key_decoded,
splice->next_[recompute_height])) {
// key is from after splice // key is from after splice
if (allow_partial_splice_fix) { if (allow_partial_splice_fix) {
Node* bad = splice->next_[recompute_height]; Node* bad = splice->next_[recompute_height];
@ -795,7 +816,7 @@ bool InlineSkipList<Comparator>::Insert(const char* key, Splice* splice,
} }
assert(recompute_height <= max_height); assert(recompute_height <= max_height);
if (recompute_height > 0) { if (recompute_height > 0) {
RecomputeSpliceLevels(key, splice, recompute_height); RecomputeSpliceLevels(key_decoded, splice, recompute_height);
} }
bool splice_is_valid = true; bool splice_is_valid = true;
@ -827,8 +848,8 @@ bool InlineSkipList<Comparator>::Insert(const char* key, Splice* splice,
// search, because it should be unlikely that lots of nodes have // search, because it should be unlikely that lots of nodes have
// been inserted between prev[i] and next[i]. No point in using // been inserted between prev[i] and next[i]. No point in using
// next[i] as the after hint, because we know it is stale. // next[i] as the after hint, because we know it is stale.
FindSpliceForLevel<false>(key, splice->prev_[i], nullptr, i, &splice->prev_[i], FindSpliceForLevel<false>(key_decoded, splice->prev_[i], nullptr, i,
&splice->next_[i]); &splice->prev_[i], &splice->next_[i]);
// Since we've narrowed the bracket for level i, we might have // Since we've narrowed the bracket for level i, we might have
// violated the Splice constraint between i and i-1. Make sure // violated the Splice constraint between i and i-1. Make sure
@ -842,8 +863,8 @@ bool InlineSkipList<Comparator>::Insert(const char* key, Splice* splice,
for (int i = 0; i < height; ++i) { for (int i = 0; i < height; ++i) {
if (i >= recompute_height && if (i >= recompute_height &&
splice->prev_[i]->Next(i) != splice->next_[i]) { splice->prev_[i]->Next(i) != splice->next_[i]) {
FindSpliceForLevel<false>(key, splice->prev_[i], nullptr, i, &splice->prev_[i], FindSpliceForLevel<false>(key_decoded, splice->prev_[i], nullptr, i,
&splice->next_[i]); &splice->prev_[i], &splice->next_[i]);
} }
// Checking for duplicate keys on the level 0 is sufficient // Checking for duplicate keys on the level 0 is sufficient
if (UNLIKELY(i == 0 && splice->next_[i] != nullptr && if (UNLIKELY(i == 0 && splice->next_[i] != nullptr &&

@ -32,6 +32,12 @@ static Key Decode(const char* key) {
} }
struct TestComparator { struct TestComparator {
typedef Key DecodedType;
static DecodedType decode_key(const char* b) {
return Decode(b);
}
int operator()(const char* a, const char* b) const { int operator()(const char* a, const char* b) const {
if (Decode(a) < Decode(b)) { if (Decode(a) < Decode(b)) {
return -1; return -1;
@ -41,6 +47,16 @@ struct TestComparator {
return 0; return 0;
} }
} }
int operator()(const char* a, const DecodedType b) const {
if (Decode(a) < b) {
return -1;
} else if (Decode(a) > b) {
return +1;
} else {
return 0;
}
}
}; };
typedef InlineSkipList<TestComparator> TestInlineSkipList; typedef InlineSkipList<TestComparator> TestInlineSkipList;

Loading…
Cancel
Save