diff --git a/db/db_test.cc b/db/db_test.cc index 5b21b2f1b..8e2bc9f27 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -29,6 +29,7 @@ #include "util/mutexlock.h" #include "util/testharness.h" #include "util/testutil.h" +#include "util/hash_linklist_rep.h" #include "utilities/merge_operators.h" namespace rocksdb { @@ -250,6 +251,7 @@ class DBTest { kPlainTableFirstBytePrefix, kPlainTableAllBytesPrefix, kVectorRep, + kHashLinkList, kMergePut, kFilter, kUncompressed, @@ -403,6 +405,10 @@ class DBTest { case kVectorRep: options.memtable_factory.reset(new VectorRepFactory(100)); break; + case kHashLinkList: + options.memtable_factory.reset( + NewHashLinkListRepFactory(NewFixedPrefixTransform(1), 4)); + break; case kUniversalCompaction: options.compaction_style = kCompactionStyleUniversal; break; @@ -4521,6 +4527,7 @@ TEST(DBTest, Randomized) { int p = rnd.Uniform(100); int minimum = 0; if (option_config_ == kHashSkipList || + option_config_ == kHashLinkList || option_config_ == kPlainTableFirstBytePrefix) { minimum = 1; } diff --git a/db/prefix_test.cc b/db/prefix_test.cc index f66091d11..66cef92cb 100644 --- a/db/prefix_test.cc +++ b/db/prefix_test.cc @@ -109,20 +109,6 @@ class PrefixTest { FLAGS_min_write_buffer_number_to_merge; options.comparator = new TestKeyComparator(); - if (FLAGS_use_prefix_hash_memtable) { - auto prefix_extractor = NewFixedPrefixTransform(8); - options.prefix_extractor = prefix_extractor; - if (FLAGS_use_nolock_version) { - options.memtable_factory.reset(NewHashSkipListRepFactory( - prefix_extractor, FLAGS_bucket_count, - FLAGS_skiplist_height)); - } else { - options.memtable_factory = - std::make_shared( - prefix_extractor, FLAGS_bucket_count, FLAGS_num_locks); - } - } - options.memtable_prefix_bloom_bits = FLAGS_memtable_prefix_bloom_bits; options.memtable_prefix_bloom_probes = FLAGS_memtable_prefix_bloom_probes; @@ -130,216 +116,256 @@ class PrefixTest { ASSERT_OK(s); return std::shared_ptr(db); } + + bool NextOptions() { + // skip some options + option_config_++; + if (option_config_ < kEnd) { + auto prefix_extractor = NewFixedPrefixTransform(8); + options.prefix_extractor = prefix_extractor; + switch(option_config_) { + case kHashSkipList: + options.memtable_factory.reset( + NewHashSkipListRepFactory(options.prefix_extractor, + FLAGS_bucket_count, + FLAGS_skiplist_height)); + return true; + case kHashLinkList: + options.memtable_factory.reset( + NewHashLinkListRepFactory(options.prefix_extractor, + FLAGS_bucket_count)); + return true; + default: + return false; + } + } + return false; + } + + PrefixTest() : option_config_(kBegin) { } ~PrefixTest() { delete options.comparator; } protected: + enum OptionConfig { + kBegin, + kHashSkipList, + kHashLinkList, + kEnd + }; + int option_config_; Options options; }; TEST(PrefixTest, DynamicPrefixIterator) { + while (NextOptions()) { + std::cout << "*** Mem table: " << options.memtable_factory->Name() + << std::endl; + DestroyDB(kDbName, Options()); + auto db = OpenDb(); + WriteOptions write_options; + ReadOptions read_options; + + std::vector prefixes; + for (uint64_t i = 0; i < FLAGS_total_prefixes; ++i) { + prefixes.push_back(i); + } - DestroyDB(kDbName, Options()); - auto db = OpenDb(); - WriteOptions write_options; - ReadOptions read_options; + if (FLAGS_random_prefix) { + std::random_shuffle(prefixes.begin(), prefixes.end()); + } - std::vector prefixes; - for (uint64_t i = 0; i < FLAGS_total_prefixes; ++i) { - prefixes.push_back(i); - } + HistogramImpl hist_put_time; + HistogramImpl hist_put_comparison; - if (FLAGS_random_prefix) { - std::random_shuffle(prefixes.begin(), prefixes.end()); - } + // insert x random prefix, each with y continuous element. + for (auto prefix : prefixes) { + for (uint64_t sorted = 0; sorted < FLAGS_items_per_prefix; sorted++) { + TestKey test_key(prefix, sorted); + + Slice key = TestKeyToSlice(test_key); + std::string value(FLAGS_value_size, 0); + + perf_context.Reset(); + StopWatchNano timer(Env::Default(), true); + ASSERT_OK(db->Put(write_options, key, value)); + hist_put_time.Add(timer.ElapsedNanos()); + hist_put_comparison.Add(perf_context.user_key_comparison_count); + } + } + + std::cout << "Put key comparison: \n" << hist_put_comparison.ToString() + << "Put time: \n" << hist_put_time.ToString(); - HistogramImpl hist_put_time; - HistogramImpl hist_put_comparison; + // test seek existing keys + HistogramImpl hist_seek_time; + HistogramImpl hist_seek_comparison; - // insert x random prefix, each with y continuous element. - for (auto prefix : prefixes) { - for (uint64_t sorted = 0; sorted < FLAGS_items_per_prefix; sorted++) { - TestKey test_key(prefix, sorted); + if (FLAGS_use_prefix_hash_memtable) { + read_options.prefix_seek = true; + } + std::unique_ptr iter(db->NewIterator(read_options)); + for (auto prefix : prefixes) { + TestKey test_key(prefix, FLAGS_items_per_prefix / 2); Slice key = TestKeyToSlice(test_key); - std::string value(FLAGS_value_size, 0); + std::string value = "v" + std::to_string(0); perf_context.Reset(); StopWatchNano timer(Env::Default(), true); - ASSERT_OK(db->Put(write_options, key, value)); - hist_put_time.Add(timer.ElapsedNanos()); - hist_put_comparison.Add(perf_context.user_key_comparison_count); + uint64_t total_keys = 0; + for (iter->Seek(key); iter->Valid(); iter->Next()) { + if (FLAGS_trigger_deadlock) { + std::cout << "Behold the deadlock!\n"; + db->Delete(write_options, iter->key()); + } + auto test_key = SliceToTestKey(iter->key()); + if (test_key->prefix != prefix) break; + total_keys++; + } + hist_seek_time.Add(timer.ElapsedNanos()); + hist_seek_comparison.Add(perf_context.user_key_comparison_count); + ASSERT_EQ(total_keys, FLAGS_items_per_prefix - FLAGS_items_per_prefix/2); } - } - std::cout << "Put key comparison: \n" << hist_put_comparison.ToString() - << "Put time: \n" << hist_put_time.ToString(); + std::cout << "Seek key comparison: \n" + << hist_seek_comparison.ToString() + << "Seek time: \n" + << hist_seek_time.ToString(); - // test seek existing keys - HistogramImpl hist_seek_time; - HistogramImpl hist_seek_comparison; + // test non-existing keys + HistogramImpl hist_no_seek_time; + HistogramImpl hist_no_seek_comparison; - if (FLAGS_use_prefix_hash_memtable) { - read_options.prefix_seek = true; - } - std::unique_ptr iter(db->NewIterator(read_options)); - - for (auto prefix : prefixes) { - TestKey test_key(prefix, FLAGS_items_per_prefix / 2); - Slice key = TestKeyToSlice(test_key); - std::string value = "v" + std::to_string(0); - - perf_context.Reset(); - StopWatchNano timer(Env::Default(), true); - uint64_t total_keys = 0; - for (iter->Seek(key); iter->Valid(); iter->Next()) { - if (FLAGS_trigger_deadlock) { - std::cout << "Behold the deadlock!\n"; - db->Delete(write_options, iter->key()); - } - auto test_key = SliceToTestKey(iter->key()); - if (test_key->prefix != prefix) break; - total_keys++; + for (auto prefix = FLAGS_total_prefixes; + prefix < FLAGS_total_prefixes + 10000; + prefix++) { + TestKey test_key(prefix, 0); + Slice key = TestKeyToSlice(test_key); + + perf_context.Reset(); + StopWatchNano timer(Env::Default(), true); + iter->Seek(key); + hist_no_seek_time.Add(timer.ElapsedNanos()); + hist_no_seek_comparison.Add(perf_context.user_key_comparison_count); + ASSERT_TRUE(!iter->Valid()); } - hist_seek_time.Add(timer.ElapsedNanos()); - hist_seek_comparison.Add(perf_context.user_key_comparison_count); - ASSERT_EQ(total_keys, FLAGS_items_per_prefix - FLAGS_items_per_prefix/2); - } - std::cout << "Seek key comparison: \n" - << hist_seek_comparison.ToString() - << "Seek time: \n" - << hist_seek_time.ToString(); - - // test non-existing keys - HistogramImpl hist_no_seek_time; - HistogramImpl hist_no_seek_comparison; - - for (auto prefix = FLAGS_total_prefixes; - prefix < FLAGS_total_prefixes + 10000; - prefix++) { - TestKey test_key(prefix, 0); - Slice key = TestKeyToSlice(test_key); - - perf_context.Reset(); - StopWatchNano timer(Env::Default(), true); - iter->Seek(key); - hist_no_seek_time.Add(timer.ElapsedNanos()); - hist_no_seek_comparison.Add(perf_context.user_key_comparison_count); - ASSERT_TRUE(!iter->Valid()); + std::cout << "non-existing Seek key comparison: \n" + << hist_no_seek_comparison.ToString() + << "non-existing Seek time: \n" + << hist_no_seek_time.ToString(); } - - std::cout << "non-existing Seek key comparison: \n" - << hist_no_seek_comparison.ToString() - << "non-existing Seek time: \n" - << hist_no_seek_time.ToString(); } TEST(PrefixTest, PrefixHash) { + while (NextOptions()) { + std::cout << "*** Mem table: " << options.memtable_factory->Name() + << std::endl; + DestroyDB(kDbName, Options()); + auto db = OpenDb(); + WriteOptions write_options; + ReadOptions read_options; + + std::vector prefixes; + for (uint64_t i = 0; i < FLAGS_total_prefixes; ++i) { + prefixes.push_back(i); + } - DestroyDB(kDbName, Options()); - auto db = OpenDb(); - WriteOptions write_options; - ReadOptions read_options; - - std::vector prefixes; - for (uint64_t i = 0; i < FLAGS_total_prefixes; ++i) { - prefixes.push_back(i); - } - - if (FLAGS_random_prefix) { - std::random_shuffle(prefixes.begin(), prefixes.end()); - } + if (FLAGS_random_prefix) { + std::random_shuffle(prefixes.begin(), prefixes.end()); + } - // insert x random prefix, each with y continuous element. - HistogramImpl hist_put_time; - HistogramImpl hist_put_comparison; + // insert x random prefix, each with y continuous element. + HistogramImpl hist_put_time; + HistogramImpl hist_put_comparison; - for (auto prefix : prefixes) { - for (uint64_t sorted = 0; sorted < FLAGS_items_per_prefix; sorted++) { - TestKey test_key(prefix, sorted); + for (auto prefix : prefixes) { + for (uint64_t sorted = 0; sorted < FLAGS_items_per_prefix; sorted++) { + TestKey test_key(prefix, sorted); - Slice key = TestKeyToSlice(test_key); - std::string value = "v" + std::to_string(sorted); + Slice key = TestKeyToSlice(test_key); + std::string value = "v" + std::to_string(sorted); - perf_context.Reset(); - StopWatchNano timer(Env::Default(), true); - ASSERT_OK(db->Put(write_options, key, value)); - hist_put_time.Add(timer.ElapsedNanos()); - hist_put_comparison.Add(perf_context.user_key_comparison_count); + perf_context.Reset(); + StopWatchNano timer(Env::Default(), true); + ASSERT_OK(db->Put(write_options, key, value)); + hist_put_time.Add(timer.ElapsedNanos()); + hist_put_comparison.Add(perf_context.user_key_comparison_count); + } } - } - std::cout << "Put key comparison: \n" << hist_put_comparison.ToString() - << "Put time: \n" << hist_put_time.ToString(); + std::cout << "Put key comparison: \n" << hist_put_comparison.ToString() + << "Put time: \n" << hist_put_time.ToString(); - // test seek existing keys - HistogramImpl hist_seek_time; - HistogramImpl hist_seek_comparison; + // test seek existing keys + HistogramImpl hist_seek_time; + HistogramImpl hist_seek_comparison; - for (auto prefix : prefixes) { - TestKey test_key(prefix, 0); - Slice key = TestKeyToSlice(test_key); - std::string value = "v" + std::to_string(0); + for (auto prefix : prefixes) { + TestKey test_key(prefix, 0); + Slice key = TestKeyToSlice(test_key); + std::string value = "v" + std::to_string(0); - Slice key_prefix; - if (FLAGS_use_prefix_hash_memtable) { - key_prefix = options.prefix_extractor->Transform(key); - read_options.prefix = &key_prefix; - } - std::unique_ptr iter(db->NewIterator(read_options)); + Slice key_prefix; + if (FLAGS_use_prefix_hash_memtable) { + key_prefix = options.prefix_extractor->Transform(key); + read_options.prefix = &key_prefix; + } + std::unique_ptr iter(db->NewIterator(read_options)); - perf_context.Reset(); - StopWatchNano timer(Env::Default(), true); - uint64_t total_keys = 0; - for (iter->Seek(key); iter->Valid(); iter->Next()) { - if (FLAGS_trigger_deadlock) { - std::cout << "Behold the deadlock!\n"; - db->Delete(write_options, iter->key()); + perf_context.Reset(); + StopWatchNano timer(Env::Default(), true); + uint64_t total_keys = 0; + for (iter->Seek(key); iter->Valid(); iter->Next()) { + if (FLAGS_trigger_deadlock) { + std::cout << "Behold the deadlock!\n"; + db->Delete(write_options, iter->key()); + } + auto test_key = SliceToTestKey(iter->key()); + if (test_key->prefix != prefix) break; + total_keys++; } - auto test_key = SliceToTestKey(iter->key()); - if (test_key->prefix != prefix) break; - total_keys++; + hist_seek_time.Add(timer.ElapsedNanos()); + hist_seek_comparison.Add(perf_context.user_key_comparison_count); + ASSERT_EQ(total_keys, FLAGS_items_per_prefix); } - hist_seek_time.Add(timer.ElapsedNanos()); - hist_seek_comparison.Add(perf_context.user_key_comparison_count); - ASSERT_EQ(total_keys, FLAGS_items_per_prefix); - } - std::cout << "Seek key comparison: \n" - << hist_seek_comparison.ToString() - << "Seek time: \n" - << hist_seek_time.ToString(); + std::cout << "Seek key comparison: \n" + << hist_seek_comparison.ToString() + << "Seek time: \n" + << hist_seek_time.ToString(); - // test non-existing keys - HistogramImpl hist_no_seek_time; - HistogramImpl hist_no_seek_comparison; + // test non-existing keys + HistogramImpl hist_no_seek_time; + HistogramImpl hist_no_seek_comparison; - for (auto prefix = FLAGS_total_prefixes; - prefix < FLAGS_total_prefixes + 100; - prefix++) { - TestKey test_key(prefix, 0); - Slice key = TestKeyToSlice(test_key); + for (auto prefix = FLAGS_total_prefixes; + prefix < FLAGS_total_prefixes + 100; + prefix++) { + TestKey test_key(prefix, 0); + Slice key = TestKeyToSlice(test_key); - if (FLAGS_use_prefix_hash_memtable) { - Slice key_prefix = options.prefix_extractor->Transform(key); - read_options.prefix = &key_prefix; + if (FLAGS_use_prefix_hash_memtable) { + Slice key_prefix = options.prefix_extractor->Transform(key); + read_options.prefix = &key_prefix; + } + std::unique_ptr iter(db->NewIterator(read_options)); + + perf_context.Reset(); + StopWatchNano timer(Env::Default(), true); + iter->Seek(key); + hist_no_seek_time.Add(timer.ElapsedNanos()); + hist_no_seek_comparison.Add(perf_context.user_key_comparison_count); + ASSERT_TRUE(!iter->Valid()); } - std::unique_ptr iter(db->NewIterator(read_options)); - perf_context.Reset(); - StopWatchNano timer(Env::Default(), true); - iter->Seek(key); - hist_no_seek_time.Add(timer.ElapsedNanos()); - hist_no_seek_comparison.Add(perf_context.user_key_comparison_count); - ASSERT_TRUE(!iter->Valid()); + std::cout << "non-existing Seek key comparison: \n" + << hist_no_seek_comparison.ToString() + << "non-existing Seek time: \n" + << hist_no_seek_time.ToString(); } - - std::cout << "non-existing Seek key comparison: \n" - << hist_no_seek_comparison.ToString() - << "non-existing Seek time: \n" - << hist_no_seek_time.ToString(); } } diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index c50c7b61a..e24030ddc 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -268,6 +268,13 @@ extern MemTableRepFactory* NewHashSkipListRepFactory( int32_t skiplist_height = 4, int32_t skiplist_branching_factor = 4 ); +// The factory is to create memtables with a hashed linked list: +// it contains a fixed array of buckets, each pointing to a sorted single +// linked list (null if the bucket is empty). +// bucket_count: number of fixed array buckets +extern MemTableRepFactory* NewHashLinkListRepFactory( + const SliceTransform* transform, size_t bucket_count = 50000); + } #endif // STORAGE_ROCKSDB_DB_MEMTABLEREP_H_ diff --git a/util/hash_linklist_rep.cc b/util/hash_linklist_rep.cc new file mode 100644 index 000000000..e53bffbb6 --- /dev/null +++ b/util/hash_linklist_rep.cc @@ -0,0 +1,462 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// + +#include "util/hash_linklist_rep.h" + +#include "rocksdb/memtablerep.h" +#include "rocksdb/arena.h" +#include "rocksdb/slice.h" +#include "rocksdb/slice_transform.h" +#include "port/port.h" +#include "port/atomic_pointer.h" +#include "util/murmurhash.h" +#include "db/memtable.h" +#include "db/skiplist.h" + +namespace rocksdb { +namespace { + +typedef const char* Key; + +struct Node { + explicit Node(const Key& k) : + key(k) { + } + + Key const key; + + // Accessors/mutators for links. Wrapped in methods so we can + // add the appropriate barriers as necessary. + Node* Next() { + // Use an 'acquire load' so that we observe a fully initialized + // version of the returned Node. + return reinterpret_cast(next_.Acquire_Load()); + } + void SetNext(Node* x) { + // Use a 'release store' so that anybody who reads through this + // pointer observes a fully initialized version of the inserted node. + next_.Release_Store(x); + } + + // No-barrier variants that can be safely used in a few locations. + Node* NoBarrier_Next() { + return reinterpret_cast(next_.NoBarrier_Load()); + } + void NoBarrier_SetNext(Node* x) { + next_.NoBarrier_Store(x); + } + +private: + port::AtomicPointer next_; +}; + +class HashLinkListRep : public MemTableRep { + public: + HashLinkListRep(MemTableRep::KeyComparator& compare, Arena* arena, + const SliceTransform* transform, size_t bucket_size); + + virtual void Insert(const char* key) override; + + virtual bool Contains(const char* key) const override; + + virtual size_t ApproximateMemoryUsage() override; + + virtual ~HashLinkListRep(); + + virtual std::shared_ptr GetIterator() override; + + virtual std::shared_ptr GetIterator( + const Slice& slice) override; + + virtual std::shared_ptr GetPrefixIterator( + const Slice& prefix) override; + + virtual std::shared_ptr GetDynamicPrefixIterator() + override; + + private: + friend class DynamicIterator; + typedef SkipList FullList; + + size_t bucket_size_; + + // Maps slices (which are transformed user keys) to buckets of keys sharing + // the same transform. + port::AtomicPointer* buckets_; + + // The user-supplied transform whose domain is the user keys. + const SliceTransform* transform_; + + MemTableRep::KeyComparator& compare_; + // immutable after construction + Arena* const arena_; + + bool BucketContains(Node* head, const Key& key) const; + + size_t GetHash(const Slice& slice) const { + return MurmurHash(slice.data(), slice.size(), 0) % bucket_size_; + } + + Node* GetBucket(size_t i) const { + return static_cast(buckets_[i].Acquire_Load()); + } + + Node* GetBucket(const Slice& slice) const { + return GetBucket(GetHash(slice)); + } + + Node* NewNode(const Key& key) { + char* mem = arena_->AllocateAligned(sizeof(Node)); + return new (mem) Node(key); + } + + bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); } + + bool KeyIsAfterNode(const Key& key, const Node* n) const { + // nullptr n is considered infinite + return (n != nullptr) && (compare_(n->key, key) < 0); + } + + Node* FindGreaterOrEqualInBucket(Node* head, const Key& key) const; + + class FullListIterator : public MemTableRep::Iterator { + public: + explicit FullListIterator(FullList* list) + : iter_(list) {} + + virtual ~FullListIterator() { + } + + // Returns true iff the iterator is positioned at a valid node. + virtual bool Valid() const { + return iter_.Valid(); + } + + // Returns the key at the current position. + // REQUIRES: Valid() + virtual const char* key() const { + assert(Valid()); + return iter_.key(); + } + + // Advances to the next position. + // REQUIRES: Valid() + virtual void Next() { + assert(Valid()); + iter_.Next(); + } + + // Advances to the previous position. + // REQUIRES: Valid() + virtual void Prev() { + assert(Valid()); + iter_.Prev(); + } + + // Advance to the first entry with a key >= target + virtual void Seek(const Slice& internal_key, const char* memtable_key) { + const char* encoded_key = + (memtable_key != nullptr) ? + memtable_key : EncodeKey(&tmp_, internal_key); + iter_.Seek(encoded_key); + } + + // Position at the first entry in collection. + // Final state of iterator is Valid() iff collection is not empty. + virtual void SeekToFirst() { + iter_.SeekToFirst(); + } + + // Position at the last entry in collection. + // Final state of iterator is Valid() iff collection is not empty. + virtual void SeekToLast() { + iter_.SeekToLast(); + } + private: + FullList::Iterator iter_; + std::string tmp_; // For passing to EncodeKey + }; + + class Iterator : public MemTableRep::Iterator { + public: + explicit Iterator(const HashLinkListRep* const hash_link_list_rep, + Node* head) : + hash_link_list_rep_(hash_link_list_rep), head_(head), node_(nullptr) { + } + + virtual ~Iterator() { + } + + // Returns true iff the iterator is positioned at a valid node. + virtual bool Valid() const { + return node_ != nullptr; + } + + // Returns the key at the current position. + // REQUIRES: Valid() + virtual const char* key() const { + assert(Valid()); + return node_->key; + } + + // Advances to the next position. + // REQUIRES: Valid() + virtual void Next() { + assert(Valid()); + node_ = node_->Next(); + } + + // Advances to the previous position. + // REQUIRES: Valid() + virtual void Prev() { + // Prefix iterator does not support total order. + // We simply set the iterator to invalid state + Reset(nullptr); + } + + // Advance to the first entry with a key >= target + virtual void Seek(const Slice& internal_key, const char* memtable_key) { + const char* encoded_key = + (memtable_key != nullptr) ? + memtable_key : EncodeKey(&tmp_, internal_key); + node_ = hash_link_list_rep_->FindGreaterOrEqualInBucket(head_, + encoded_key); + } + + // Position at the first entry in collection. + // Final state of iterator is Valid() iff collection is not empty. + virtual void SeekToFirst() { + // Prefix iterator does not support total order. + // We simply set the iterator to invalid state + Reset(nullptr); + } + + // Position at the last entry in collection. + // Final state of iterator is Valid() iff collection is not empty. + virtual void SeekToLast() { + // Prefix iterator does not support total order. + // We simply set the iterator to invalid state + Reset(nullptr); + } + + protected: + void Reset(Node* head) { + head_ = head; + node_ = nullptr; + } + private: + friend class HashLinkListRep; + const HashLinkListRep* const hash_link_list_rep_; + Node* head_; + Node* node_; + std::string tmp_; // For passing to EncodeKey + + virtual void SeekToHead() { + node_ = head_; + } + }; + + class DynamicIterator : public HashLinkListRep::Iterator { + public: + explicit DynamicIterator(HashLinkListRep& memtable_rep) + : HashLinkListRep::Iterator(&memtable_rep, nullptr), + memtable_rep_(memtable_rep) {} + + // Advance to the first entry with a key >= target + virtual void Seek(const Slice& k, const char* memtable_key) { + auto transformed = memtable_rep_.transform_->Transform(k); + Reset(memtable_rep_.GetBucket(transformed)); + HashLinkListRep::Iterator::Seek(k, memtable_key); + } + + private: + // the underlying memtable + const HashLinkListRep& memtable_rep_; + }; + + class EmptyIterator : public MemTableRep::Iterator { + // This is used when there wasn't a bucket. It is cheaper than + // instantiating an empty bucket over which to iterate. + public: + EmptyIterator() { } + virtual bool Valid() const { + return false; + } + virtual const char* key() const { + assert(false); + return nullptr; + } + virtual void Next() { } + virtual void Prev() { } + virtual void Seek(const Slice& user_key, const char* memtable_key) { } + virtual void SeekToFirst() { } + virtual void SeekToLast() { } + private: + }; + + std::shared_ptr empty_iterator_; +}; + +HashLinkListRep::HashLinkListRep(MemTableRep::KeyComparator& compare, + Arena* arena, const SliceTransform* transform, + size_t bucket_size) + : bucket_size_(bucket_size), + transform_(transform), + compare_(compare), + arena_(arena), + empty_iterator_(std::make_shared()) { + + char* mem = arena_->AllocateAligned( + sizeof(port::AtomicPointer) * bucket_size); + + buckets_ = new (mem) port::AtomicPointer[bucket_size]; + + for (size_t i = 0; i < bucket_size_; ++i) { + buckets_[i].NoBarrier_Store(nullptr); + } +} + +HashLinkListRep::~HashLinkListRep() { +} + +void HashLinkListRep::Insert(const char* key) { + assert(!Contains(key)); + auto transformed = transform_->Transform(UserKey(key)); + auto& bucket = buckets_[GetHash(transformed)]; + Node* head = static_cast(bucket.Acquire_Load()); + + if (!head) { + Node* x = NewNode(key); + // NoBarrier_SetNext() suffices since we will add a barrier when + // we publish a pointer to "x" in prev[i]. + x->NoBarrier_SetNext(nullptr); + bucket.Release_Store(static_cast(x)); + return; + } + + Node* cur = head; + Node* prev = nullptr; + while (true) { + if (cur == nullptr) { + break; + } + Node* next = cur->Next(); + // Make sure the lists are sorted. + // If x points to head_ or next points nullptr, it is trivially satisfied. + assert((cur == head) || (next == nullptr) || + KeyIsAfterNode(next->key, cur)); + if (KeyIsAfterNode(key, cur)) { + // Keep searching in this list + prev = cur; + cur = next; + } else { + break; + } + } + + // Our data structure does not allow duplicate insertion + assert(cur == nullptr || !Equal(key, cur->key)); + + Node* x = NewNode(key); + + // NoBarrier_SetNext() suffices since we will add a barrier when + // we publish a pointer to "x" in prev[i]. + x->NoBarrier_SetNext(cur); + + if (prev) { + prev->SetNext(x); + } else { + bucket.Release_Store(static_cast(x)); + } +} + +bool HashLinkListRep::Contains(const char* key) const { + auto transformed = transform_->Transform(UserKey(key)); + auto bucket = GetBucket(transformed); + if (bucket == nullptr) { + return false; + } + return BucketContains(bucket, key); +} + +size_t HashLinkListRep::ApproximateMemoryUsage() { + // Memory is always allocated from the arena. + return 0; +} + +std::shared_ptr HashLinkListRep::GetIterator() { + auto list = new FullList(compare_, arena_); + for (size_t i = 0; i < bucket_size_; ++i) { + auto bucket = GetBucket(i); + if (bucket != nullptr) { + Iterator itr(this, bucket); + for (itr.SeekToHead(); itr.Valid(); itr.Next()) { + list->Insert(itr.key()); + } + } + } + return std::make_shared(list); +} + +std::shared_ptr HashLinkListRep::GetPrefixIterator( + const Slice& prefix) { + auto bucket = GetBucket(prefix); + if (bucket == nullptr) { + return empty_iterator_; + } + return std::make_shared(this, bucket); +} + +std::shared_ptr HashLinkListRep::GetIterator( + const Slice& slice) { + return GetPrefixIterator(transform_->Transform(slice)); +} + +std::shared_ptr + HashLinkListRep::GetDynamicPrefixIterator() { + return std::make_shared(*this); +} + +bool HashLinkListRep::BucketContains(Node* head, const Key& key) const { + Node* x = FindGreaterOrEqualInBucket(head, key); + return (x != nullptr && Equal(key, x->key)); +} + +Node* HashLinkListRep::FindGreaterOrEqualInBucket(Node* head, + const Key& key) const { + Node* x = head; + while (true) { + if (x == nullptr) { + return x; + } + Node* next = x->Next(); + // Make sure the lists are sorted. + // If x points to head_ or next points nullptr, it is trivially satisfied. + assert((x == head) || (next == nullptr) || KeyIsAfterNode(next->key, x)); + if (KeyIsAfterNode(key, x)) { + // Keep searching in this list + x = next; + } else { + break; + } + } + return x; +} + +} // anon namespace + +std::shared_ptr HashLinkListRepFactory::CreateMemTableRep( + MemTableRep::KeyComparator& compare, Arena* arena) { + return std::make_shared(compare, arena, transform_, + bucket_count_); +} + +MemTableRepFactory* NewHashLinkListRepFactory( + const SliceTransform* transform, size_t bucket_count) { + return new HashLinkListRepFactory(transform, bucket_count); +} + +} // namespace rocksdb diff --git a/util/hash_linklist_rep.h b/util/hash_linklist_rep.h new file mode 100644 index 000000000..16d2517c2 --- /dev/null +++ b/util/hash_linklist_rep.h @@ -0,0 +1,39 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include "rocksdb/slice_transform.h" +#include "rocksdb/memtablerep.h" + +namespace rocksdb { + +class HashLinkListRepFactory : public MemTableRepFactory { + public: + explicit HashLinkListRepFactory( + const SliceTransform* transform, + size_t bucket_count) + : transform_(transform), + bucket_count_(bucket_count) { } + + virtual ~HashLinkListRepFactory() { delete transform_; } + + virtual std::shared_ptr CreateMemTableRep( + MemTableRep::KeyComparator& compare, Arena* arena) override; + + virtual const char* Name() const override { + return "HashLinkListRepFactory"; + } + + const SliceTransform* GetTransform() { return transform_; } + + private: + const SliceTransform* transform_; + const size_t bucket_count_; +}; + +}