HashLinkList memtable switches a bucket to a skip list to reduce performance outliers

Summary:
In this patch, we enhance the HashLinkList memtable to reduce performance outliers when a bucket contains too many entries. Such a bucket is switched from a linked list to a skip list, which allows a binary-search-like lookup instead of a linear scan.

Add threshold_use_skiplist parameter to determine when a bucket needs to switch to skip list.

The new data structure is documented in comments in the code.

Test Plan:
make all check
set threshold_use_skiplist in several tests

Reviewers: yhchiang, haobo, ljin

Reviewed By: yhchiang, ljin

Subscribers: nkg-, xjin, dhruba, yhchiang, leveldb

Differential Revision: https://reviews.facebook.net/D19299
main
sdong 11 years ago
parent 6634844dba
commit 9c332aa11a
  1. 3
      HISTORY.md
  2. 3
      db/db_test.cc
  3. 7
      db/dbformat.h
  4. 2
      db/plain_table_db_test.cc
  5. 5
      db/prefix_test.cc
  6. 12
      include/rocksdb/memtablerep.h
  7. 383
      util/hash_linklist_rep.cc
  8. 3
      util/hash_linklist_rep.h

@ -2,6 +2,9 @@
## Unreleased ## Unreleased
### New Features
* HashLinkList reduces performance outliers caused by skewed buckets by switching the data in a bucket from a linked list to a skip list. Add parameter threshold_use_skiplist to NewHashLinkListRepFactory().
## 3.2.0 (06/20/2014) ## 3.2.0 (06/20/2014)

@ -488,7 +488,8 @@ class DBTest {
break; break;
case kHashLinkList: case kHashLinkList:
options.prefix_extractor.reset(NewFixedPrefixTransform(1)); options.prefix_extractor.reset(NewFixedPrefixTransform(1));
options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0)); options.memtable_factory.reset(
NewHashLinkListRepFactory(4, 0, 3, true, 4));
break; break;
case kHashCuckoo: case kHashCuckoo:
options.memtable_factory.reset( options.memtable_factory.reset(

@ -297,6 +297,13 @@ class IterKey {
parsed_key_suffix.sequence, parsed_key_suffix.type); parsed_key_suffix.sequence, parsed_key_suffix.type);
} }
// Writes `key` into the internal buffer in length-prefixed form: a varint32
// encoding of key.size() followed by the raw key bytes.
// NOTE(review): nothing in this method records the encoded length. The caller
// visible in this change only uses GetKey().data(); confirm that no caller
// relies on GetKey().size() after calling this.
void EncodeLengthPrefixedKey(const Slice& key) {
  const auto key_len = key.size();
  // Request capacity for the varint prefix plus the key bytes.
  EnlargeBufferIfNeeded(key_len + VarintLength(key_len));
  // Presumably EncodeVarint32 returns the position just past the prefix it
  // wrote; the key bytes are copied there.
  char* const payload = EncodeVarint32(key_, key_len);
  memcpy(payload, key.data(), key_len);
}
private: private:
char* key_; char* key_;
size_t buf_size_; size_t buf_size_;

@ -62,7 +62,7 @@ class PlainTableDBTest {
Options CurrentOptions() { Options CurrentOptions() {
Options options; Options options;
options.table_factory.reset(NewPlainTableFactory(0, 2, 0.8, 3, 0, kPrefix)); options.table_factory.reset(NewPlainTableFactory(0, 2, 0.8, 3, 0, kPrefix));
options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0, 3, true)); options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0, 3, true, 3));
options.prefix_extractor.reset(NewFixedPrefixTransform(8)); options.prefix_extractor.reset(NewFixedPrefixTransform(8));
options.allow_mmap_reads = true; options.allow_mmap_reads = true;
return options; return options;

@ -189,6 +189,10 @@ class PrefixTest {
options.memtable_factory.reset( options.memtable_factory.reset(
NewHashLinkListRepFactory(bucket_count, 2 * 1024 * 1024)); NewHashLinkListRepFactory(bucket_count, 2 * 1024 * 1024));
return true; return true;
case kHashLinkListTriggerSkipList:
options.memtable_factory.reset(
NewHashLinkListRepFactory(bucket_count, 0, 3));
return true;
default: default:
return false; return false;
} }
@ -208,6 +212,7 @@ class PrefixTest {
kHashSkipList, kHashSkipList,
kHashLinkList, kHashLinkList,
kHashLinkListHugePageTlb, kHashLinkListHugePageTlb,
kHashLinkListTriggerSkipList,
kEnd kEnd
}; };
int option_config_; int option_config_;

