[Performance Branch] A Hashed Linked List Based Mem Table

Summary:
Implement a mem table, in which keys are hashed based on prefixes. In each bucket, entries are organized in a sorted linked list. It has the same thread safety guarantee as skip list.

The motivation is to optimize memory usage for the case that prefix hashing is primary way of seeking to the entry. Compared to hash skip list implementation, this implementation is more memory efficient, but inside each bucket, search is always linear. The target scenario is that there are only very limited number of records in each hash bucket.

Test Plan: Add a test case in db_test

Reviewers: haobo, kailiu, dhruba

Reviewed By: haobo

CC: igor, nkg-, leveldb

Differential Revision: https://reviews.facebook.net/D14979
main
Siying Dong 11 years ago
parent 17a222670b
commit 424a524ac9
  1. 7
      db/db_test.cc
  2. 376
      db/prefix_test.cc
  3. 7
      include/rocksdb/memtablerep.h
  4. 462
      util/hash_linklist_rep.cc
  5. 39
      util/hash_linklist_rep.h

@ -29,6 +29,7 @@
#include "util/mutexlock.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "util/hash_linklist_rep.h"
#include "utilities/merge_operators.h"
namespace rocksdb {
@ -250,6 +251,7 @@ class DBTest {
kPlainTableFirstBytePrefix,
kPlainTableAllBytesPrefix,
kVectorRep,
kHashLinkList,
kMergePut,
kFilter,
kUncompressed,
@ -403,6 +405,10 @@ class DBTest {
case kVectorRep:
options.memtable_factory.reset(new VectorRepFactory(100));
break;
case kHashLinkList:
options.memtable_factory.reset(
NewHashLinkListRepFactory(NewFixedPrefixTransform(1), 4));
break;
case kUniversalCompaction:
options.compaction_style = kCompactionStyleUniversal;
break;
@ -4521,6 +4527,7 @@ TEST(DBTest, Randomized) {
int p = rnd.Uniform(100);
int minimum = 0;
if (option_config_ == kHashSkipList ||
option_config_ == kHashLinkList ||
option_config_ == kPlainTableFirstBytePrefix) {
minimum = 1;
}

@ -109,20 +109,6 @@ class PrefixTest {
FLAGS_min_write_buffer_number_to_merge;
options.comparator = new TestKeyComparator();
if (FLAGS_use_prefix_hash_memtable) {
auto prefix_extractor = NewFixedPrefixTransform(8);
options.prefix_extractor = prefix_extractor;
if (FLAGS_use_nolock_version) {
options.memtable_factory.reset(NewHashSkipListRepFactory(
prefix_extractor, FLAGS_bucket_count,
FLAGS_skiplist_height));
} else {
options.memtable_factory =
std::make_shared<rocksdb::PrefixHashRepFactory>(
prefix_extractor, FLAGS_bucket_count, FLAGS_num_locks);
}
}
options.memtable_prefix_bloom_bits = FLAGS_memtable_prefix_bloom_bits;
options.memtable_prefix_bloom_probes = FLAGS_memtable_prefix_bloom_probes;
@ -130,216 +116,256 @@ class PrefixTest {
ASSERT_OK(s);
return std::shared_ptr<DB>(db);
}
bool NextOptions() {
// skip some options
option_config_++;
if (option_config_ < kEnd) {
auto prefix_extractor = NewFixedPrefixTransform(8);
options.prefix_extractor = prefix_extractor;
switch(option_config_) {
case kHashSkipList:
options.memtable_factory.reset(
NewHashSkipListRepFactory(options.prefix_extractor,
FLAGS_bucket_count,
FLAGS_skiplist_height));
return true;
case kHashLinkList:
options.memtable_factory.reset(
NewHashLinkListRepFactory(options.prefix_extractor,
FLAGS_bucket_count));
return true;
default:
return false;
}
}
return false;
}
PrefixTest() : option_config_(kBegin) { }
~PrefixTest() {
delete options.comparator;
}
protected:
enum OptionConfig {
kBegin,
kHashSkipList,
kHashLinkList,
kEnd
};
int option_config_;
Options options;
};
TEST(PrefixTest, DynamicPrefixIterator) {
while (NextOptions()) {
std::cout << "*** Mem table: " << options.memtable_factory->Name()
<< std::endl;
DestroyDB(kDbName, Options());
auto db = OpenDb();
WriteOptions write_options;
ReadOptions read_options;
std::vector<uint64_t> prefixes;
for (uint64_t i = 0; i < FLAGS_total_prefixes; ++i) {
prefixes.push_back(i);
}
DestroyDB(kDbName, Options());
auto db = OpenDb();
WriteOptions write_options;
ReadOptions read_options;
if (FLAGS_random_prefix) {
std::random_shuffle(prefixes.begin(), prefixes.end());
}
std::vector<uint64_t> prefixes;
for (uint64_t i = 0; i < FLAGS_total_prefixes; ++i) {
prefixes.push_back(i);
}
HistogramImpl hist_put_time;
HistogramImpl hist_put_comparison;
if (FLAGS_random_prefix) {
std::random_shuffle(prefixes.begin(), prefixes.end());
}
// insert x random prefix, each with y continuous element.
for (auto prefix : prefixes) {
for (uint64_t sorted = 0; sorted < FLAGS_items_per_prefix; sorted++) {
TestKey test_key(prefix, sorted);
Slice key = TestKeyToSlice(test_key);
std::string value(FLAGS_value_size, 0);
perf_context.Reset();
StopWatchNano timer(Env::Default(), true);
ASSERT_OK(db->Put(write_options, key, value));
hist_put_time.Add(timer.ElapsedNanos());
hist_put_comparison.Add(perf_context.user_key_comparison_count);
}
}
std::cout << "Put key comparison: \n" << hist_put_comparison.ToString()
<< "Put time: \n" << hist_put_time.ToString();
HistogramImpl hist_put_time;
HistogramImpl hist_put_comparison;
// test seek existing keys
HistogramImpl hist_seek_time;
HistogramImpl hist_seek_comparison;
// insert x random prefix, each with y continuous element.
for (auto prefix : prefixes) {
for (uint64_t sorted = 0; sorted < FLAGS_items_per_prefix; sorted++) {
TestKey test_key(prefix, sorted);
if (FLAGS_use_prefix_hash_memtable) {
read_options.prefix_seek = true;
}
std::unique_ptr<Iterator> iter(db->NewIterator(read_options));
for (auto prefix : prefixes) {
TestKey test_key(prefix, FLAGS_items_per_prefix / 2);
Slice key = TestKeyToSlice(test_key);
std::string value(FLAGS_value_size, 0);
std::string value = "v" + std::to_string(0);
perf_context.Reset();
StopWatchNano timer(Env::Default(), true);
ASSERT_OK(db->Put(write_options, key, value));
hist_put_time.Add(timer.ElapsedNanos());
hist_put_comparison.Add(perf_context.user_key_comparison_count);
uint64_t total_keys = 0;
for (iter->Seek(key); iter->Valid(); iter->Next()) {
if (FLAGS_trigger_deadlock) {
std::cout << "Behold the deadlock!\n";
db->Delete(write_options, iter->key());
}
auto test_key = SliceToTestKey(iter->key());
if (test_key->prefix != prefix) break;
total_keys++;
}
hist_seek_time.Add(timer.ElapsedNanos());
hist_seek_comparison.Add(perf_context.user_key_comparison_count);
ASSERT_EQ(total_keys, FLAGS_items_per_prefix - FLAGS_items_per_prefix/2);
}
}
std::cout << "Put key comparison: \n" << hist_put_comparison.ToString()
<< "Put time: \n" << hist_put_time.ToString();
std::cout << "Seek key comparison: \n"
<< hist_seek_comparison.ToString()
<< "Seek time: \n"
<< hist_seek_time.ToString();
// test seek existing keys
HistogramImpl hist_seek_time;
HistogramImpl hist_seek_comparison;
// test non-existing keys
HistogramImpl hist_no_seek_time;
HistogramImpl hist_no_seek_comparison;
if (FLAGS_use_prefix_hash_memtable) {
read_options.prefix_seek = true;
}
std::unique_ptr<Iterator> iter(db->NewIterator(read_options));
for (auto prefix : prefixes) {
TestKey test_key(prefix, FLAGS_items_per_prefix / 2);
Slice key = TestKeyToSlice(test_key);
std::string value = "v" + std::to_string(0);
perf_context.Reset();
StopWatchNano timer(Env::Default(), true);
uint64_t total_keys = 0;
for (iter->Seek(key); iter->Valid(); iter->Next()) {
if (FLAGS_trigger_deadlock) {
std::cout << "Behold the deadlock!\n";
db->Delete(write_options, iter->key());
}
auto test_key = SliceToTestKey(iter->key());
if (test_key->prefix != prefix) break;
total_keys++;
for (auto prefix = FLAGS_total_prefixes;
prefix < FLAGS_total_prefixes + 10000;
prefix++) {
TestKey test_key(prefix, 0);
Slice key = TestKeyToSlice(test_key);
perf_context.Reset();
StopWatchNano timer(Env::Default(), true);
iter->Seek(key);
hist_no_seek_time.Add(timer.ElapsedNanos());
hist_no_seek_comparison.Add(perf_context.user_key_comparison_count);
ASSERT_TRUE(!iter->Valid());
}
hist_seek_time.Add(timer.ElapsedNanos());
hist_seek_comparison.Add(perf_context.user_key_comparison_count);
ASSERT_EQ(total_keys, FLAGS_items_per_prefix - FLAGS_items_per_prefix/2);
}
std::cout << "Seek key comparison: \n"
<< hist_seek_comparison.ToString()
<< "Seek time: \n"
<< hist_seek_time.ToString();
// test non-existing keys
HistogramImpl hist_no_seek_time;
HistogramImpl hist_no_seek_comparison;
for (auto prefix = FLAGS_total_prefixes;
prefix < FLAGS_total_prefixes + 10000;
prefix++) {
TestKey test_key(prefix, 0);
Slice key = TestKeyToSlice(test_key);
perf_context.Reset();
StopWatchNano timer(Env::Default(), true);
iter->Seek(key);
hist_no_seek_time.Add(timer.ElapsedNanos());
hist_no_seek_comparison.Add(perf_context.user_key_comparison_count);
ASSERT_TRUE(!iter->Valid());
std::cout << "non-existing Seek key comparison: \n"
<< hist_no_seek_comparison.ToString()
<< "non-existing Seek time: \n"
<< hist_no_seek_time.ToString();
}
std::cout << "non-existing Seek key comparison: \n"
<< hist_no_seek_comparison.ToString()
<< "non-existing Seek time: \n"
<< hist_no_seek_time.ToString();
}
TEST(PrefixTest, PrefixHash) {
while (NextOptions()) {
std::cout << "*** Mem table: " << options.memtable_factory->Name()
<< std::endl;
DestroyDB(kDbName, Options());
auto db = OpenDb();
WriteOptions write_options;
ReadOptions read_options;
std::vector<uint64_t> prefixes;
for (uint64_t i = 0; i < FLAGS_total_prefixes; ++i) {
prefixes.push_back(i);
}
DestroyDB(kDbName, Options());
auto db = OpenDb();
WriteOptions write_options;
ReadOptions read_options;
std::vector<uint64_t> prefixes;
for (uint64_t i = 0; i < FLAGS_total_prefixes; ++i) {
prefixes.push_back(i);
}
if (FLAGS_random_prefix) {
std::random_shuffle(prefixes.begin(), prefixes.end());
}
if (FLAGS_random_prefix) {
std::random_shuffle(prefixes.begin(), prefixes.end());
}
// insert x random prefix, each with y continuous element.
HistogramImpl hist_put_time;
HistogramImpl hist_put_comparison;
// insert x random prefix, each with y continuous element.
HistogramImpl hist_put_time;
HistogramImpl hist_put_comparison;
for (auto prefix : prefixes) {
for (uint64_t sorted = 0; sorted < FLAGS_items_per_prefix; sorted++) {
TestKey test_key(prefix, sorted);
for (auto prefix : prefixes) {
for (uint64_t sorted = 0; sorted < FLAGS_items_per_prefix; sorted++) {
TestKey test_key(prefix, sorted);
Slice key = TestKeyToSlice(test_key);
std::string value = "v" + std::to_string(sorted);
Slice key = TestKeyToSlice(test_key);
std::string value = "v" + std::to_string(sorted);
perf_context.Reset();
StopWatchNano timer(Env::Default(), true);
ASSERT_OK(db->Put(write_options, key, value));
hist_put_time.Add(timer.ElapsedNanos());
hist_put_comparison.Add(perf_context.user_key_comparison_count);
perf_context.Reset();
StopWatchNano timer(Env::Default(), true);
ASSERT_OK(db->Put(write_options, key, value));
hist_put_time.Add(timer.ElapsedNanos());
hist_put_comparison.Add(perf_context.user_key_comparison_count);
}
}
}
std::cout << "Put key comparison: \n" << hist_put_comparison.ToString()
<< "Put time: \n" << hist_put_time.ToString();
std::cout << "Put key comparison: \n" << hist_put_comparison.ToString()
<< "Put time: \n" << hist_put_time.ToString();
// test seek existing keys
HistogramImpl hist_seek_time;
HistogramImpl hist_seek_comparison;
// test seek existing keys
HistogramImpl hist_seek_time;
HistogramImpl hist_seek_comparison;
for (auto prefix : prefixes) {
TestKey test_key(prefix, 0);
Slice key = TestKeyToSlice(test_key);
std::string value = "v" + std::to_string(0);
for (auto prefix : prefixes) {
TestKey test_key(prefix, 0);
Slice key = TestKeyToSlice(test_key);
std::string value = "v" + std::to_string(0);
Slice key_prefix;
if (FLAGS_use_prefix_hash_memtable) {
key_prefix = options.prefix_extractor->Transform(key);
read_options.prefix = &key_prefix;
}
std::unique_ptr<Iterator> iter(db->NewIterator(read_options));
Slice key_prefix;
if (FLAGS_use_prefix_hash_memtable) {
key_prefix = options.prefix_extractor->Transform(key);
read_options.prefix = &key_prefix;
}
std::unique_ptr<Iterator> iter(db->NewIterator(read_options));
perf_context.Reset();
StopWatchNano timer(Env::Default(), true);
uint64_t total_keys = 0;
for (iter->Seek(key); iter->Valid(); iter->Next()) {
if (FLAGS_trigger_deadlock) {
std::cout << "Behold the deadlock!\n";
db->Delete(write_options, iter->key());
perf_context.Reset();
StopWatchNano timer(Env::Default(), true);
uint64_t total_keys = 0;
for (iter->Seek(key); iter->Valid(); iter->Next()) {
if (FLAGS_trigger_deadlock) {
std::cout << "Behold the deadlock!\n";
db->Delete(write_options, iter->key());
}
auto test_key = SliceToTestKey(iter->key());
if (test_key->prefix != prefix) break;
total_keys++;
}
auto test_key = SliceToTestKey(iter->key());
if (test_key->prefix != prefix) break;
total_keys++;
hist_seek_time.Add(timer.ElapsedNanos());
hist_seek_comparison.Add(perf_context.user_key_comparison_count);
ASSERT_EQ(total_keys, FLAGS_items_per_prefix);
}
hist_seek_time.Add(timer.ElapsedNanos());
hist_seek_comparison.Add(perf_context.user_key_comparison_count);
ASSERT_EQ(total_keys, FLAGS_items_per_prefix);
}
std::cout << "Seek key comparison: \n"
<< hist_seek_comparison.ToString()
<< "Seek time: \n"
<< hist_seek_time.ToString();
std::cout << "Seek key comparison: \n"
<< hist_seek_comparison.ToString()
<< "Seek time: \n"
<< hist_seek_time.ToString();
// test non-existing keys
HistogramImpl hist_no_seek_time;
HistogramImpl hist_no_seek_comparison;
// test non-existing keys
HistogramImpl hist_no_seek_time;
HistogramImpl hist_no_seek_comparison;
for (auto prefix = FLAGS_total_prefixes;
prefix < FLAGS_total_prefixes + 100;
prefix++) {
TestKey test_key(prefix, 0);
Slice key = TestKeyToSlice(test_key);
for (auto prefix = FLAGS_total_prefixes;
prefix < FLAGS_total_prefixes + 100;
prefix++) {
TestKey test_key(prefix, 0);
Slice key = TestKeyToSlice(test_key);
if (FLAGS_use_prefix_hash_memtable) {
Slice key_prefix = options.prefix_extractor->Transform(key);
read_options.prefix = &key_prefix;
if (FLAGS_use_prefix_hash_memtable) {
Slice key_prefix = options.prefix_extractor->Transform(key);
read_options.prefix = &key_prefix;
}
std::unique_ptr<Iterator> iter(db->NewIterator(read_options));
perf_context.Reset();
StopWatchNano timer(Env::Default(), true);
iter->Seek(key);
hist_no_seek_time.Add(timer.ElapsedNanos());
hist_no_seek_comparison.Add(perf_context.user_key_comparison_count);
ASSERT_TRUE(!iter->Valid());
}
std::unique_ptr<Iterator> iter(db->NewIterator(read_options));
perf_context.Reset();
StopWatchNano timer(Env::Default(), true);
iter->Seek(key);
hist_no_seek_time.Add(timer.ElapsedNanos());
hist_no_seek_comparison.Add(perf_context.user_key_comparison_count);
ASSERT_TRUE(!iter->Valid());
std::cout << "non-existing Seek key comparison: \n"
<< hist_no_seek_comparison.ToString()
<< "non-existing Seek time: \n"
<< hist_no_seek_time.ToString();
}
std::cout << "non-existing Seek key comparison: \n"
<< hist_no_seek_comparison.ToString()
<< "non-existing Seek time: \n"
<< hist_no_seek_time.ToString();
}
}

@ -268,6 +268,13 @@ extern MemTableRepFactory* NewHashSkipListRepFactory(
int32_t skiplist_height = 4, int32_t skiplist_branching_factor = 4
);
// The factory is to create memtables with a hashed linked list:
// it contains a fixed array of buckets, each pointing to a sorted single
// linked list (null if the bucket is empty).
// bucket_count: number of fixed array buckets
extern MemTableRepFactory* NewHashLinkListRepFactory(
const SliceTransform* transform, size_t bucket_count = 50000);
}
#endif // STORAGE_ROCKSDB_DB_MEMTABLEREP_H_

@ -0,0 +1,462 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#include "util/hash_linklist_rep.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/arena.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "port/port.h"
#include "port/atomic_pointer.h"
#include "util/murmurhash.h"
#include "db/memtable.h"
#include "db/skiplist.h"
namespace rocksdb {
namespace {
typedef const char* Key;
struct Node {
explicit Node(const Key& k) :
key(k) {
}
Key const key;
// Accessors/mutators for links. Wrapped in methods so we can
// add the appropriate barriers as necessary.
Node* Next() {
// Use an 'acquire load' so that we observe a fully initialized
// version of the returned Node.
return reinterpret_cast<Node*>(next_.Acquire_Load());
}
void SetNext(Node* x) {
// Use a 'release store' so that anybody who reads through this
// pointer observes a fully initialized version of the inserted node.
next_.Release_Store(x);
}
// No-barrier variants that can be safely used in a few locations.
Node* NoBarrier_Next() {
return reinterpret_cast<Node*>(next_.NoBarrier_Load());
}
void NoBarrier_SetNext(Node* x) {
next_.NoBarrier_Store(x);
}
private:
port::AtomicPointer next_;
};
class HashLinkListRep : public MemTableRep {
public:
HashLinkListRep(MemTableRep::KeyComparator& compare, Arena* arena,
const SliceTransform* transform, size_t bucket_size);
virtual void Insert(const char* key) override;
virtual bool Contains(const char* key) const override;
virtual size_t ApproximateMemoryUsage() override;
virtual ~HashLinkListRep();
virtual std::shared_ptr<MemTableRep::Iterator> GetIterator() override;
virtual std::shared_ptr<MemTableRep::Iterator> GetIterator(
const Slice& slice) override;
virtual std::shared_ptr<MemTableRep::Iterator> GetPrefixIterator(
const Slice& prefix) override;
virtual std::shared_ptr<MemTableRep::Iterator> GetDynamicPrefixIterator()
override;
private:
friend class DynamicIterator;
typedef SkipList<const char*, MemTableRep::KeyComparator&> FullList;
size_t bucket_size_;
// Maps slices (which are transformed user keys) to buckets of keys sharing
// the same transform.
port::AtomicPointer* buckets_;
// The user-supplied transform whose domain is the user keys.
const SliceTransform* transform_;
MemTableRep::KeyComparator& compare_;
// immutable after construction
Arena* const arena_;
bool BucketContains(Node* head, const Key& key) const;
size_t GetHash(const Slice& slice) const {
return MurmurHash(slice.data(), slice.size(), 0) % bucket_size_;
}
Node* GetBucket(size_t i) const {
return static_cast<Node*>(buckets_[i].Acquire_Load());
}
Node* GetBucket(const Slice& slice) const {
return GetBucket(GetHash(slice));
}
Node* NewNode(const Key& key) {
char* mem = arena_->AllocateAligned(sizeof(Node));
return new (mem) Node(key);
}
bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); }
bool KeyIsAfterNode(const Key& key, const Node* n) const {
// nullptr n is considered infinite
return (n != nullptr) && (compare_(n->key, key) < 0);
}
Node* FindGreaterOrEqualInBucket(Node* head, const Key& key) const;
class FullListIterator : public MemTableRep::Iterator {
public:
explicit FullListIterator(FullList* list)
: iter_(list) {}
virtual ~FullListIterator() {
}
// Returns true iff the iterator is positioned at a valid node.
virtual bool Valid() const {
return iter_.Valid();
}
// Returns the key at the current position.
// REQUIRES: Valid()
virtual const char* key() const {
assert(Valid());
return iter_.key();
}
// Advances to the next position.
// REQUIRES: Valid()
virtual void Next() {
assert(Valid());
iter_.Next();
}
// Advances to the previous position.
// REQUIRES: Valid()
virtual void Prev() {
assert(Valid());
iter_.Prev();
}
// Advance to the first entry with a key >= target
virtual void Seek(const Slice& internal_key, const char* memtable_key) {
const char* encoded_key =
(memtable_key != nullptr) ?
memtable_key : EncodeKey(&tmp_, internal_key);
iter_.Seek(encoded_key);
}
// Position at the first entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
virtual void SeekToFirst() {
iter_.SeekToFirst();
}
// Position at the last entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
virtual void SeekToLast() {
iter_.SeekToLast();
}
private:
FullList::Iterator iter_;
std::string tmp_; // For passing to EncodeKey
};
class Iterator : public MemTableRep::Iterator {
public:
explicit Iterator(const HashLinkListRep* const hash_link_list_rep,
Node* head) :
hash_link_list_rep_(hash_link_list_rep), head_(head), node_(nullptr) {
}
virtual ~Iterator() {
}
// Returns true iff the iterator is positioned at a valid node.
virtual bool Valid() const {
return node_ != nullptr;
}
// Returns the key at the current position.
// REQUIRES: Valid()
virtual const char* key() const {
assert(Valid());
return node_->key;
}
// Advances to the next position.
// REQUIRES: Valid()
virtual void Next() {
assert(Valid());
node_ = node_->Next();
}
// Advances to the previous position.
// REQUIRES: Valid()
virtual void Prev() {
// Prefix iterator does not support total order.
// We simply set the iterator to invalid state
Reset(nullptr);
}
// Advance to the first entry with a key >= target
virtual void Seek(const Slice& internal_key, const char* memtable_key) {
const char* encoded_key =
(memtable_key != nullptr) ?
memtable_key : EncodeKey(&tmp_, internal_key);
node_ = hash_link_list_rep_->FindGreaterOrEqualInBucket(head_,
encoded_key);
}
// Position at the first entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
virtual void SeekToFirst() {
// Prefix iterator does not support total order.
// We simply set the iterator to invalid state
Reset(nullptr);
}
// Position at the last entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
virtual void SeekToLast() {
// Prefix iterator does not support total order.
// We simply set the iterator to invalid state
Reset(nullptr);
}
protected:
void Reset(Node* head) {
head_ = head;
node_ = nullptr;
}
private:
friend class HashLinkListRep;
const HashLinkListRep* const hash_link_list_rep_;
Node* head_;
Node* node_;
std::string tmp_; // For passing to EncodeKey
virtual void SeekToHead() {
node_ = head_;
}
};
class DynamicIterator : public HashLinkListRep::Iterator {
public:
explicit DynamicIterator(HashLinkListRep& memtable_rep)
: HashLinkListRep::Iterator(&memtable_rep, nullptr),
memtable_rep_(memtable_rep) {}
// Advance to the first entry with a key >= target
virtual void Seek(const Slice& k, const char* memtable_key) {
auto transformed = memtable_rep_.transform_->Transform(k);
Reset(memtable_rep_.GetBucket(transformed));
HashLinkListRep::Iterator::Seek(k, memtable_key);
}
private:
// the underlying memtable
const HashLinkListRep& memtable_rep_;
};
class EmptyIterator : public MemTableRep::Iterator {
// This is used when there wasn't a bucket. It is cheaper than
// instantiating an empty bucket over which to iterate.
public:
EmptyIterator() { }
virtual bool Valid() const {
return false;
}
virtual const char* key() const {
assert(false);
return nullptr;
}
virtual void Next() { }
virtual void Prev() { }
virtual void Seek(const Slice& user_key, const char* memtable_key) { }
virtual void SeekToFirst() { }
virtual void SeekToLast() { }
private:
};
std::shared_ptr<EmptyIterator> empty_iterator_;
};
HashLinkListRep::HashLinkListRep(MemTableRep::KeyComparator& compare,
Arena* arena, const SliceTransform* transform,
size_t bucket_size)
: bucket_size_(bucket_size),
transform_(transform),
compare_(compare),
arena_(arena),
empty_iterator_(std::make_shared<EmptyIterator>()) {
char* mem = arena_->AllocateAligned(
sizeof(port::AtomicPointer) * bucket_size);
buckets_ = new (mem) port::AtomicPointer[bucket_size];
for (size_t i = 0; i < bucket_size_; ++i) {
buckets_[i].NoBarrier_Store(nullptr);
}
}
HashLinkListRep::~HashLinkListRep() {
}
void HashLinkListRep::Insert(const char* key) {
assert(!Contains(key));
auto transformed = transform_->Transform(UserKey(key));
auto& bucket = buckets_[GetHash(transformed)];
Node* head = static_cast<Node*>(bucket.Acquire_Load());
if (!head) {
Node* x = NewNode(key);
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
x->NoBarrier_SetNext(nullptr);
bucket.Release_Store(static_cast<void*>(x));
return;
}
Node* cur = head;
Node* prev = nullptr;
while (true) {
if (cur == nullptr) {
break;
}
Node* next = cur->Next();
// Make sure the lists are sorted.
// If x points to head_ or next points nullptr, it is trivially satisfied.
assert((cur == head) || (next == nullptr) ||
KeyIsAfterNode(next->key, cur));
if (KeyIsAfterNode(key, cur)) {
// Keep searching in this list
prev = cur;
cur = next;
} else {
break;
}
}
// Our data structure does not allow duplicate insertion
assert(cur == nullptr || !Equal(key, cur->key));
Node* x = NewNode(key);
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
x->NoBarrier_SetNext(cur);
if (prev) {
prev->SetNext(x);
} else {
bucket.Release_Store(static_cast<void*>(x));
}
}
bool HashLinkListRep::Contains(const char* key) const {
auto transformed = transform_->Transform(UserKey(key));
auto bucket = GetBucket(transformed);
if (bucket == nullptr) {
return false;
}
return BucketContains(bucket, key);
}
size_t HashLinkListRep::ApproximateMemoryUsage() {
// Memory is always allocated from the arena.
return 0;
}
std::shared_ptr<MemTableRep::Iterator> HashLinkListRep::GetIterator() {
auto list = new FullList(compare_, arena_);
for (size_t i = 0; i < bucket_size_; ++i) {
auto bucket = GetBucket(i);
if (bucket != nullptr) {
Iterator itr(this, bucket);
for (itr.SeekToHead(); itr.Valid(); itr.Next()) {
list->Insert(itr.key());
}
}
}
return std::make_shared<FullListIterator>(list);
}
std::shared_ptr<MemTableRep::Iterator> HashLinkListRep::GetPrefixIterator(
const Slice& prefix) {
auto bucket = GetBucket(prefix);
if (bucket == nullptr) {
return empty_iterator_;
}
return std::make_shared<Iterator>(this, bucket);
}
std::shared_ptr<MemTableRep::Iterator> HashLinkListRep::GetIterator(
const Slice& slice) {
return GetPrefixIterator(transform_->Transform(slice));
}
std::shared_ptr<MemTableRep::Iterator>
HashLinkListRep::GetDynamicPrefixIterator() {
return std::make_shared<DynamicIterator>(*this);
}
bool HashLinkListRep::BucketContains(Node* head, const Key& key) const {
Node* x = FindGreaterOrEqualInBucket(head, key);
return (x != nullptr && Equal(key, x->key));
}
Node* HashLinkListRep::FindGreaterOrEqualInBucket(Node* head,
const Key& key) const {
Node* x = head;
while (true) {
if (x == nullptr) {
return x;
}
Node* next = x->Next();
// Make sure the lists are sorted.
// If x points to head_ or next points nullptr, it is trivially satisfied.
assert((x == head) || (next == nullptr) || KeyIsAfterNode(next->key, x));
if (KeyIsAfterNode(key, x)) {
// Keep searching in this list
x = next;
} else {
break;
}
}
return x;
}
} // anon namespace
std::shared_ptr<MemTableRep> HashLinkListRepFactory::CreateMemTableRep(
MemTableRep::KeyComparator& compare, Arena* arena) {
return std::make_shared<HashLinkListRep>(compare, arena, transform_,
bucket_count_);
}
MemTableRepFactory* NewHashLinkListRepFactory(
const SliceTransform* transform, size_t bucket_count) {
return new HashLinkListRepFactory(transform, bucket_count);
}
} // namespace rocksdb

@ -0,0 +1,39 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "rocksdb/slice_transform.h"
#include "rocksdb/memtablerep.h"
namespace rocksdb {
class HashLinkListRepFactory : public MemTableRepFactory {
public:
explicit HashLinkListRepFactory(
const SliceTransform* transform,
size_t bucket_count)
: transform_(transform),
bucket_count_(bucket_count) { }
virtual ~HashLinkListRepFactory() { delete transform_; }
virtual std::shared_ptr<MemTableRep> CreateMemTableRep(
MemTableRep::KeyComparator& compare, Arena* arena) override;
virtual const char* Name() const override {
return "HashLinkListRepFactory";
}
const SliceTransform* GetTransform() { return transform_; }
private:
const SliceTransform* transform_;
const size_t bucket_count_;
};
}
Loading…
Cancel
Save