diff --git a/db/db_bench.cc b/db/db_bench.cc
index 043282277..650ec5d1e 100644
--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -490,7 +490,8 @@ enum RepFactory {
   kSkipList,
   kPrefixHash,
   kVectorRep,
-  kHashLinkedList
+  kHashLinkedList,
+  kCuckoo
 };

 namespace {
@@ -505,6 +506,8 @@ enum RepFactory StringToRepFactory(const char* ctype) {
     return kVectorRep;
   else if (!strcasecmp(ctype, "hash_linkedlist"))
     return kHashLinkedList;
+  else if (!strcasecmp(ctype, "cuckoo"))
+    return kCuckoo;

   fprintf(stdout, "Cannot parse memreptable %s\n", ctype);
   return kSkipList;
@@ -880,6 +883,9 @@ class Benchmark {
       case kHashLinkedList:
         fprintf(stdout, "Memtablerep: hash_linkedlist\n");
         break;
+      case kCuckoo:
+        fprintf(stdout, "Memtablerep: cuckoo\n");
+        break;
     }
     fprintf(stdout, "Perf Level: %d\n", FLAGS_perf_level);

@@ -1579,6 +1585,10 @@ class Benchmark {
             new VectorRepFactory
         );
         break;
+      case kCuckoo:
+        options.memtable_factory.reset(NewHashCuckooRepFactory(
+            options.write_buffer_size, FLAGS_key_size + FLAGS_value_size));
+        break;
     }
     if (FLAGS_use_plain_table) {
       if (FLAGS_rep_factory != kPrefixHash &&
diff --git a/db/db_impl.cc b/db/db_impl.cc
index c0c4fe190..818aa4bf5 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -2406,6 +2406,9 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact,
 inline SequenceNumber DBImpl::findEarliestVisibleSnapshot(
     SequenceNumber in, std::vector<SequenceNumber>& snapshots,
     SequenceNumber* prev_snapshot) {
+  if (!IsSnapshotSupported()) {
+    return 0;
+  }
   SequenceNumber prev __attribute__((unused)) = 0;
   for (const auto cur : snapshots) {
     assert(prev <= cur);
@@ -3559,7 +3562,18 @@ Status DBImpl::NewIterators(
   return Status::OK();
 }

+bool DBImpl::IsSnapshotSupported() const {
+  for (auto cfd : *versions_->GetColumnFamilySet()) {
+    if (!cfd->mem()->IsSnapshotSupported()) {
+      return false;
+    }
+  }
+  return true;
+}
+
 const Snapshot* DBImpl::GetSnapshot() {
+  // returns nullptr if the underlying memtable does not support snapshots.
+  if (!IsSnapshotSupported()) return nullptr;
   MutexLock l(&mutex_);
   return snapshots_.New(versions_->LastSequence());
 }
@@ -4422,6 +4436,12 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
         }
       }
     }
+    if (cfd->options()->merge_operator != nullptr &&
+        !cfd->mem()->IsMergeOperatorSupported()) {
+      s = Status::InvalidArgument(
+          "The memtable of column family %s does not support merge operator "
+          "while its options.merge_operator is non-null", cfd->GetName().c_str());
+    }
     if (!s.ok()) {
       break;
     }
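With the hunks above, GetSnapshot() can now return nullptr, so callers must
check the result before using it. A minimal sketch of the caller-side
handling, assuming `db` is an open rocksdb::DB* (the key "foo" is
illustrative):

  const rocksdb::Snapshot* snap = db->GetSnapshot();
  if (snap != nullptr) {
    rocksdb::ReadOptions read_options;
    read_options.snapshot = snap;
    std::string value;
    rocksdb::Status s = db->Get(read_options, "foo", &value);
    db->ReleaseSnapshot(snap);
  } else {
    // A rep such as HashCuckooRep only serves reads of the latest state.
  }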
diff --git a/db/db_impl.h b/db/db_impl.h
index 0fa91f29a..915c0997b 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -424,6 +424,13 @@ class DBImpl : public DB {
   // dump rocksdb.stats to LOG
   void MaybeDumpStats();

+  // Return true if the current db supports snapshots.  If the current
+  // DB does not support snapshots, then calling GetSnapshot() will always
+  // return nullptr.
+  //
+  // @see GetSnapshot()
+  virtual bool IsSnapshotSupported() const;
+
   // Return the minimum empty level that could hold the total data in the
   // input level.  Return the input level, if such level could not be found.
   int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, int level);
diff --git a/db/db_test.cc b/db/db_test.cc
index 97e3aecc3..24cca85d9 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -301,6 +301,7 @@ class DBTest {
     kPlainTableAllBytesPrefix,
     kVectorRep,
     kHashLinkList,
+    kHashCuckoo,
     kMergePut,
     kFilter,
     kUncompressed,
@@ -336,7 +337,8 @@ class DBTest {
     kSkipMergePut = 4,
     kSkipPlainTable = 8,
     kSkipHashIndex = 16,
-    kSkipNoSeekToLast = 32
+    kSkipNoSeekToLast = 32,
+    kSkipHashCuckoo = 64
   };

   DBTest() : option_config_(kDefault),
@@ -358,7 +360,6 @@ class DBTest {
   // Switch to a fresh database with the next option configuration to
   // test.  Return false if there are no more configurations to test.
   bool ChangeOptions(int skip_mask = kNoSkip) {
-    // skip some options
     for(option_config_++; option_config_ < kEnd; option_config_++) {
       if ((skip_mask & kSkipDeletesFilterFirst) &&
           option_config_ == kDeletesFilterFirst) {
@@ -386,7 +387,9 @@ class DBTest {
            option_config_ == kBlockBasedTableWithWholeKeyHashIndex)) {
         continue;
       }
-
+      if ((skip_mask & kSkipHashCuckoo) && (option_config_ == kHashCuckoo)) {
+        continue;
+      }
       break;
     }

@@ -417,6 +420,12 @@ class DBTest {
   // Return the current option configuration.
   Options CurrentOptions() {
     Options options;
+    return CurrentOptions(options);
+  }
+
+  Options CurrentOptions(const Options& defaultOptions) {
+    // this redundant copy is to minimize code change without lint errors.
+    Options options = defaultOptions;
     switch (option_config_) {
       case kHashSkipList:
         options.prefix_extractor.reset(NewFixedPrefixTransform(1));
@@ -473,6 +482,10 @@ class DBTest {
         options.prefix_extractor.reset(NewFixedPrefixTransform(1));
         options.memtable_factory.reset(NewHashLinkListRepFactory(4));
         break;
+      case kHashCuckoo:
+        options.memtable_factory.reset(
+            NewHashCuckooRepFactory(options.write_buffer_size));
+        break;
       case kUniversalCompaction:
         options.compaction_style = kCompactionStyleUniversal;
         break;
@@ -1040,9 +1053,10 @@ void VerifyTableProperties(DB* db, uint64_t expected_entries_size) {

 TEST(DBTest, Empty) {
   do {
-    Options options = CurrentOptions();
+    Options options;
     options.env = env_;
     options.write_buffer_size = 100000;  // Small write buffer
+    options = CurrentOptions(options);
     CreateAndReopenWithCF({"pikachu"}, &options);

     std::string num;
@@ -1244,9 +1258,10 @@ TEST(DBTest, PutDeleteGet) {

 TEST(DBTest, GetFromImmutableLayer) {
   do {
-    Options options = CurrentOptions();
+    Options options;
     options.env = env_;
     options.write_buffer_size = 100000;  // Small write buffer
+    options = CurrentOptions(options);
     CreateAndReopenWithCF({"pikachu"}, &options);

     ASSERT_OK(Put(1, "foo", "v1"));
@@ -1287,7 +1302,8 @@ TEST(DBTest, GetSnapshot) {
       ASSERT_EQ("v1", Get(1, key, s1));
       db_->ReleaseSnapshot(s1);
     }
-  } while (ChangeOptions());
+    // skip as HashCuckooRep does not support snapshot
+  } while (ChangeOptions(kSkipHashCuckoo));
 }

 TEST(DBTest, GetLevel0Ordering) {
@@ -1499,7 +1515,9 @@ TEST(DBTest, NonBlockingIteration) {
     // This test verifies block cache behaviors, which is not used by plain
     // table format.
-  } while (ChangeOptions(kSkipPlainTable | kSkipNoSeekToLast));
+    // Exclude kHashCuckoo as it does not support iteration currently
+  } while (ChangeOptions(kSkipPlainTable | kSkipNoSeekToLast |
+                         kSkipHashCuckoo));
 }

 // A delete is skipped for key if KeyMayExist(key) returns False
@@ -2035,7 +2053,8 @@ TEST(DBTest, IterWithSnapshot) {
   }
   db_->ReleaseSnapshot(snapshot);
   delete iter;
-  } while (ChangeOptions());
+    // skip as HashCuckooRep does not support snapshot
+  } while (ChangeOptions(kSkipHashCuckoo));
 }

 TEST(DBTest, Recover) {
@@ -2063,10 +2082,11 @@ TEST(DBTest, Recover) {

 TEST(DBTest, RecoverWithTableHandle) {
   do {
-    Options options = CurrentOptions();
+    Options options;
     options.create_if_missing = true;
     options.write_buffer_size = 100;
     options.disable_auto_compactions = true;
+    options = CurrentOptions(options);
     DestroyAndReopen(&options);
     CreateAndReopenWithCF({"pikachu"}, &options);

@@ -2184,7 +2204,7 @@ TEST(DBTest, IgnoreRecoveredLog) {
     }
     Status s = TryReopen(&options);
     ASSERT_TRUE(!s.ok());
-  } while (ChangeOptions());
+  } while (ChangeOptions(kSkipHashCuckoo));
 }

 TEST(DBTest, RollLog) {
@@ -2505,9 +2525,10 @@ TEST(DBTest, RecoveryWithEmptyLog) {
 // if the database is shutdown during the memtable compaction.
 TEST(DBTest, RecoverDuringMemtableCompaction) {
   do {
-    Options options = CurrentOptions();
+    Options options;
     options.env = env_;
     options.write_buffer_size = 1000000;
+    options = CurrentOptions(options);
     CreateAndReopenWithCF({"pikachu"}, &options);

     // Trigger a long memtable compaction and reopen the database during it
@@ -2526,8 +2547,9 @@ TEST(DBTest, RecoverDuringMemtableCompaction) {

 TEST(DBTest, MinorCompactionsHappen) {
   do {
-    Options options = CurrentOptions();
+    Options options;
     options.write_buffer_size = 10000;
+    options = CurrentOptions(options);
     CreateAndReopenWithCF({"pikachu"}, &options);

     const int N = 500;
@@ -2553,8 +2575,9 @@ TEST(DBTest, MinorCompactionsHappen) {

 TEST(DBTest, ManifestRollOver) {
   do {
-    Options options = CurrentOptions();
+    Options options;
     options.max_manifest_file_size = 10 ;  // 10 bytes
+    options = CurrentOptions(options);
     CreateAndReopenWithCF({"pikachu"}, &options);
     {
       ASSERT_OK(Put(1, "manifest_key1", std::string(1000, '1')));
@@ -2610,8 +2633,9 @@ TEST(DBTest, RecoverWithLargeLog) {

   // Make sure that if we re-open with a small write buffer size that
   // we flush table files in the middle of a large log file.
-  Options options = CurrentOptions();
+  Options options;
   options.write_buffer_size = 100000;
+  options = CurrentOptions(options);
   ReopenWithColumnFamilies({"default", "pikachu"}, &options);
   ASSERT_EQ(NumTableFilesAtLevel(0, 1), 3);
   ASSERT_EQ(std::string(200000, '1'), Get(1, "big1"));
@@ -2623,8 +2647,9 @@ TEST(DBTest, RecoverWithLargeLog) {
 }

 TEST(DBTest, CompactionsGenerateMultipleFiles) {
-  Options options = CurrentOptions();
+  Options options;
   options.write_buffer_size = 100000000;        // Large write buffer
+  options = CurrentOptions(options);
   CreateAndReopenWithCF({"pikachu"}, &options);

   Random rnd(301);
@@ -2649,11 +2674,12 @@ TEST(DBTest, CompactionsGenerateMultipleFiles) {
 }

 TEST(DBTest, CompactionTrigger) {
-  Options options = CurrentOptions();
+  Options options;
   options.write_buffer_size = 100<<10; //100KB
   options.num_levels = 3;
   options.max_mem_compaction_level = 0;
   options.level0_file_num_compaction_trigger = 3;
+  options = CurrentOptions(options);
   CreateAndReopenWithCF({"pikachu"}, &options);

   Random rnd(301);
@@ -2778,7 +2804,7 @@ class ChangeFilterFactory : public CompactionFilterFactory {
 // 2.
    Made assumption on the memtable flush conidtions, which may change from
 //    time to time.
 TEST(DBTest, UniversalCompactionTrigger) {
-  Options options = CurrentOptions();
+  Options options;
   options.compaction_style = kCompactionStyleUniversal;
   options.write_buffer_size = 100<<10; //100KB
   // trigger compaction if there are >= 4 files
@@ -2787,6 +2813,7 @@ TEST(DBTest, UniversalCompactionTrigger) {
   filter->expect_manual_compaction_.store(false);
   options.compaction_filter_factory.reset(filter);

+  options = CurrentOptions(options);
   CreateAndReopenWithCF({"pikachu"}, &options);

   Random rnd(301);
@@ -2915,7 +2942,7 @@ TEST(DBTest, UniversalCompactionTrigger) {
 }

 TEST(DBTest, UniversalCompactionSizeAmplification) {
-  Options options = CurrentOptions();
+  Options options;
   options.compaction_style = kCompactionStyleUniversal;
   options.write_buffer_size = 100<<10; //100KB
   options.level0_file_num_compaction_trigger = 3;
@@ -2923,6 +2950,7 @@ TEST(DBTest, UniversalCompactionSizeAmplification) {

   // Trigger compaction if size amplification exceeds 110%
   options.compaction_options_universal.max_size_amplification_percent = 110;
+  options = CurrentOptions(options);
   ReopenWithColumnFamilies({"default", "pikachu"}, &options);

   Random rnd(301);
@@ -2953,12 +2981,13 @@ TEST(DBTest, UniversalCompactionSizeAmplification) {
 }

 TEST(DBTest, UniversalCompactionOptions) {
-  Options options = CurrentOptions();
+  Options options;
   options.compaction_style = kCompactionStyleUniversal;
   options.write_buffer_size = 100<<10; //100KB
   options.level0_file_num_compaction_trigger = 4;
   options.num_levels = 1;
   options.compaction_options_universal.compression_size_percent = -1;
+  options = CurrentOptions(options);
   CreateAndReopenWithCF({"pikachu"}, &options);

   Random rnd(301);
@@ -3114,6 +3143,7 @@ TEST(DBTest, CompressedCache) {
   Options no_block_cache_opts;
   no_block_cache_opts.no_block_cache = true;
   no_block_cache_opts.statistics = options.statistics;
+  options = CurrentOptions(options);
   ReopenWithColumnFamilies({"default", "pikachu"},
                            {&no_block_cache_opts, &options});

@@ -3180,12 +3210,13 @@ static std::string CompressibleString(Random* rnd, int len) {
 }

 TEST(DBTest, UniversalCompactionCompressRatio1) {
-  Options options = CurrentOptions();
+  Options options;
   options.compaction_style = kCompactionStyleUniversal;
   options.write_buffer_size = 100<<10; //100KB
   options.level0_file_num_compaction_trigger = 2;
   options.num_levels = 1;
   options.compaction_options_universal.compression_size_percent = 70;
+  options = CurrentOptions(options);
   Reopen(&options);

   Random rnd(301);
@@ -3244,12 +3275,13 @@ TEST(DBTest, UniversalCompactionCompressRatio1) {
 }

 TEST(DBTest, UniversalCompactionCompressRatio2) {
-  Options options = CurrentOptions();
+  Options options;
   options.compaction_style = kCompactionStyleUniversal;
   options.write_buffer_size = 100<<10; //100KB
   options.level0_file_num_compaction_trigger = 2;
   options.num_levels = 1;
   options.compaction_options_universal.compression_size_percent = 95;
+  options = CurrentOptions(options);
   Reopen(&options);

   Random rnd(301);
@@ -3277,7 +3309,7 @@ TEST(DBTest, ConvertCompactionStyle) {
   int max_key_universal_insert = 600;

   // Stage 1: generate a db with level compaction
-  Options options = CurrentOptions();
+  Options options;
   options.write_buffer_size = 100<<10; //100KB
   options.num_levels = 4;
   options.level0_file_num_compaction_trigger = 3;
@@ -3285,6 +3317,7 @@ TEST(DBTest, ConvertCompactionStyle) {
   options.max_bytes_for_level_multiplier = 1;
   options.target_file_size_base = 200<<10; // 200KB
   options.target_file_size_multiplier = 1;
+  options = CurrentOptions(options);
   CreateAndReopenWithCF({"pikachu"}, &options);

   for (int i = 0; i <= max_key_level_insert; i++) {
@@ -3304,6 +3337,7 @@ TEST(DBTest, ConvertCompactionStyle) {
   // Stage 2: reopen with universal compaction - should fail
   options = CurrentOptions();
   options.compaction_style = kCompactionStyleUniversal;
+  options = CurrentOptions(options);
   Status s = TryReopenWithColumnFamilies({"default", "pikachu"}, &options);
   ASSERT_TRUE(s.IsInvalidArgument());

@@ -3314,6 +3348,7 @@ TEST(DBTest, ConvertCompactionStyle) {
   options.target_file_size_multiplier = 1;
   options.max_bytes_for_level_base = INT_MAX;
   options.max_bytes_for_level_multiplier = 1;
+  options = CurrentOptions(options);
   ReopenWithColumnFamilies({"default", "pikachu"}, &options);

   dbfull()->CompactRange(handles_[1], nullptr, nullptr, true /* reduce level */,
@@ -3333,6 +3368,7 @@ TEST(DBTest, ConvertCompactionStyle) {
   options.compaction_style = kCompactionStyleUniversal;
   options.write_buffer_size = 100<<10; //100KB
   options.level0_file_num_compaction_trigger = 3;
+  options = CurrentOptions(options);
   ReopenWithColumnFamilies({"default", "pikachu"}, &options);

   for (int i = max_key_level_insert / 2; i <= max_key_universal_insert; i++) {
@@ -3483,9 +3519,10 @@ TEST(DBTest, MinLevelToCompress2) {

 TEST(DBTest, RepeatedWritesToSameKey) {
   do {
-    Options options = CurrentOptions();
+    Options options;
     options.env = env_;
     options.write_buffer_size = 100000;  // Small write buffer
+    options = CurrentOptions(options);
     CreateAndReopenWithCF({"pikachu"}, &options);

     // We must have at most one file per level except for level-0,
@@ -3504,11 +3541,12 @@ TEST(DBTest, RepeatedWritesToSameKey) {

 TEST(DBTest, InPlaceUpdate) {
   do {
-    Options options = CurrentOptions();
+    Options options;
     options.create_if_missing = true;
     options.inplace_update_support = true;
     options.env = env_;
     options.write_buffer_size = 100000;
+    options = CurrentOptions(options);
     CreateAndReopenWithCF({"pikachu"}, &options);

     // Update key with values of smaller size
@@ -3527,11 +3565,12 @@ TEST(DBTest, InPlaceUpdate) {

 TEST(DBTest, InPlaceUpdateLargeNewValue) {
   do {
-    Options options = CurrentOptions();
+    Options options;
     options.create_if_missing = true;
     options.inplace_update_support = true;
     options.env = env_;
     options.write_buffer_size = 100000;
+    options = CurrentOptions(options);
     CreateAndReopenWithCF({"pikachu"}, &options);

     // Update key with values of larger size
@@ -3551,7 +3590,7 @@ TEST(DBTest, InPlaceUpdateLargeNewValue) {

 TEST(DBTest, InPlaceUpdateCallbackSmallerSize) {
   do {
-    Options options = CurrentOptions();
+    Options options;
     options.create_if_missing = true;
     options.inplace_update_support = true;

@@ -3559,6 +3598,7 @@ TEST(DBTest, InPlaceUpdateCallbackSmallerSize) {
     options.write_buffer_size = 100000;
     options.inplace_callback =
       rocksdb::DBTest::updateInPlaceSmallerSize;
+    options = CurrentOptions(options);
     CreateAndReopenWithCF({"pikachu"}, &options);

     // Update key with values of smaller size
@@ -3579,7 +3619,7 @@ TEST(DBTest, InPlaceUpdateCallbackSmallerSize) {

 TEST(DBTest, InPlaceUpdateCallbackSmallerVarintSize) {
   do {
-    Options options = CurrentOptions();
+    Options options;
     options.create_if_missing = true;
     options.inplace_update_support = true;

@@ -3587,6 +3627,7 @@ TEST(DBTest, InPlaceUpdateCallbackSmallerVarintSize) {
     options.write_buffer_size = 100000;
     options.inplace_callback =
       rocksdb::DBTest::updateInPlaceSmallerVarintSize;
+    options = CurrentOptions(options);
     CreateAndReopenWithCF({"pikachu"},
                           &options);

     // Update key with values of smaller varint size
@@ -3607,7 +3648,7 @@ TEST(DBTest, InPlaceUpdateCallbackSmallerVarintSize) {

 TEST(DBTest, InPlaceUpdateCallbackLargeNewValue) {
   do {
-    Options options = CurrentOptions();
+    Options options;
     options.create_if_missing = true;
     options.inplace_update_support = true;

@@ -3615,6 +3656,7 @@ TEST(DBTest, InPlaceUpdateCallbackLargeNewValue) {
     options.write_buffer_size = 100000;
     options.inplace_callback =
       rocksdb::DBTest::updateInPlaceLargerSize;
+    options = CurrentOptions(options);
     CreateAndReopenWithCF({"pikachu"}, &options);

     // Update key with values of larger size
@@ -3633,7 +3675,7 @@ TEST(DBTest, InPlaceUpdateCallbackLargeNewValue) {

 TEST(DBTest, InPlaceUpdateCallbackNoAction) {
   do {
-    Options options = CurrentOptions();
+    Options options;
     options.create_if_missing = true;
     options.inplace_update_support = true;

@@ -3641,6 +3683,7 @@ TEST(DBTest, InPlaceUpdateCallbackNoAction) {
     options.write_buffer_size = 100000;
     options.inplace_callback =
       rocksdb::DBTest::updateInPlaceNoAction;
+    options = CurrentOptions(options);
     CreateAndReopenWithCF({"pikachu"}, &options);

     // Callback function requests no actions from db
@@ -3656,6 +3699,7 @@ TEST(DBTest, CompactionFilter) {
   options.num_levels = 3;
   options.max_mem_compaction_level = 0;
   options.compaction_filter_factory = std::make_shared<KeepFilterFactory>();
+  options = CurrentOptions(options);
   CreateAndReopenWithCF({"pikachu"}, &options);

   // Write 100K keys, these are written to a few files in L0.
@@ -3792,12 +3836,12 @@ TEST(DBTest, CompactionFilter) {

 TEST(DBTest, CompactionFilterWithValueChange) {
   do {
-    Options options = CurrentOptions();
+    Options options;
     options.num_levels = 3;
     options.max_mem_compaction_level = 0;
     options.compaction_filter_factory =
       std::make_shared<ChangeFilterFactory>();
-    Reopen(&options);
+    options = CurrentOptions(options);
     CreateAndReopenWithCF({"pikachu"}, &options);

     // Write 100K+1 keys, these are written to a few files
@@ -4115,6 +4159,7 @@ TEST(DBTest, CompactionFilterV2WithValueChange) {
   // compaction filter buffer using universal compaction
   option_config_ = kUniversalCompaction;
   options.compaction_style = (rocksdb::CompactionStyle)1;
+  options = CurrentOptions(options);
   Reopen(&options);

   // Write 100K+1 keys, these are written to a few files
@@ -4253,9 +4298,10 @@ static bool Between(uint64_t val, uint64_t low, uint64_t high) {

 TEST(DBTest, ApproximateSizes) {
   do {
-    Options options = CurrentOptions();
+    Options options;
     options.write_buffer_size = 100000000;        // Large write buffer
     options.compression = kNoCompression;
+    options = CurrentOptions(options);
     DestroyAndReopen();
     CreateAndReopenWithCF({"pikachu"}, &options);

@@ -4411,7 +4457,7 @@ TEST(DBTest, Snapshot) {
     db_->ReleaseSnapshot(s2);
     ASSERT_EQ("0v4", Get(0, "foo"));
     ASSERT_EQ("1v4", Get(1, "foo"));
-  } while (ChangeOptions());
+  } while (ChangeOptions(kSkipHashCuckoo));
 }

 TEST(DBTest, HiddenValuesAreRemoved) {
@@ -4445,7 +4491,9 @@ TEST(DBTest, HiddenValuesAreRemoved) {
     ASSERT_TRUE(Between(Size("", "pastfoo", 1), 0, 1000));
     // ApproximateOffsetOf() is not yet implemented in plain table format,
     // which is used by Size().
-  } while (ChangeOptions(kSkipUniversalCompaction | kSkipPlainTable));
+    // skip HashCuckooRep as it does not support snapshot
+  } while (ChangeOptions(kSkipUniversalCompaction | kSkipPlainTable |
+                         kSkipHashCuckoo));
 }

 TEST(DBTest, CompactBetweenSnapshots) {
@@ -4500,8 +4548,8 @@ TEST(DBTest, CompactBetweenSnapshots) {
   dbfull()->CompactRange(handles_[1], nullptr, nullptr);
   ASSERT_EQ("sixth", Get(1, "foo"));
   ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]");
-
-  } while (ChangeOptions());
+    // skip HashCuckooRep as it does not support snapshot
+  } while (ChangeOptions(kSkipHashCuckoo));
 }

 TEST(DBTest, DeletionMarkers1) {
@@ -4721,6 +4769,7 @@ TEST(DBTest, CustomComparator) {
   new_options.comparator = &cmp;
   new_options.filter_policy = nullptr;     // Cannot use bloom filters
   new_options.write_buffer_size = 1000;  // Compact more often
+  new_options = CurrentOptions(new_options);
   DestroyAndReopen(&new_options);
   CreateAndReopenWithCF({"pikachu"}, &new_options);
   ASSERT_OK(Put(1, "[10]", "ten"));
@@ -5955,7 +6004,8 @@ TEST(DBTest, MultiThreaded) {
       env_->SleepForMicroseconds(100000);
     }
   }
-  } while (ChangeOptions());
+    // skip as HashCuckooRep does not support snapshot
+  } while (ChangeOptions(kSkipHashCuckoo));
 }

 // Group commit test:
@@ -6119,6 +6169,7 @@ class ModelDB: public DB {
   virtual void ReleaseSnapshot(const Snapshot* snapshot) {
     delete reinterpret_cast<const ModelSnapshot*>(snapshot);
   }
+
   virtual Status Write(const WriteOptions& options, WriteBatch* batch) {
     class Handler : public WriteBatch::Handler {
      public:
@@ -6333,6 +6384,7 @@ TEST(DBTest, Randomized) {
     int minimum = 0;
     if (option_config_ == kHashSkipList ||
         option_config_ == kHashLinkList ||
+        option_config_ == kHashCuckoo ||
         option_config_ == kPlainTableFirstBytePrefix ||
         option_config_ == kBlockBasedTableWithWholeKeyHashIndex ||
         option_config_ == kBlockBasedTableWithPrefixHashIndex) {
@@ -6393,7 +6445,9 @@ TEST(DBTest, Randomized) {
     }
     if (model_snap != nullptr) model.ReleaseSnapshot(model_snap);
     if (db_snap != nullptr) db_->ReleaseSnapshot(db_snap);
-  } while (ChangeOptions(kSkipDeletesFilterFirst | kSkipNoSeekToLast));
+    // skip cuckoo hash as it does not support snapshot.
+  } while (ChangeOptions(kSkipDeletesFilterFirst |
+                         kSkipNoSeekToLast | kSkipHashCuckoo));
 }

 TEST(DBTest, MultiGetSimple) {
diff --git a/db/memtable.cc b/db/memtable.cc
index b13b9f294..424efe845 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -11,6 +11,7 @@
 #include <memory>
 #include <algorithm>
+#include <limits>

 #include "db/dbformat.h"
 #include "db/merge_context.h"
@@ -62,7 +63,16 @@ MemTable::~MemTable() {
 }

 size_t MemTable::ApproximateMemoryUsage() {
-  return arena_.ApproximateMemoryUsage() + table_->ApproximateMemoryUsage();
+  size_t arena_usage = arena_.ApproximateMemoryUsage();
+  size_t table_usage = table_->ApproximateMemoryUsage();
+  // let MAX_USAGE = std::numeric_limits<size_t>::max();
+  // then if arena_usage + table_usage >= MAX_USAGE, return MAX_USAGE.
+  // the following variation is to avoid numeric overflow.
+  if (arena_usage >= std::numeric_limits<size_t>::max() - table_usage) {
+    return std::numeric_limits<size_t>::max();
+  }
+  // otherwise, return the actual usage
+  return arena_usage + table_usage;
 }

 bool MemTable::ShouldFlushNow() const {
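The memtable.cc guard above is a saturating add. Extracted as a standalone
helper for clarity (a sketch, not part of the patch):

  #include <cstddef>
  #include <limits>

  // Returns a + b, clamping at the maximum size_t value instead of
  // wrapping around; a + b >= max also maps to max, matching the patch.
  inline size_t SaturatingAdd(size_t a, size_t b) {
    if (a >= std::numeric_limits<size_t>::max() - b) {
      return std::numeric_limits<size_t>::max();
    }
    return a + b;
  }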
diff --git a/db/memtable.h b/db/memtable.h
index a4700f731..7e9af3504 100644
--- a/db/memtable.h
+++ b/db/memtable.h
@@ -149,6 +149,14 @@ class MemTable {
   // Notify the underlying storage that no more items will be added
   void MarkImmutable() { table_->MarkReadOnly(); }

+  // return true if the current MemTableRep supports merge operator.
+  bool IsMergeOperatorSupported() const {
+    return table_->IsMergeOperatorSupported();
+  }
+
+  // return true if the current MemTableRep supports snapshots.
+  bool IsSnapshotSupported() const { return table_->IsSnapshotSupported(); }
+
   // Get the lock associated for the key
   port::RWMutex* GetLock(const Slice& key);
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index 6e4f976d8..158864715 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -275,6 +275,9 @@ class DB {
   // this handle will all observe a stable snapshot of the current DB
   // state.  The caller must call ReleaseSnapshot(result) when the
   // snapshot is no longer needed.
+  //
+  // nullptr will be returned if the DB fails to take a snapshot or does
+  // not support snapshots.
   virtual const Snapshot* GetSnapshot() = 0;

   // Release a previously acquired snapshot.  The caller must not
diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h
index d23f41b62..445edccac 100644
--- a/include/rocksdb/memtablerep.h
+++ b/include/rocksdb/memtablerep.h
@@ -152,6 +152,14 @@ class MemTableRep {
   // a Seek might only include keys with the same prefix as the target key.
   virtual Iterator* GetDynamicPrefixIterator() { return GetIterator(); }

+  // Return true if the current MemTableRep supports merge operator.
+  // Default: true
+  virtual bool IsMergeOperatorSupported() const { return true; }
+
+  // Return true if the current MemTableRep supports snapshots.
+  // Default: true
+  virtual bool IsSnapshotSupported() const { return true; }
+
  protected:
   // When *key is an internal key concatenated with the value, returns the
   // user key.
@@ -219,6 +227,39 @@ extern MemTableRepFactory* NewHashSkipListRepFactory(
 extern MemTableRepFactory* NewHashLinkListRepFactory(
     size_t bucket_count = 50000);

+// This factory creates a cuckoo-hashing based mem-table representation.
+// Cuckoo-hash is a closed-hashing strategy, in which all key/value pairs
+// are stored in the bucket array itself instead of in some data structures
+// external to the bucket array.  In addition, each key in cuckoo hash
+// has a constant number of possible buckets in the bucket array.  These
+// two properties together make cuckoo hash more memory efficient and
+// give it constant worst-case read time.  Cuckoo hash is best suited for
+// point-lookup workloads.
+//
+// When inserting a key / value, it first checks whether one of its possible
+// buckets is empty.  If so, the key / value will be inserted into that
+// vacant bucket.  Otherwise, one of the keys originally stored in one of
+// these possible buckets will be "kicked out" and moved to another of its
+// own possible buckets (possibly kicking out another victim.)  In the
+// current implementation, the length of such a "kick-out" path is bounded.
+// If no "kick-out" path can be found for a specific key, this key will be
+// stored in a backup structure, and the current memtable will be forced to
+// become immutable.
+//
+// Note that currently this mem-table representation does not support
+// snapshots (i.e., it only queries the latest state) or iterators.  In
+// addition, MultiGet operations might also lose their atomicity due to
+// the lack of snapshot support.
+//
+// Parameters:
+//   write_buffer_size: the write buffer size in bytes.
+//   average_data_size: the average size of key + value in bytes.  This value
+//     together with write_buffer_size will be used to compute the number
+//     of buckets.
+//   hash_function_count: the number of hash functions that will be used by
+//     the cuckoo hash.
+//     This number also equals the number of possible buckets each key
+//     will have.
+extern MemTableRepFactory* NewHashCuckooRepFactory(
+    size_t write_buffer_size, size_t average_data_size = 64,
+    unsigned int hash_function_count = 4);
 #endif  // ROCKSDB_LITE
-
 }  // namespace rocksdb
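Typical wiring of the new factory, mirroring the db_bench and db_test
changes earlier in this patch (a sketch; the function name and the path
"/tmp/cuckoo_testdb" are illustrative):

  #include "rocksdb/db.h"
  #include "rocksdb/memtablerep.h"

  rocksdb::Status OpenCuckooDb(rocksdb::DB** db) {
    rocksdb::Options options;
    options.create_if_missing = true;
    // the bucket count is derived from write_buffer_size and the
    // expected average key + value size (64 bytes here).
    options.memtable_factory.reset(rocksdb::NewHashCuckooRepFactory(
        options.write_buffer_size, 64 /* average_data_size */));
    return rocksdb::DB::Open(options, "/tmp/cuckoo_testdb", db);
  }

Remember that for databases opened this way, GetSnapshot() returns nullptr
and merge operators are rejected at DB::Open(), per the db_impl.cc changes
above.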
diff --git a/util/hash_cuckoo_rep.cc b/util/hash_cuckoo_rep.cc
new file mode 100644
index 000000000..d10bc5d2a
--- /dev/null
+++ b/util/hash_cuckoo_rep.cc
@@ -0,0 +1,627 @@
+
+// Copyright (c) 2014, Facebook, Inc.  All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+//
+
+#ifndef ROCKSDB_LITE
+#include "util/hash_cuckoo_rep.h"
+
+#include <algorithm>
+#include <atomic>
+#include <limits>
+#include <memory>
+#include <queue>
+#include <string>
+#include <vector>
+
+#include "rocksdb/memtablerep.h"
+#include "util/murmurhash.h"
+#include "db/memtable.h"
+#include "db/skiplist.h"
+#include "util/stl_wrappers.h"
+
+namespace rocksdb {
+namespace {
+
+// the default maximum size of the cuckoo path searching queue
+static const int kCuckooPathMaxSearchSteps = 100;
+
+struct CuckooStep {
+  static const int kNullStep = -1;
+  // the bucket id in the cuckoo array.
+  int bucket_id_;
+  // index of cuckoo-step array that points to its previous step,
+  // kNullStep if it is the beginning step.
+  int prev_step_id_;
+  // the depth of the current step.
+  unsigned int depth_;
+
+  CuckooStep() : bucket_id_(-1), prev_step_id_(kNullStep), depth_(1) {}
+
+  CuckooStep(CuckooStep&&) = default;
+  CuckooStep& operator=(CuckooStep&&) = default;
+
+  CuckooStep(const CuckooStep&) = delete;
+  CuckooStep& operator=(const CuckooStep&) = delete;
+
+  CuckooStep(int bucket_id, int prev_step_id, int depth)
+      : bucket_id_(bucket_id), prev_step_id_(prev_step_id), depth_(depth) {}
+};
+
+class HashCuckooRep : public MemTableRep {
+ public:
+  explicit HashCuckooRep(const MemTableRep::KeyComparator& compare,
+                         Arena* arena, const size_t bucket_count,
+                         const unsigned int hash_func_count)
+      : MemTableRep(arena),
+        compare_(compare),
+        arena_(arena),
+        bucket_count_(bucket_count),
+        cuckoo_path_max_depth_(kDefaultCuckooPathMaxDepth),
+        occupied_count_(0),
+        hash_function_count_(hash_func_count),
+        backup_table_(nullptr) {
+    char* mem = reinterpret_cast<char*>(
+        arena_->Allocate(sizeof(std::atomic<const char*>) * bucket_count_));
+    cuckoo_array_ = new (mem) std::atomic<const char*>[bucket_count_];
+    for (unsigned int bid = 0; bid < bucket_count_; ++bid) {
+      cuckoo_array_[bid].store(nullptr, std::memory_order_relaxed);
+    }
+
+    cuckoo_path_ = reinterpret_cast<int*>(
+        arena_->Allocate(sizeof(int*) * (cuckoo_path_max_depth_ + 1)));
+    is_nearly_full_ = false;
+  }
+
+  // return false, indicating HashCuckooRep does not support merge operator.
+  virtual bool IsMergeOperatorSupported() const override { return false; }
+
+  // return false, indicating HashCuckooRep does not support snapshot.
+  virtual bool IsSnapshotSupported() const override { return false; }
+
+  // Returns true iff an entry that compares equal to key is in the collection.
+  virtual bool Contains(const char* internal_key) const override;
+
+  virtual ~HashCuckooRep() override {}
+
+  // Insert the specified key (internal_key) into the mem-table.  Assertion
+  // fails if the current mem-table already contains the specified key.
+  virtual void Insert(KeyHandle handle) override;
+
+  // This function returns std::numeric_limits<size_t>::max() in the following
+  // two cases to disallow further write operations:
+  // 1. when the fullness reaches kMaxFullness.
+  // 2. when the backup_table_ is used.
+  //
+  // otherwise, this function will always return 0.
+  virtual size_t ApproximateMemoryUsage() override {
+    if (is_nearly_full_) {
+      return std::numeric_limits<size_t>::max();
+    }
+    return 0;
+  }
+
+  virtual void Get(const LookupKey& k, void* callback_args,
+                   bool (*callback_func)(void* arg,
+                                         const char* entry)) override;
+
+  class Iterator : public MemTableRep::Iterator {
+    std::shared_ptr<std::vector<const char*>> bucket_;
+    typename std::vector<const char*>::const_iterator mutable cit_;
+    const KeyComparator& compare_;
+    std::string tmp_;  // For passing to EncodeKey
+    bool mutable sorted_;
+    void DoSort() const;
+
+   public:
+    explicit Iterator(std::shared_ptr<std::vector<const char*>> bucket,
+                      const KeyComparator& compare);
+
+    // Initialize an iterator over the specified collection.
+    // The returned iterator is not valid.
+    // explicit Iterator(const MemTableRep* collection);
+    virtual ~Iterator() override {}
+
+    // Returns true iff the iterator is positioned at a valid node.
+    virtual bool Valid() const override;
+
+    // Returns the key at the current position.
+    // REQUIRES: Valid()
+    virtual const char* key() const override;
+
+    // Advances to the next position.
+    // REQUIRES: Valid()
+    virtual void Next() override;
+
+    // Advances to the previous position.
+    // REQUIRES: Valid()
+    virtual void Prev() override;
+
+    // Advance to the first entry with a key >= target
+    virtual void Seek(const Slice& user_key, const char* memtable_key) override;
+
+    // Position at the first entry in collection.
+    // Final state of iterator is Valid() iff collection is not empty.
+    virtual void SeekToFirst() override;
+
+    // Position at the last entry in collection.
+    // Final state of iterator is Valid() iff collection is not empty.
+    virtual void SeekToLast() override;
+  };
+
+  struct CuckooStepBuffer {
+    CuckooStepBuffer() : write_index_(0), read_index_(0) {}
+    ~CuckooStepBuffer() {}
+
+    int write_index_;
+    int read_index_;
+    CuckooStep steps_[kCuckooPathMaxSearchSteps];
+
+    CuckooStep& NextWriteBuffer() { return steps_[write_index_++]; }
+
+    inline const CuckooStep& ReadNext() { return steps_[read_index_++]; }
+
+    inline bool HasNewWrite() { return write_index_ > read_index_; }
+
+    inline void reset() {
+      write_index_ = 0;
+      read_index_ = 0;
+    }
+
+    inline bool IsFull() { return write_index_ >= kCuckooPathMaxSearchSteps; }
+
+    // returns the number of steps that have been read
+    inline int ReadCount() { return read_index_; }
+
+    // returns the number of steps that have been written to the buffer.
+    inline int WriteCount() { return write_index_; }
+  };
+
+ private:
+  const MemTableRep::KeyComparator& compare_;
+  // the pointer to Arena to allocate memory, immutable after construction.
+  Arena* const arena_;
+  // the number of hash buckets in the hash table.
+  const size_t bucket_count_;
+  // the maximum depth of the cuckoo path.
+  const unsigned int cuckoo_path_max_depth_;
+  // the current number of occupied entries in cuckoo_array_.
+  size_t occupied_count_;
+  // the current number of hash functions used in the cuckoo hash.
+  unsigned int hash_function_count_;
+  // the backup MemTableRep to handle the case where cuckoo hash cannot find
+  // a vacant bucket for inserting the key of a put request.
+  std::shared_ptr<MemTableRep> backup_table_;
+  // the array to store pointers, pointing to the actual data.
+  std::atomic<const char*>* cuckoo_array_;
+  // a buffer to store the cuckoo path
+  int* cuckoo_path_;
+  // a boolean flag indicating whether the fullness of the bucket array
+  // reaches the point to make the current memtable immutable.
+  bool is_nearly_full_;
+
+  // the default maximum depth of the cuckoo path.
+  static const unsigned int kDefaultCuckooPathMaxDepth = 10;
+
+  CuckooStepBuffer step_buffer_;
+
+  // returns the bucket id assigned to the input slice by the specified
+  // hash function.
+  unsigned int GetHash(const Slice& slice, const int hash_func_id) const {
+    // the seeds used in the Murmur hash to produce different hash functions.
+    static const int kMurmurHashSeeds[HashCuckooRepFactory::kMaxHashCount] = {
+        545609244,  1769731426, 763324157,  13099088,   592422103,
+        1899789565, 248369300,  1984183468, 1613664382, 1491157517};
+    return MurmurHash(slice.data(), slice.size(),
+                      kMurmurHashSeeds[hash_func_id]) %
+           bucket_count_;
+  }
+
+  // A cuckoo path is a sequence of bucket ids, where each id points to a
+  // location of cuckoo_array_.  This path describes the displacement sequence
+  // of entries in order to store the desired data specified by the input user
+  // key.  The path starts from one of the locations associated with the
+  // specified user key and ends at a vacant space in the cuckoo array.  This
+  // function will update the cuckoo_path.
+  //
+  // @return true if it found a cuckoo path.
+  bool FindCuckooPath(const char* internal_key, const Slice& user_key,
+                      int* cuckoo_path, size_t* cuckoo_path_length,
+                      int initial_hash_id = 0);
+
+  // Perform a quick insert by checking whether there is a vacant bucket in
+  // one of the possible locations of the input key.  If so, the function will
+  // return true and the key will be stored in that vacant bucket.
+  //
+  // This function is a helper function of FindCuckooPath that discovers the
+  // first possible steps of a cuckoo path.  It begins by computing the
+  // possible locations of the input key (and stores them in bucket_ids.)
+  // Then, if one of its possible locations is vacant, the input key will
+  // be stored in that vacant space and the function will return true.
+  // Otherwise, the function will return false, indicating a complete search
+  // for a cuckoo path is needed.
+  bool QuickInsert(const char* internal_key, const Slice& user_key,
+                   int bucket_ids[], const int initial_hash_id);
+
+  // Unhide default implementations of GetIterator
+  using MemTableRep::GetIterator;
+  // Returns the pointer to the internal iterator to the buckets where buckets
+  // are sorted according to the user specified KeyComparator.  Note that
+  // any insert after this function call may affect the sorted nature of
+  // the returned iterator.
+  virtual MemTableRep::Iterator* GetIterator() override {
+    std::vector<const char*> compact_buckets;
+    for (unsigned int bid = 0; bid < bucket_count_; ++bid) {
+      const char* bucket = cuckoo_array_[bid].load(std::memory_order_relaxed);
+      if (bucket != nullptr) {
+        compact_buckets.push_back(bucket);
+      }
+    }
+    MemTableRep* backup_table = backup_table_.get();
+    if (backup_table != nullptr) {
+      std::unique_ptr<MemTableRep::Iterator> iter(backup_table->GetIterator());
+      for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+        compact_buckets.push_back(iter->key());
+      }
+    }
+    return new Iterator(
+        std::shared_ptr<std::vector<const char*>>(
+            new std::vector<const char*>(std::move(compact_buckets))),
+        compare_);
+  }
+};
+
+void HashCuckooRep::Get(const LookupKey& key, void* callback_args,
+                        bool (*callback_func)(void* arg, const char* entry)) {
+  Slice user_key = key.user_key();
+  for (unsigned int hid = 0; hid < hash_function_count_; ++hid) {
+    const char* bucket =
+        cuckoo_array_[GetHash(user_key, hid)].load(std::memory_order_acquire);
+    if (bucket != nullptr) {
+      auto bucket_user_key = UserKey(bucket);
+      if (user_key.compare(bucket_user_key) == 0) {
+        callback_func(callback_args, bucket);
+        break;
+      }
+    } else {
+      // as Put() always stores at the vacant bucket located by the
+      // hash function with the smallest possible id, when we first
+      // find a vacant bucket in Get(), that means a miss.
+      break;
+    }
+  }
+  MemTableRep* backup_table = backup_table_.get();
+  if (backup_table != nullptr) {
+    backup_table->Get(key, callback_args, callback_func);
+  }
+}
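// Editorial illustration (not part of the patch) of the callback protocol
// used by Get() above: the caller packs its state into `callback_args` and
// the rep hands each matching entry to `callback_func`.  The names GetState
// and SawEntry are hypothetical; in the general MemTableRep contract the
// return value tells the rep whether to keep scanning, though
// HashCuckooRep::Get() invokes the callback at most once per table.
//
//   struct GetState {
//     int hits = 0;
//   };
//   static bool SawEntry(void* arg, const char* entry) {
//     static_cast<GetState*>(arg)->hits++;
//     return false;  // stop scanning after the first match
//   }
//   // usage: rep->Get(lookup_key, &state, SawEntry);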
+void HashCuckooRep::Insert(KeyHandle handle) {
+  static const float kMaxFullness = 0.90;
+
+  auto* key = static_cast<char*>(handle);
+  int initial_hash_id = 0;
+  size_t cuckoo_path_length = 0;
+  auto user_key = UserKey(key);
+  // find cuckoo path
+  if (FindCuckooPath(key, user_key, cuckoo_path_, &cuckoo_path_length,
+                     initial_hash_id) == false) {
+    // if FindCuckooPath() returns false, then we cannot find a vacant bucket
+    // for this key even after using up all the hash functions.  Then use a
+    // backup memtable to store such keys, which will further make this
+    // mem-table become immutable.
+    if (backup_table_.get() == nullptr) {
+      VectorRepFactory factory(10);
+      backup_table_.reset(factory.CreateMemTableRep(compare_, arena_, nullptr));
+      is_nearly_full_ = true;
+    }
+    backup_table_->Insert(key);
+    return;
+  }
+  // reaching this point means the insert can be done successfully.
+  occupied_count_++;
+  if (occupied_count_ >= bucket_count_ * kMaxFullness) {
+    is_nearly_full_ = true;
+  }
+
+  // perform the kickout process if the length of the cuckoo path > 1.
+  if (cuckoo_path_length == 0) return;
+
+  // the cuckoo path stores the kickout path in reverse order.
+  // so the kickout or displacement is actually performed
+  // in reverse order, which avoids false-negatives on read
+  // by moving each key involved in the cuckoo path to the new
+  // location before replacing it.
+  for (size_t i = 1; i < cuckoo_path_length; ++i) {
+    int kicked_out_bid = cuckoo_path_[i - 1];
+    int current_bid = cuckoo_path_[i];
+    // since we only allow one writer at a time, it is safe to do relaxed read.
+    cuckoo_array_[kicked_out_bid]
+        .store(cuckoo_array_[current_bid].load(std::memory_order_relaxed),
+               std::memory_order_release);
+  }
+  int insert_key_bid = cuckoo_path_[cuckoo_path_length - 1];
+  cuckoo_array_[insert_key_bid].store(key, std::memory_order_release);
+}
+
+bool HashCuckooRep::Contains(const char* internal_key) const {
+  auto user_key = UserKey(internal_key);
+  for (unsigned int hid = 0; hid < hash_function_count_; ++hid) {
+    const char* stored_key =
+        cuckoo_array_[GetHash(user_key, hid)].load(std::memory_order_acquire);
+    if (stored_key != nullptr) {
+      if (compare_(internal_key, stored_key) == 0) {
+        return true;
+      }
+    }
+  }
+  return false;
+}
+
+bool HashCuckooRep::QuickInsert(const char* internal_key, const Slice& user_key,
+                                int bucket_ids[], const int initial_hash_id) {
+  int cuckoo_bucket_id = -1;
+
+  // The loop below does the following:
+  // 0. Calculates all possible locations of the input key.
+  // 1. Checks if there is a bucket having the same user_key as the input.
+  // 2. If such a bucket exists, replaces it with the newly inserted data
+  //    and returns.  This step also performs a duplication check.
+  // 3. If no such bucket exists but a vacant bucket exists, inserts the
+  //    input data into it.
+  // 4. If steps 1 to 3 all fail, returns false.
+  for (unsigned int hid = initial_hash_id; hid < hash_function_count_; ++hid) {
+    bucket_ids[hid] = GetHash(user_key, hid);
+    // since only one PUT is allowed at a time, and this is part of the PUT
+    // operation, we can safely perform a relaxed load.
+    const char* stored_key =
+        cuckoo_array_[bucket_ids[hid]].load(std::memory_order_relaxed);
+    if (stored_key == nullptr) {
+      if (cuckoo_bucket_id == -1) {
+        cuckoo_bucket_id = bucket_ids[hid];
+      }
+    } else {
+      const auto bucket_user_key = UserKey(stored_key);
+      if (bucket_user_key.compare(user_key) == 0) {
+        cuckoo_bucket_id = bucket_ids[hid];
+        break;
+      }
+    }
+  }
+
+  if (cuckoo_bucket_id != -1) {
+    cuckoo_array_[cuckoo_bucket_id]
+        .store(internal_key, std::memory_order_release);
+    return true;
+  }
+
+  return false;
+}
+
+// Perform a pre-check and find the shortest cuckoo path.  A cuckoo path
+// is a displacement sequence for inserting the specified input key.
+//
+// @return true if it successfully found a vacant space or cuckoo-path.
+//     If the return value is true but the length of cuckoo_path is zero,
+//     then a vacant bucket, or a bucket whose user key matches the input,
+//     was found and a quick insertion was done.
+bool HashCuckooRep::FindCuckooPath(const char* internal_key,
+                                   const Slice& user_key, int* cuckoo_path,
+                                   size_t* cuckoo_path_length,
+                                   const int initial_hash_id) {
+  int bucket_ids[HashCuckooRepFactory::kMaxHashCount];
+  *cuckoo_path_length = 0;
+
+  if (QuickInsert(internal_key, user_key, bucket_ids, initial_hash_id)) {
+    return true;
+  }
+  // If this step is reached, then it means:
+  // 1. no vacant bucket in any of the possible locations of the input key.
+  // 2. none of the possible locations of the input key has the same user
+  //    key as the input `internal_key`.
+
+  // reset the step buffer before starting the BFS search
+  step_buffer_.reset();
+
+  for (unsigned int hid = initial_hash_id; hid < hash_function_count_; ++hid) {
+    CuckooStep& current_step = step_buffer_.NextWriteBuffer();
+    current_step.bucket_id_ = bucket_ids[hid];
+    current_step.prev_step_id_ = CuckooStep::kNullStep;
+    current_step.depth_ = 1;
+  }
+
+  while (step_buffer_.HasNewWrite()) {
+    int step_id = step_buffer_.read_index_;
+    const CuckooStep& step = step_buffer_.ReadNext();
+    // Since this is a BFS process, the first step whose depth exceeds the
+    // maximum allowed depth indicates that all the remaining steps in the
+    // step buffer queue will also exceed the maximum depth.  Return false
+    // immediately, indicating we cannot find a vacant bucket for the input
+    // key within the maximum allowed depth.
+    if (step.depth_ >= cuckoo_path_max_depth_) {
+      return false;
+    }
+    // again, we can safely perform a no-barrier load here as the current
+    // thread is the only writer.
+    auto bucket_user_key =
+        UserKey(cuckoo_array_[step.bucket_id_].load(std::memory_order_relaxed));
+    if (step.prev_step_id_ != CuckooStep::kNullStep) {
+      if (bucket_user_key.compare(user_key) == 0) {
+        // then there is a loop in the current path; stop discovering this path.
+        continue;
+      }
+    }
+    // if the current key is stored at its nth possible location, then we only
+    // consider its mth locations where m > n.  This property makes sure that
+    // no read will miss if we do have data associated with the query key.
+    //
+    // The n and m in the above statement are the start_hid and hid in the code.
+    unsigned int start_hid = hash_function_count_;
+    for (unsigned int hid = 0; hid < hash_function_count_; ++hid) {
+      bucket_ids[hid] = GetHash(bucket_user_key, hid);
+      if (step.bucket_id_ == bucket_ids[hid]) {
+        start_hid = hid;
+      }
+    }
+    // we must have found the bucket which is the key's current "home".
+    assert(start_hid != hash_function_count_);
+
+    // explore all possible next steps from the current step.
+    for (unsigned int hid = start_hid + 1; hid < hash_function_count_; ++hid) {
+      CuckooStep& next_step = step_buffer_.NextWriteBuffer();
+      next_step.bucket_id_ = bucket_ids[hid];
+      next_step.prev_step_id_ = step_id;
+      next_step.depth_ = step.depth_ + 1;
+      // once a vacant bucket is found, trace back all its previous steps
+      // to generate a cuckoo path.
+      if (cuckoo_array_[next_step.bucket_id_].load(std::memory_order_relaxed) ==
+          nullptr) {
+        // store the last step in the cuckoo path.  Note that cuckoo_path
+        // stores steps in reverse order.  This allows us to move keys along
+        // the cuckoo path by storing each key to the new place first before
+        // removing it from the old place.  This property ensures reads will
+        // not miss due to moving keys along the cuckoo path.
+        cuckoo_path[(*cuckoo_path_length)++] = next_step.bucket_id_;
+        int depth;
+        for (depth = step.depth_; depth > 0 && step_id != CuckooStep::kNullStep;
+             depth--) {
+          const CuckooStep& prev_step = step_buffer_.steps_[step_id];
+          cuckoo_path[(*cuckoo_path_length)++] = prev_step.bucket_id_;
+          step_id = prev_step.prev_step_id_;
+        }
+        assert(depth == 0 && step_id == CuckooStep::kNullStep);
+        return true;
+      }
+      if (step_buffer_.IsFull()) {
+        // if true, then the maximum number of cuckoo search steps has been
+        // reached.
+        return false;
+      }
+    }
+  }
+
+  // tried all possible paths but was still unable to find a cuckoo path
+  // that leads to a vacant bucket.
+  return false;
+}
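// Editorial sketch (not part of the patch) of the displacement idea that
// FindCuckooPath() searches for, shown with the simpler bounded random-walk
// variant instead of the BFS used above.  ToyHash and ToyCuckooInsert are
// illustrative names; each key gets two candidate buckets, and a failure
// corresponds to spilling into backup_table_.

#include <cstddef>
#include <string>
#include <utility>
#include <vector>

static size_t ToyHash(const std::string& k, int seed, size_t n) {
  size_t h = 14695981039346656037ull ^ static_cast<size_t>(seed);
  for (char c : k) {
    h = (h ^ static_cast<unsigned char>(c)) * 1099511628211ull;
  }
  return h % n;  // one of `n` buckets, varying with `seed`
}

static bool ToyCuckooInsert(std::vector<std::string>* table, std::string key,
                            int max_kicks) {
  const size_t n = table->size();
  for (int kick = 0; kick < max_kicks; ++kick) {
    // take the first vacant candidate bucket, if any
    for (int seed = 0; seed < 2; ++seed) {
      size_t b = ToyHash(key, seed, n);
      if ((*table)[b].empty()) {
        (*table)[b] = key;
        return true;
      }
    }
    // both candidates occupied: evict the resident of the first candidate
    // bucket and retry with the evicted key.
    std::swap(key, (*table)[ToyHash(key, 0, n)]);
  }
  return false;  // no vacancy found within the kick budget
}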
+
+HashCuckooRep::Iterator::Iterator(
+    std::shared_ptr<std::vector<const char*>> bucket,
+    const KeyComparator& compare)
+    : bucket_(bucket),
+      cit_(bucket_->end()),
+      compare_(compare),
+      sorted_(false) {}
+
+void HashCuckooRep::Iterator::DoSort() const {
+  if (!sorted_) {
+    std::sort(bucket_->begin(), bucket_->end(),
+              stl_wrappers::Compare(compare_));
+    cit_ = bucket_->begin();
+    sorted_ = true;
+  }
+}
+
+// Returns true iff the iterator is positioned at a valid node.
+bool HashCuckooRep::Iterator::Valid() const {
+  DoSort();
+  return cit_ != bucket_->end();
+}
+
+// Returns the key at the current position.
+// REQUIRES: Valid()
+const char* HashCuckooRep::Iterator::key() const {
+  assert(Valid());
+  return *cit_;
+}
+
+// Advances to the next position.
+// REQUIRES: Valid()
+void HashCuckooRep::Iterator::Next() {
+  assert(Valid());
+  if (cit_ == bucket_->end()) {
+    return;
+  }
+  ++cit_;
+}
+
+// Advances to the previous position.
+// REQUIRES: Valid()
+void HashCuckooRep::Iterator::Prev() {
+  assert(Valid());
+  if (cit_ == bucket_->begin()) {
+    // If you try to go back from the first element, the iterator should be
+    // invalidated.  So we set it to past-the-end.  This means that you can
+    // treat the container circularly.
+    cit_ = bucket_->end();
+  } else {
+    --cit_;
+  }
+}
+
+// Advance to the first entry with a key >= target
+void HashCuckooRep::Iterator::Seek(const Slice& user_key,
+                                   const char* memtable_key) {
+  DoSort();
+  // Do a binary search to find the first value not less than the target
+  const char* encoded_key =
+      (memtable_key != nullptr) ? memtable_key : EncodeKey(&tmp_, user_key);
+  cit_ = std::equal_range(bucket_->begin(), bucket_->end(), encoded_key,
+                          [this](const char* a, const char* b) {
+                            return compare_(a, b) < 0;
+                          }).first;
+}
+
+// Position at the first entry in collection.
+// Final state of iterator is Valid() iff collection is not empty.
+void HashCuckooRep::Iterator::SeekToFirst() {
+  DoSort();
+  cit_ = bucket_->begin();
+}
+
+// Position at the last entry in collection.
+// Final state of iterator is Valid() iff collection is not empty.
+void HashCuckooRep::Iterator::SeekToLast() {
+  DoSort();
+  cit_ = bucket_->end();
+  if (bucket_->size() != 0) {
+    --cit_;
+  }
+}
+
+}  // anonymous namespace
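// Worked example (editorial; the concrete numbers are illustrative) of the
// sizing formula in CreateMemTableRep() below: with
// write_buffer_size = 64 << 20 (64 MB), average_data_size = 64 bytes,
// an 8-byte bucket pointer, and kFullness = 0.7:
//   bucket_count = (67108864 / (64 + 8)) / 0.7 + 1 = 1331525
// i.e. roughly 1.4x the number of entries the write buffer is expected to
// hold, so the table stays around 70% full when the buffer fills up.
//
//   constexpr size_t kExampleBucketCount =
//       static_cast<size_t>((67108864 / (64 + 8)) / 0.7) + 1;  // 1331525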
+
+MemTableRep* HashCuckooRepFactory::CreateMemTableRep(
+    const MemTableRep::KeyComparator& compare, Arena* arena,
+    const SliceTransform* transform) {
+  // The estimated average fullness.  The write performance of any closed
+  // hash degrades as the fullness of the mem-table increases.  Setting
+  // kFullness to a value around 0.7 can better avoid write performance
+  // degradation while keeping memory usage efficient.
+  static const float kFullness = 0.7;
+  size_t pointer_size = sizeof(std::atomic<const char*>);
+  assert(write_buffer_size_ >= (average_data_size_ + pointer_size));
+  size_t bucket_count =
+      (write_buffer_size_ / (average_data_size_ + pointer_size)) / kFullness +
+      1;
+  unsigned int hash_function_count = hash_function_count_;
+  if (hash_function_count < 2) {
+    hash_function_count = 2;
+  }
+  if (hash_function_count > kMaxHashCount) {
+    hash_function_count = kMaxHashCount;
+  }
+  return new HashCuckooRep(compare, arena, bucket_count, hash_function_count);
+}
+
+MemTableRepFactory* NewHashCuckooRepFactory(size_t write_buffer_size,
+                                            size_t average_data_size,
+                                            unsigned int hash_function_count) {
+  return new HashCuckooRepFactory(write_buffer_size, average_data_size,
+                                  hash_function_count);
+}
+
+}  // namespace rocksdb
+#endif  // ROCKSDB_LITE
diff --git a/util/hash_cuckoo_rep.h b/util/hash_cuckoo_rep.h
new file mode 100644
index 000000000..55093dfbd
--- /dev/null
+++ b/util/hash_cuckoo_rep.h
@@ -0,0 +1,42 @@
+// Copyright (c) 2013, Facebook, Inc.  All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#ifndef ROCKSDB_LITE
+#pragma once
+#include "rocksdb/slice_transform.h"
+#include "rocksdb/memtablerep.h"
+
+namespace rocksdb {
+
+class HashCuckooRepFactory : public MemTableRepFactory {
+ public:
+  // maximum number of hash functions used in the cuckoo hash.
+  static const int kMaxHashCount = 10;
+
+  explicit HashCuckooRepFactory(size_t write_buffer_size,
+                                size_t average_data_size,
+                                unsigned int hash_function_count)
+      : write_buffer_size_(write_buffer_size),
+        average_data_size_(average_data_size),
+        hash_function_count_(hash_function_count) {}
+
+  virtual ~HashCuckooRepFactory() {}
+
+  virtual MemTableRep* CreateMemTableRep(
+      const MemTableRep::KeyComparator& compare, Arena* arena,
+      const SliceTransform* transform) override;
+
+  virtual const char* Name() const override { return "HashCuckooRepFactory"; }
+
+ private:
+  size_t write_buffer_size_;
+  size_t average_data_size_;
+  const unsigned int hash_function_count_;
+};
+}  // namespace rocksdb
+#endif  // ROCKSDB_LITE
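Taken together with the DB::Open() check added in db_impl.cc, this is the
failure mode callers should expect when combining this rep with a merge
operator (a sketch; the function name, path, and `my_merge_operator` are
hypothetical placeholders):

  #include <cassert>
  #include <memory>
  #include "rocksdb/db.h"
  #include "rocksdb/memtablerep.h"
  #include "rocksdb/merge_operator.h"

  void OpenWithCuckooAndMerge(
      const std::shared_ptr<rocksdb::MergeOperator>& my_merge_operator) {
    rocksdb::Options options;
    options.create_if_missing = true;
    options.memtable_factory.reset(
        rocksdb::NewHashCuckooRepFactory(options.write_buffer_size));
    options.merge_operator = my_merge_operator;
    rocksdb::DB* db = nullptr;
    rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/cuckoo_db", &db);
    assert(s.IsInvalidArgument());  // rejected per the db_impl.cc check
  }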