@ -227,9 +227,10 @@ extern MemTableRepFactory* NewHashSkipListRepFactory(
int32_t skiplist_branching_factor = 4 int32_t skiplist_branching_factor = 4
); );
// The factory is to create memtables with a hashed linked list: // The factory is to create memtables based on a hash table:
// it contains a fixed array of buckets, each pointing to a sorted single // it contains a fixed array of buckets, each pointing to either a linked list
// linked list (null if the bucket is empty). // or a skip list if number of entries inside the bucket exceeds
// threshold_use_skiplist.
// @bucket_count: number of fixed array buckets // @bucket_count: number of fixed array buckets
// @huge_page_tlb_size: if <=0, allocate the hash table bytes from malloc. // @huge_page_tlb_size: if <=0, allocate the hash table bytes from malloc.
// Otherwise from huge page TLB. The user needs to reserve // Otherwise from huge page TLB. The user needs to reserve
@ -240,10 +241,13 @@ extern MemTableRepFactory* NewHashSkipListRepFactory(
// exceeds this number, log about it. // exceeds this number, log about it.
// @if_log_bucket_dist_when_flash: if true, log distribution of number of // @if_log_bucket_dist_when_flash: if true, log distribution of number of
// entries when flushing. // entries when flushing.
// @threshold_use_skiplist: a bucket switches to skip list if number of
// entries exceeds this parameter. Values below 3 are treated as 3.
extern MemTableRepFactory* NewHashLinkListRepFactory( extern MemTableRepFactory* NewHashLinkListRepFactory(
size_t bucket_count = 50000, size_t huge_page_tlb_size = 0, size_t bucket_count = 50000, size_t huge_page_tlb_size = 0,
int bucket_entries_logging_threshold = 4096, int bucket_entries_logging_threshold = 4096,
bool if_log_bucket_dist_when_flash = true); bool if_log_bucket_dist_when_flash = true,
uint32_t threshold_use_skiplist = 256);
// This factory creates a cuckoo-hashing based mem-table representation. // This factory creates a cuckoo-hashing based mem-table representation.
// Cuckoo-hash is a closed-hash strategy, in which all key/value pairs // Cuckoo-hash is a closed-hash strategy, in which all key/value pairs

@ -7,6 +7,7 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include "util/hash_linklist_rep.h" #include "util/hash_linklist_rep.h"
#include <algorithm>
#include "rocksdb/memtablerep.h" #include "rocksdb/memtablerep.h"
#include "util/arena.h" #include "util/arena.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
@ -22,6 +23,31 @@ namespace rocksdb {
namespace { namespace {
typedef const char* Key; typedef const char* Key;
typedef SkipList<Key, const MemTableRep::KeyComparator&> MemtableSkipList;
typedef port::AtomicPointer Pointer;
// A data structure used as the header of a link list of a hash bucket.
//
// `next` must remain the first member: a bucket's pointer is passed around as
// a Pointer* and reinterpret_cast to BucketHeader* (see
// GetSkipListBucketHeader() and GetLinkListFirstNode()), so the address of the
// header and the address of its next pointer have to coincide.
struct BucketHeader {
// Points to the first node of the bucket's linked list, or to this header
// itself when the bucket is organized as a skip list.
Pointer next;
// Number of entries currently stored in the bucket.
uint32_t num_entries;
// n: initial value of `next`; count: initial number of entries.
explicit BucketHeader(void* n, uint32_t count)
: next(n), num_entries(count) {}
// Returns true iff `next` points back at this header, which is the marker
// used for skip list buckets.
bool IsSkipListBucket() { return next.NoBarrier_Load() == this; }
};
// A data structure used as the header of a skip list of a hash bucket.
//
// Counting_header must remain the first member: code that holds a
// BucketHeader* reinterpret_casts it to SkipListBucketHeader* once
// IsSkipListBucket() returns true, and the self-pointer check relies on both
// objects sharing one address.
struct SkipListBucketHeader {
// Embedded counting header; its next pointer is set to this object so that
// readers can recognize the bucket as a skip list.
BucketHeader Counting_header;
// The entries of the bucket.
MemtableSkipList skip_list;
// cmp and arena are forwarded to the skip list's constructor; count is the
// initial value of the entry counter.
explicit SkipListBucketHeader(const MemTableRep::KeyComparator& cmp,
Arena* arena, uint32_t count)
: Counting_header(this, // Pointing to itself to indicate header type.
count),
skip_list(cmp, arena) {}
};
struct Node { struct Node {
// Accessors/mutators for links. Wrapped in methods so we can // Accessors/mutators for links. Wrapped in methods so we can
@ -51,12 +77,75 @@ struct Node {
char key[0]; char key[0];
}; };
// Memory structure of the mem table:
// It is a hash table in which each bucket points to a single entry, a linked
// list, or a skip list. To track the total number of records in a bucket and
// decide whether it should switch to a skip list, a header is added that
// records the number of entries in the bucket.
//
//
// +-----> NULL Case 1. Empty bucket
// |
// |
// | +---> +-------+
// | | | Next +--> NULL
// | | +-------+
// +-----+ | | | | Case 2. One Entry in bucket.
// | +-+ | | Data | next pointer points to
// +-----+ | | | NULL. All other cases
// | | | | | next pointer is not NULL.
// +-----+ | +-------+
// | +---+
// +-----+ +-> +-------+ +> +-------+ +-> +-------+
// | | | | Next +--+ | Next +--+ | Next +-->NULL
// +-----+ | +-------+ +-------+ +-------+
// | +-----+ | Count | | | | |
// +-----+ +-------+ | Data | | Data |
// | | | | | |
// +-----+ Case 3. | | | |
// | | A header +-------+ +-------+
// +-----+ points to
// | | a linked list. Count indicates total number
// +-----+ of rows in this bucket.
// | |
// +-----+ +-> +-------+ <--+
// | | | | Next +----+
// +-----+ | +-------+ Case 4. A header points to a skip
// | +----+ | Count | list and next pointer points to
// +-----+ +-------+ itself, to distinguish case 3 or 4.
// | | | | Count is still kept to indicate the number
// +-----+ | Skip +--> of entries in the bucket for debugging
// | | | List | Data purpose.
// | | | +-->
// +-----+ | |
// | | +-------+
// +-----+
//
// There is no data race when a bucket changes from one case to another because:
// (1) When changing from case 2->3, we create a new bucket header, put the
// single node there first without changing the original node, and do a
// release store when changing the bucket pointer. A reader that sees a
// stale value of the bucket pointer still reads the original node, while a
// reader that sees the new value reads a fully initialized header because
// of the release store.
// (2) When changing from case 3->4, a new header is created whose skip list
// points to the data, before doing a release store to change the bucket
// pointer. The old header and nodes are never changed, so any reader that
// sees any of those existing pointers is guaranteed to be able to iterate
// to the end of the linked list.
// (3) The header's next pointer in case 3 might change, but it is never equal
// to the header itself, so whether a reader sees a stale or a newer value,
// it is able to correctly distinguish case 3 from case 4.
//
// The reason that we use case 2 is that we want the format to be efficient
// when the utilization of buckets is relatively low. If we used case 3 for
// single-entry buckets, we would waste 12 bytes for every entry, which can be
// a significant decrease in memory utilization.
class HashLinkListRep : public MemTableRep { class HashLinkListRep : public MemTableRep {
public: public:
HashLinkListRep(const MemTableRep::KeyComparator& compare, Arena* arena, HashLinkListRep(const MemTableRep::KeyComparator& compare, Arena* arena,
const SliceTransform* transform, size_t bucket_size, const SliceTransform* transform, size_t bucket_size,
size_t huge_page_tlb_size, Logger* logger, uint32_t threshold_use_skiplist, size_t huge_page_tlb_size,
int bucket_entries_logging_threshold, Logger* logger, int bucket_entries_logging_threshold,
bool if_log_bucket_dist_when_flash); bool if_log_bucket_dist_when_flash);
virtual KeyHandle Allocate(const size_t len, char** buf) override; virtual KeyHandle Allocate(const size_t len, char** buf) override;
@ -80,7 +169,6 @@ class HashLinkListRep : public MemTableRep {
private: private:
friend class DynamicIterator; friend class DynamicIterator;
typedef SkipList<const char*, const MemTableRep::KeyComparator&> FullList;
size_t bucket_size_; size_t bucket_size_;
@ -88,6 +176,8 @@ class HashLinkListRep : public MemTableRep {
// the same transform. // the same transform.
port::AtomicPointer* buckets_; port::AtomicPointer* buckets_;
const uint32_t threshold_use_skiplist_;
// The user-supplied transform whose domain is the user keys. // The user-supplied transform whose domain is the user keys.
const SliceTransform* transform_; const SliceTransform* transform_;
@ -97,7 +187,12 @@ class HashLinkListRep : public MemTableRep {
int bucket_entries_logging_threshold_; int bucket_entries_logging_threshold_;
bool if_log_bucket_dist_when_flash_; bool if_log_bucket_dist_when_flash_;
bool BucketContains(Node* head, const Slice& key) const; bool LinkListContains(Node* head, const Slice& key) const;
SkipListBucketHeader* GetSkipListBucketHeader(Pointer* first_next_pointer)
const;
Node* GetLinkListFirstNode(Pointer* first_next_pointer) const;
Slice GetPrefix(const Slice& internal_key) const { Slice GetPrefix(const Slice& internal_key) const {
return transform_->Transform(ExtractUserKey(internal_key)); return transform_->Transform(ExtractUserKey(internal_key));
@ -107,11 +202,11 @@ class HashLinkListRep : public MemTableRep {
return MurmurHash(slice.data(), slice.size(), 0) % bucket_size_; return MurmurHash(slice.data(), slice.size(), 0) % bucket_size_;
} }
Node* GetBucket(size_t i) const { Pointer* GetBucket(size_t i) const {
return static_cast<Node*>(buckets_[i].Acquire_Load()); return static_cast<Pointer*>(buckets_[i].Acquire_Load());
} }
Node* GetBucket(const Slice& slice) const { Pointer* GetBucket(const Slice& slice) const {
return GetBucket(GetHash(slice)); return GetBucket(GetHash(slice));
} }
@ -119,7 +214,6 @@ class HashLinkListRep : public MemTableRep {
return (compare_(b, a) == 0); return (compare_(b, a) == 0);
} }
bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); } bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); }
bool KeyIsAfterNode(const Slice& internal_key, const Node* n) const { bool KeyIsAfterNode(const Slice& internal_key, const Node* n) const {
@ -137,7 +231,7 @@ class HashLinkListRep : public MemTableRep {
class FullListIterator : public MemTableRep::Iterator { class FullListIterator : public MemTableRep::Iterator {
public: public:
explicit FullListIterator(FullList* list, Arena* arena) explicit FullListIterator(MemtableSkipList* list, Arena* arena)
: iter_(list), full_list_(list), arena_(arena) {} : iter_(list), full_list_(list), arena_(arena) {}
virtual ~FullListIterator() { virtual ~FullListIterator() {
@ -189,22 +283,22 @@ class HashLinkListRep : public MemTableRep {
iter_.SeekToLast(); iter_.SeekToLast();
} }
private: private:
FullList::Iterator iter_; MemtableSkipList::Iterator iter_;
// To destruct with the iterator. // To destruct with the iterator.
std::unique_ptr<FullList> full_list_; std::unique_ptr<MemtableSkipList> full_list_;
std::unique_ptr<Arena> arena_; std::unique_ptr<Arena> arena_;
std::string tmp_; // For passing to EncodeKey std::string tmp_; // For passing to EncodeKey
}; };
class Iterator : public MemTableRep::Iterator { class LinkListIterator : public MemTableRep::Iterator {
public: public:
explicit Iterator(const HashLinkListRep* const hash_link_list_rep, explicit LinkListIterator(const HashLinkListRep* const hash_link_list_rep,
Node* head) : Node* head)
hash_link_list_rep_(hash_link_list_rep), head_(head), node_(nullptr) { : hash_link_list_rep_(hash_link_list_rep),
} head_(head),
node_(nullptr) {}
virtual ~Iterator() { virtual ~LinkListIterator() {}
}
// Returns true iff the iterator is positioned at a valid node. // Returns true iff the iterator is positioned at a valid node.
virtual bool Valid() const { virtual bool Valid() const {
@ -271,22 +365,68 @@ class HashLinkListRep : public MemTableRep {
} }
}; };
class DynamicIterator : public HashLinkListRep::Iterator { class DynamicIterator : public HashLinkListRep::LinkListIterator {
public: public:
explicit DynamicIterator(HashLinkListRep& memtable_rep) explicit DynamicIterator(HashLinkListRep& memtable_rep)
: HashLinkListRep::Iterator(&memtable_rep, nullptr), : HashLinkListRep::LinkListIterator(&memtable_rep, nullptr),
memtable_rep_(memtable_rep) {} memtable_rep_(memtable_rep) {}
// Advance to the first entry with a key >= target // Advance to the first entry with a key >= target
virtual void Seek(const Slice& k, const char* memtable_key) { virtual void Seek(const Slice& k, const char* memtable_key) {
auto transformed = memtable_rep_.GetPrefix(k); auto transformed = memtable_rep_.GetPrefix(k);
Reset(memtable_rep_.GetBucket(transformed)); auto* bucket = memtable_rep_.GetBucket(transformed);
HashLinkListRep::Iterator::Seek(k, memtable_key);
SkipListBucketHeader* skip_list_header =
memtable_rep_.GetSkipListBucketHeader(bucket);
if (skip_list_header != nullptr) {
// The bucket is organized as a skip list
if (!skip_list_iter_) {
skip_list_iter_.reset(
new MemtableSkipList::Iterator(&skip_list_header->skip_list));
} else {
skip_list_iter_->SetList(&skip_list_header->skip_list);
}
if (memtable_key != nullptr) {
skip_list_iter_->Seek(memtable_key);
} else {
IterKey encoded_key;
encoded_key.EncodeLengthPrefixedKey(k);
skip_list_iter_->Seek(encoded_key.GetKey().data());
}
} else {
// The bucket is organized as a linked list
skip_list_iter_.reset();
Reset(memtable_rep_.GetLinkListFirstNode(bucket));
HashLinkListRep::LinkListIterator::Seek(k, memtable_key);
}
}
// Reports whether the iterator is positioned on an entry. The skip list
// iterator answers when one is active (the last Seek() landed on a skip list
// bucket); otherwise the linked-list base class answers.
virtual bool Valid() const {
  return skip_list_iter_ != nullptr
             ? skip_list_iter_->Valid()
             : HashLinkListRep::LinkListIterator::Valid();
}
// Returns the key at the current position, taken from the skip list iterator
// when one is active and from the linked-list base class otherwise.
virtual const char* key() const {
  return skip_list_iter_ != nullptr
             ? skip_list_iter_->key()
             : HashLinkListRep::LinkListIterator::key();
}
virtual void Next() {
if (skip_list_iter_) {
skip_list_iter_->Next();
} else {
HashLinkListRep::LinkListIterator::Next();
}
} }
private: private:
// the underlying memtable // the underlying memtable
const HashLinkListRep& memtable_rep_; const HashLinkListRep& memtable_rep_;
std::unique_ptr<MemtableSkipList::Iterator> skip_list_iter_;
}; };
class EmptyIterator : public MemTableRep::Iterator { class EmptyIterator : public MemTableRep::Iterator {
@ -312,12 +452,16 @@ class HashLinkListRep : public MemTableRep {
HashLinkListRep::HashLinkListRep(const MemTableRep::KeyComparator& compare, HashLinkListRep::HashLinkListRep(const MemTableRep::KeyComparator& compare,
Arena* arena, const SliceTransform* transform, Arena* arena, const SliceTransform* transform,
size_t bucket_size, size_t huge_page_tlb_size, size_t bucket_size,
Logger* logger, uint32_t threshold_use_skiplist,
size_t huge_page_tlb_size, Logger* logger,
int bucket_entries_logging_threshold, int bucket_entries_logging_threshold,
bool if_log_bucket_dist_when_flash) bool if_log_bucket_dist_when_flash)
: MemTableRep(arena), : MemTableRep(arena),
bucket_size_(bucket_size), bucket_size_(bucket_size),
// Threshold to use skip list doesn't make sense if less than 3, so we
// force it to be minimum of 3 to simplify implementation.
threshold_use_skiplist_(std::max(threshold_use_skiplist, 3U)),
transform_(transform), transform_(transform),
compare_(compare), compare_(compare),
logger_(logger), logger_(logger),
@ -343,23 +487,130 @@ KeyHandle HashLinkListRep::Allocate(const size_t len, char** buf) {
return static_cast<void*>(x); return static_cast<void*>(x);
} }
// Interprets a bucket's raw pointer as a skip list header.
// Returns the SkipListBucketHeader when the bucket is organized as a skip
// list. Returns nullptr for an empty bucket, a single-entry bucket, or a
// bucket whose header fronts a linked list.
SkipListBucketHeader* HashLinkListRep::GetSkipListBucketHeader(
    Pointer* first_next_pointer) const {
  // An empty bucket, or a lone entry whose next pointer is NULL, has no header.
  const bool has_header = first_next_pointer != nullptr &&
                          first_next_pointer->NoBarrier_Load() != nullptr;
  if (!has_header) {
    return nullptr;
  }

  // From here on the pointer is known to address a counting header.
  BucketHeader* counting_header =
      reinterpret_cast<BucketHeader*>(first_next_pointer);
  if (!counting_header->IsSkipListBucket()) {
    // Linked-list bucket: the count has not passed the threshold yet.
    assert(counting_header->num_entries <= threshold_use_skiplist_);
    return nullptr;
  }

  assert(counting_header->num_entries > threshold_use_skiplist_);
  auto* skip_list_header =
      reinterpret_cast<SkipListBucketHeader*>(counting_header);
  // The embedded header must point back at itself.
  assert(skip_list_header->Counting_header.next.NoBarrier_Load() ==
         counting_header);
  return skip_list_header;
}
// Returns the first node of the linked list stored in a bucket.
// Returns nullptr when the bucket is empty or is organized as a skip list.
// For a single-entry bucket the bucket pointer itself is returned as the node.
Node* HashLinkListRep::GetLinkListFirstNode(Pointer* first_next_pointer) const {
  if (first_next_pointer == nullptr) {
    return nullptr;
  }

  const bool is_single_entry = first_next_pointer->NoBarrier_Load() == nullptr;
  if (is_single_entry) {
    // The bucket points straight at its only node; no header is involved.
    return reinterpret_cast<Node*>(first_next_pointer);
  }

  // Otherwise the bucket starts with a counting header.
  BucketHeader* counting_header =
      reinterpret_cast<BucketHeader*>(first_next_pointer);
  if (counting_header->IsSkipListBucket()) {
    assert(counting_header->num_entries > threshold_use_skiplist_);
    return nullptr;
  }

  assert(counting_header->num_entries <= threshold_use_skiplist_);
  return reinterpret_cast<Node*>(counting_header->next.NoBarrier_Load());
}
void HashLinkListRep::Insert(KeyHandle handle) { void HashLinkListRep::Insert(KeyHandle handle) {
Node* x = static_cast<Node*>(handle); Node* x = static_cast<Node*>(handle);
assert(!Contains(x->key)); assert(!Contains(x->key));
Slice internal_key = GetLengthPrefixedSlice(x->key); Slice internal_key = GetLengthPrefixedSlice(x->key);
auto transformed = GetPrefix(internal_key); auto transformed = GetPrefix(internal_key);
auto& bucket = buckets_[GetHash(transformed)]; auto& bucket = buckets_[GetHash(transformed)];
Node* head = static_cast<Node*>(bucket.Acquire_Load()); Pointer* first_next_pointer = static_cast<Pointer*>(bucket.NoBarrier_Load());
if (!head) { if (first_next_pointer == nullptr) {
// Case 1. empty bucket
// NoBarrier_SetNext() suffices since we will add a barrier when // NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i]. // we publish a pointer to "x" in prev[i].
x->NoBarrier_SetNext(nullptr); x->NoBarrier_SetNext(nullptr);
bucket.Release_Store(static_cast<void*>(x)); bucket.Release_Store(x);
return;
}
BucketHeader* header = nullptr;
if (first_next_pointer->NoBarrier_Load() == nullptr) {
// Case 2. only one entry in the bucket
// Need to convert to a Counting bucket and turn to case 4.
Node* first = reinterpret_cast<Node*>(first_next_pointer);
// Need to add a bucket header.
// We have to first convert it to a bucket with header before inserting
// the new node. Otherwise, we might need to change next pointer of first.
// In that case, a reader might sees the next pointer is NULL and wrongly
// think the node is a bucket header.
auto* mem = arena_->AllocateAligned(sizeof(BucketHeader));
header = new (mem) BucketHeader(first, 1);
bucket.Release_Store(header);
} else {
header = reinterpret_cast<BucketHeader*>(first_next_pointer);
if (header->IsSkipListBucket()) {
// Case 4. Bucket is already a skip list
assert(header->num_entries > threshold_use_skiplist_);
auto* skip_list_bucket_header =
reinterpret_cast<SkipListBucketHeader*>(header);
skip_list_bucket_header->Counting_header.num_entries++;
skip_list_bucket_header->skip_list.Insert(x->key);
return; return;
} }
}
if (bucket_entries_logging_threshold_ > 0 &&
header->num_entries ==
static_cast<uint32_t>(bucket_entries_logging_threshold_)) {
Info(logger_,
"HashLinkedList bucket %zu has more than %d "
"entries. Key to insert: %s",
GetHash(transformed), header->num_entries,
GetLengthPrefixedSlice(x->key).ToString(true).c_str());
}
Node* cur = head; if (header->num_entries == threshold_use_skiplist_) {
// Case 3. number of entries reaches the threshold so need to convert to
// skip list.
LinkListIterator bucket_iter(
this, reinterpret_cast<Node*>(first_next_pointer->NoBarrier_Load()));
auto mem = arena_->AllocateAligned(sizeof(SkipListBucketHeader));
SkipListBucketHeader* new_skip_list_header = new (mem)
SkipListBucketHeader(compare_, arena_, header->num_entries + 1);
auto& skip_list = new_skip_list_header->skip_list;
// Add all current entries to the skip list
for (bucket_iter.SeekToHead(); bucket_iter.Valid(); bucket_iter.Next()) {
skip_list.Insert(bucket_iter.key());
}
// insert the new entry
skip_list.Insert(x->key);
// Set the bucket
bucket.Release_Store(new_skip_list_header);
} else {
// Case 5. Need to insert to the sorted linked list without changing the
// header.
Node* first = reinterpret_cast<Node*>(header->next.NoBarrier_Load());
assert(first != nullptr);
// Advance counter unless the bucket needs to be advanced to skip list.
// In that case, we need to make sure the previous count never exceeds
// threshold_use_skiplist_ to avoid readers to cast to wrong format.
header->num_entries++;
Node* cur = first;
Node* prev = nullptr; Node* prev = nullptr;
while (true) { while (true) {
if (cur == nullptr) { if (cur == nullptr) {
@ -368,7 +619,7 @@ void HashLinkListRep::Insert(KeyHandle handle) {
Node* next = cur->Next(); Node* next = cur->Next();
// Make sure the lists are sorted. // Make sure the lists are sorted.
// If x points to head_ or next points nullptr, it is trivially satisfied. // If x points to head_ or next points nullptr, it is trivially satisfied.
assert((cur == head) || (next == nullptr) || assert((cur == first) || (next == nullptr) ||
KeyIsAfterNode(next->key, cur)); KeyIsAfterNode(next->key, cur));
if (KeyIsAfterNode(internal_key, cur)) { if (KeyIsAfterNode(internal_key, cur)) {
// Keep searching in this list // Keep searching in this list
@ -389,7 +640,8 @@ void HashLinkListRep::Insert(KeyHandle handle) {
if (prev) { if (prev) {
prev->SetNext(x); prev->SetNext(x);
} else { } else {
bucket.Release_Store(static_cast<void*>(x)); header->next.Release_Store(static_cast<void*>(x));
}
} }
} }
@ -401,7 +653,13 @@ bool HashLinkListRep::Contains(const char* key) const {
if (bucket == nullptr) { if (bucket == nullptr) {
return false; return false;
} }
return BucketContains(bucket, internal_key);
SkipListBucketHeader* skip_list_header = GetSkipListBucketHeader(bucket);
if (skip_list_header != nullptr) {
return skip_list_header->skip_list.Contains(key);
} else {
return LinkListContains(GetLinkListFirstNode(bucket), internal_key);
}
} }
size_t HashLinkListRep::ApproximateMemoryUsage() { size_t HashLinkListRep::ApproximateMemoryUsage() {
@ -413,37 +671,53 @@ void HashLinkListRep::Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) { bool (*callback_func)(void* arg, const char* entry)) {
auto transformed = transform_->Transform(k.user_key()); auto transformed = transform_->Transform(k.user_key());
auto bucket = GetBucket(transformed); auto bucket = GetBucket(transformed);
if (bucket != nullptr) {
Iterator iter(this, bucket); auto* skip_list_header = GetSkipListBucketHeader(bucket);
if (skip_list_header != nullptr) {
// Is a skip list
MemtableSkipList::Iterator iter(&skip_list_header->skip_list);
for (iter.Seek(k.memtable_key().data());
iter.Valid() && callback_func(callback_args, iter.key());
iter.Next()) {
}
} else {
auto* link_list_head = GetLinkListFirstNode(bucket);
if (link_list_head != nullptr) {
LinkListIterator iter(this, link_list_head);
for (iter.Seek(k.internal_key(), nullptr); for (iter.Seek(k.internal_key(), nullptr);
iter.Valid() && callback_func(callback_args, iter.key()); iter.Valid() && callback_func(callback_args, iter.key());
iter.Next()) { iter.Next()) {
} }
} }
} }
}
MemTableRep::Iterator* HashLinkListRep::GetIterator(Arena* alloc_arena) { MemTableRep::Iterator* HashLinkListRep::GetIterator(Arena* alloc_arena) {
// allocate a new arena of similar size to the one currently in use // allocate a new arena of similar size to the one currently in use
Arena* new_arena = new Arena(arena_->BlockSize()); Arena* new_arena = new Arena(arena_->BlockSize());
auto list = new FullList(compare_, new_arena); auto list = new MemtableSkipList(compare_, new_arena);
HistogramImpl keys_per_bucket_hist; HistogramImpl keys_per_bucket_hist;
for (size_t i = 0; i < bucket_size_; ++i) { for (size_t i = 0; i < bucket_size_; ++i) {
int count = 0; int count = 0;
bool num_entries_printed = false; auto* bucket = GetBucket(i);
auto bucket = GetBucket(i);
if (bucket != nullptr) { if (bucket != nullptr) {
Iterator itr(this, bucket); auto* skip_list_header = GetSkipListBucketHeader(bucket);
if (skip_list_header != nullptr) {
// Is a skip list
MemtableSkipList::Iterator itr(&skip_list_header->skip_list);
for (itr.SeekToFirst(); itr.Valid(); itr.Next()) {
list->Insert(itr.key());
count++;
}
} else {
auto* link_list_head = GetLinkListFirstNode(bucket);
if (link_list_head != nullptr) {
LinkListIterator itr(this, link_list_head);
for (itr.SeekToHead(); itr.Valid(); itr.Next()) { for (itr.SeekToHead(); itr.Valid(); itr.Next()) {
list->Insert(itr.key()); list->Insert(itr.key());
if (logger_ != nullptr && count++;
++count >= bucket_entries_logging_threshold_ && }
!num_entries_printed) {
num_entries_printed = true;
Info(logger_, "HashLinkedList bucket %zu has more than %d "
"entries. %dth key: %s",
i, count, count,
GetLengthPrefixedSlice(itr.key()).ToString(true).c_str());
} }
} }
} }
@ -474,7 +748,8 @@ MemTableRep::Iterator* HashLinkListRep::GetDynamicPrefixIterator(
} }
} }
bool HashLinkListRep::BucketContains(Node* head, const Slice& user_key) const { bool HashLinkListRep::LinkListContains(Node* head,
const Slice& user_key) const {
Node* x = FindGreaterOrEqualInBucket(head, user_key); Node* x = FindGreaterOrEqualInBucket(head, user_key);
return (x != nullptr && Equal(user_key, x->key)); return (x != nullptr && Equal(user_key, x->key));
} }
@ -505,17 +780,19 @@ Node* HashLinkListRep::FindGreaterOrEqualInBucket(Node* head,
MemTableRep* HashLinkListRepFactory::CreateMemTableRep( MemTableRep* HashLinkListRepFactory::CreateMemTableRep(
const MemTableRep::KeyComparator& compare, Arena* arena, const MemTableRep::KeyComparator& compare, Arena* arena,
const SliceTransform* transform, Logger* logger) { const SliceTransform* transform, Logger* logger) {
return new HashLinkListRep( return new HashLinkListRep(compare, arena, transform, bucket_count_,
compare, arena, transform, bucket_count_, huge_page_tlb_size_, logger, threshold_use_skiplist_, huge_page_tlb_size_,
bucket_entries_logging_threshold_, if_log_bucket_dist_when_flash_); logger, bucket_entries_logging_threshold_,
if_log_bucket_dist_when_flash_);
} }
MemTableRepFactory* NewHashLinkListRepFactory( MemTableRepFactory* NewHashLinkListRepFactory(
size_t bucket_count, size_t huge_page_tlb_size, size_t bucket_count, size_t huge_page_tlb_size,
int bucket_entries_logging_threshold, bool if_log_bucket_dist_when_flash) { int bucket_entries_logging_threshold, bool if_log_bucket_dist_when_flash,
return new HashLinkListRepFactory(bucket_count, huge_page_tlb_size, uint32_t threshold_use_skiplist) {
bucket_entries_logging_threshold, return new HashLinkListRepFactory(
if_log_bucket_dist_when_flash); bucket_count, threshold_use_skiplist, huge_page_tlb_size,
bucket_entries_logging_threshold, if_log_bucket_dist_when_flash);
} }
} // namespace rocksdb } // namespace rocksdb

@ -16,10 +16,12 @@ namespace rocksdb {
class HashLinkListRepFactory : public MemTableRepFactory { class HashLinkListRepFactory : public MemTableRepFactory {
public: public:
explicit HashLinkListRepFactory(size_t bucket_count, explicit HashLinkListRepFactory(size_t bucket_count,
uint32_t threshold_use_skiplist,
size_t huge_page_tlb_size, size_t huge_page_tlb_size,
int bucket_entries_logging_threshold, int bucket_entries_logging_threshold,
bool if_log_bucket_dist_when_flash) bool if_log_bucket_dist_when_flash)
: bucket_count_(bucket_count), : bucket_count_(bucket_count),
threshold_use_skiplist_(threshold_use_skiplist),
huge_page_tlb_size_(huge_page_tlb_size), huge_page_tlb_size_(huge_page_tlb_size),
bucket_entries_logging_threshold_(bucket_entries_logging_threshold), bucket_entries_logging_threshold_(bucket_entries_logging_threshold),
if_log_bucket_dist_when_flash_(if_log_bucket_dist_when_flash) {} if_log_bucket_dist_when_flash_(if_log_bucket_dist_when_flash) {}
@ -36,6 +38,7 @@ class HashLinkListRepFactory : public MemTableRepFactory {
private: private:
const size_t bucket_count_; const size_t bucket_count_;
const uint32_t threshold_use_skiplist_;
const size_t huge_page_tlb_size_; const size_t huge_page_tlb_size_;
int bucket_entries_logging_threshold_; int bucket_entries_logging_threshold_;
bool if_log_bucket_dist_when_flash_; bool if_log_bucket_dist_when_flash_;

Loading…
Cancel
Save