From 9dc887ece05860f091ceef25c7f61c3359de0cda Mon Sep 17 00:00:00 2001 From: Baptiste Lemaire Date: Fri, 2 Jul 2021 05:22:03 -0700 Subject: [PATCH] Memtable "MemPurge" prototype (#8454) Summary: Implement an experimental feature called "MemPurge", which consists in purging "garbage" bytes out of a memtable and reuse the memtable struct instead of making it immutable and eventually flushing its content to storage. The prototype is by default deactivated and is not intended for use. It is intended for correctness and validation testing. At the moment, the "MemPurge" feature can be switched on by using the `options.experimental_allow_mempurge` flag. For this early stage, when the allow_mempurge flag is set to `true`, all the flush operations will be rerouted to perform a MemPurge. This is a temporary design decision that will give us the time to explore meaningful heuristics to use MemPurge at the right time for relevant workloads . Moreover, the current MemPurge operation only supports `Puts`, `Deletes`, `DeleteRange` operations, and handles `Iterators` as well as `CompactionFilter`s that are invoked at flush time . Three unit tests are added to `db_flush_test.cc` to test if MemPurge works correctly (and checks that the previously mentioned operations are fully supported thoroughly tested). One noticeable design decision is the timing of the MemPurge operation in the memtable workflow: for this prototype, the mempurge happens when the memtable is switched (and usually made immutable). This is an inefficient process because it implies that the entirety of the MemPurge operation happens while holding the db_mutex. Future commits will make the MemPurge operation a background task (akin to the regular flush operation) and aim at drastically enhancing the performance of this operation. The MemPurge is also not fully "WAL-compatible" yet, but when the WAL is full, or when the regular MemPurge operation fails (or when the purged memtable still needs to be flushed), a regular flush operation takes place. Later commits will also correct these behaviors. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8454 Reviewed By: anand1976 Differential Revision: D29433971 Pulled By: bjlemaire fbshipit-source-id: 6af48213554e35048a7e03816955100a80a26dc5 --- db/builder.cc | 2 + db/c.cc | 5 + db/column_family.cc | 24 +- db/column_family.h | 8 +- db/db_flush_test.cc | 371 +++++++++++++++++++++++++ db/db_impl/db_impl.h | 21 +- db/db_impl/db_impl_compaction_flush.cc | 6 +- db/db_impl/db_impl_write.cc | 224 ++++++++++++++- db/flush_job.cc | 1 + db/memtable.h | 17 ++ include/rocksdb/options.h | 4 + options/db_options.cc | 8 + options/db_options.h | 1 + options/options_test.cc | 4 + 14 files changed, 682 insertions(+), 14 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index b37e7dd7d..16e5744d1 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -204,6 +204,8 @@ Status BuildTable( const Slice& value = c_iter.value(); const ParsedInternalKey& ikey = c_iter.ikey(); // Generate a rolling 64-bit hash of the key and values + // Note : + // Here "key" integrates 'sequence_number'+'kType'+'user key'. s = output_validator.Add(key, value); if (!s.ok()) { break; diff --git a/db/c.cc b/db/c.cc index 79fa5181d..3c719e6ff 100644 --- a/db/c.cc +++ b/db/c.cc @@ -3028,6 +3028,11 @@ unsigned char rocksdb_options_get_advise_random_on_open( return opt->rep.advise_random_on_open; } +void rocksdb_options_set_experimental_allow_mempurge(rocksdb_options_t* opt, + unsigned char v) { + opt->rep.experimental_allow_mempurge = v; +} + void rocksdb_options_set_access_hint_on_compaction_start( rocksdb_options_t* opt, int v) { switch(v) { diff --git a/db/column_family.cc b/db/column_family.cc index c168f2b1d..178186379 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -443,14 +443,26 @@ bool SuperVersion::Unref() { return previous_refs == 1; } -void SuperVersion::Cleanup() { +void SuperVersion::Cleanup(const bool noImmMemoryContribution) { assert(refs.load(std::memory_order_relaxed) == 0); + // Since this SuperVersion object is being deleted, + // decrement reference to the immutable MemtableList + // this SV object was pointing to. imm->Unref(&to_delete); MemTable* m = mem->Unref(); if (m != nullptr) { - auto* memory_usage = current->cfd()->imm()->current_memory_usage(); - assert(*memory_usage >= m->ApproximateMemoryUsage()); - *memory_usage -= m->ApproximateMemoryUsage(); + // Typically, if the m memtable was not made + // immutable, and therefore was not added to the + // imm list, it does not contribute to the imm + // memory footprint (and actually is not part of + // the 'imm' MemtableList at all). + // At the moment, noImmMemoryContribution is only + // used by the experimental 'MemPurge' prototype. + if (!noImmMemoryContribution) { + auto* memory_usage = current->cfd()->imm()->current_memory_usage(); + assert(*memory_usage >= m->ApproximateMemoryUsage()); + *memory_usage -= m->ApproximateMemoryUsage(); + } to_delete.push_back(m); } current->Unref(); @@ -1260,7 +1272,7 @@ void ColumnFamilyData::InstallSuperVersion( void ColumnFamilyData::InstallSuperVersion( SuperVersionContext* sv_context, InstrumentedMutex* db_mutex, - const MutableCFOptions& mutable_cf_options) { + const MutableCFOptions& mutable_cf_options, bool noImmMemoryContribution) { SuperVersion* new_superversion = sv_context->new_superversion.release(); new_superversion->db_mutex = db_mutex; new_superversion->mutable_cf_options = mutable_cf_options; @@ -1290,7 +1302,7 @@ void ColumnFamilyData::InstallSuperVersion( new_superversion->write_stall_condition, GetName(), ioptions()); } if (old_superversion->Unref()) { - old_superversion->Cleanup(); + old_superversion->Cleanup(noImmMemoryContribution); sv_context->superversions_to_free.push_back(old_superversion); } } diff --git a/db/column_family.h b/db/column_family.h index 7ad560e44..cfba13171 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -222,7 +222,10 @@ struct SuperVersion { // Cleanup unrefs mem, imm and current. Also, it stores all memtables // that needs to be deleted in to_delete vector. Unrefing those // objects needs to be done in the mutex - void Cleanup(); + // The 'noImmMemoryContribution' is set to true if the memtable being + // dereferenced in this SuperVersion was not added to the Immutable + // memtable list. + void Cleanup(bool noImmMemoryContribution = false); void Init(ColumnFamilyData* new_cfd, MemTable* new_mem, MemTableListVersion* new_imm, Version* new_current); @@ -454,7 +457,8 @@ class ColumnFamilyData { // IMPORTANT: Only call this from DBImpl::InstallSuperVersion() void InstallSuperVersion(SuperVersionContext* sv_context, InstrumentedMutex* db_mutex, - const MutableCFOptions& mutable_cf_options); + const MutableCFOptions& mutable_cf_options, + bool noImmMemoryContribution = false); void InstallSuperVersion(SuperVersionContext* sv_context, InstrumentedMutex* db_mutex); diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index b5d3026d8..ef062f510 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -24,6 +24,10 @@ namespace ROCKSDB_NAMESPACE { +// This is a static filter used for filtering +// kvs during the compaction process. +static std::string NEW_VALUE = "NewValue"; + class DBFlushTest : public DBTestBase { public: DBFlushTest() : DBTestBase("/db_flush_test", /*env_do_fsync=*/true) {} @@ -658,6 +662,373 @@ TEST_F(DBFlushTest, StatisticsGarbageRangeDeletes) { Close(); } +TEST_F(DBFlushTest, MemPurgeBasic) { + Options options = CurrentOptions(); + + // The following options are used to enforce several values that + // may already exist as default values to make this test resilient + // to default value updates in the future. + options.statistics = CreateDBStatistics(); + + // Record all statistics. + options.statistics->set_stats_level(StatsLevel::kAll); + + // create the DB if it's not already present + options.create_if_missing = true; + + // Useful for now as we are trying to compare uncompressed data savings on + // flush(). + options.compression = kNoCompression; + + // Prevent memtable in place updates. Should already be disabled + // (from Wiki: + // In place updates can be enabled by toggling on the bool + // inplace_update_support flag. However, this flag is by default set to + // false + // because this thread-safe in-place update support is not compatible + // with concurrent memtable writes. Note that the bool + // allow_concurrent_memtable_write is set to true by default ) + options.inplace_update_support = false; + options.allow_concurrent_memtable_write = true; + + // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). + options.write_buffer_size = 64 << 20; + // Activate the MemPurge prototype. + options.experimental_allow_mempurge = true; + ASSERT_OK(TryReopen(options)); + uint32_t mempurge_count = 0; + uint32_t flush_count = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MemPurge", [&](void* /*arg*/) { mempurge_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushJob:Flush", [&](void* /*arg*/) { flush_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + std::string KEY1 = "IamKey1"; + std::string KEY2 = "IamKey2"; + std::string KEY3 = "IamKey3"; + std::string KEY4 = "IamKey4"; + std::string KEY5 = "IamKey5"; + std::string VALUE1 = "IamValue1"; + std::string VALUE2 = "IamValue2"; + const std::string NOT_FOUND = "NOT_FOUND"; + + // Check simple operations (put-delete). + ASSERT_OK(Put(KEY1, VALUE1)); + ASSERT_OK(Put(KEY2, VALUE2)); + ASSERT_OK(Delete(KEY1)); + ASSERT_OK(Put(KEY2, VALUE1)); + ASSERT_OK(Put(KEY1, VALUE2)); + ASSERT_OK(Flush()); + + ASSERT_EQ(Get(KEY1), VALUE2); + ASSERT_EQ(Get(KEY2), VALUE1); + + ASSERT_OK(Delete(KEY1)); + ASSERT_EQ(Get(KEY1), NOT_FOUND); + ASSERT_OK(Flush()); + ASSERT_EQ(Get(KEY1), NOT_FOUND); + + // Heavy overwrite workload, + // more than would fit in maximum allowed memtables. + Random rnd(719); + const size_t NUM_REPEAT = 100000; + const size_t RAND_VALUES_LENGTH = 512; + std::string p_v1, p_v2, p_v3, p_v4, p_v5; + // Insertion of of K-V pairs, multiple times. + // Also insert DeleteRange + for (size_t i = 0; i < NUM_REPEAT; i++) { + // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes. + p_v1 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v2 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v3 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v4 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v5 = rnd.RandomString(RAND_VALUES_LENGTH); + + ASSERT_OK(Put(KEY1, p_v1)); + ASSERT_OK(Put(KEY2, p_v2)); + ASSERT_OK(Put(KEY3, p_v3)); + ASSERT_OK(Put(KEY4, p_v4)); + ASSERT_OK(Put(KEY5, p_v5)); + + ASSERT_EQ(Get(KEY1), p_v1); + ASSERT_EQ(Get(KEY2), p_v2); + ASSERT_EQ(Get(KEY3), p_v3); + ASSERT_EQ(Get(KEY4), p_v4); + ASSERT_EQ(Get(KEY5), p_v5); + } + + // Check that there was at least one mempurge + const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1; + // Check that there was no flush to storage. + const uint32_t EXPECTED_FLUSH_COUNT = 0; + + EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT); + EXPECT_EQ(flush_count, EXPECTED_FLUSH_COUNT); + + Close(); +} + +TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) { + Options options = CurrentOptions(); + + options.statistics = CreateDBStatistics(); + options.statistics->set_stats_level(StatsLevel::kAll); + options.create_if_missing = true; + options.compression = kNoCompression; + options.inplace_update_support = false; + options.allow_concurrent_memtable_write = true; + + // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). + options.write_buffer_size = 64 << 20; + // Activate the MemPurge prototype. + options.experimental_allow_mempurge = true; + ASSERT_OK(TryReopen(options)); + + uint32_t mempurge_count = 0; + uint32_t flush_count = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MemPurge", [&](void* /*arg*/) { mempurge_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushJob:Flush", [&](void* /*arg*/) { flush_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + std::string KEY1 = "ThisIsKey1"; + std::string KEY2 = "ThisIsKey2"; + std::string KEY3 = "ThisIsKey3"; + std::string KEY4 = "ThisIsKey4"; + std::string KEY5 = "ThisIsKey5"; + const std::string NOT_FOUND = "NOT_FOUND"; + + Random rnd(117); + const size_t NUM_REPEAT = 200; + const size_t RAND_VALUES_LENGTH = 512; + bool atLeastOneFlush = false; + std::string key, value, p_v1, p_v2, p_v3, p_v3b, p_v4, p_v5; + int count = 0; + const int EXPECTED_COUNT_FORLOOP = 3; + const int EXPECTED_COUNT_END = 4; + + ReadOptions ropt; + ropt.pin_data = true; + ropt.total_order_seek = true; + Iterator* iter = nullptr; + // Insertion of of K-V pairs, multiple times. + // Also insert DeleteRange + for (size_t i = 0; i < NUM_REPEAT; i++) { + // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes. + p_v1 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v2 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v3 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v3b = rnd.RandomString(RAND_VALUES_LENGTH); + p_v4 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v5 = rnd.RandomString(RAND_VALUES_LENGTH); + ASSERT_OK(Put(KEY1, p_v1)); + ASSERT_OK(Put(KEY2, p_v2)); + ASSERT_OK(Put(KEY3, p_v3)); + ASSERT_OK(Put(KEY4, p_v4)); + ASSERT_OK(Put(KEY5, p_v5)); + ASSERT_OK(Delete(KEY2)); + ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY2, + KEY4)); + ASSERT_OK(Put(KEY3, p_v3b)); + ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY1, + KEY3)); + ASSERT_OK(Delete(KEY1)); + + // Flush (MemPurge) with a probability of 50%. + if (rnd.OneIn(2)) { + ASSERT_OK(Flush()); + atLeastOneFlush = true; + } + + ASSERT_EQ(Get(KEY1), NOT_FOUND); + ASSERT_EQ(Get(KEY2), NOT_FOUND); + ASSERT_EQ(Get(KEY3), p_v3b); + ASSERT_EQ(Get(KEY4), p_v4); + ASSERT_EQ(Get(KEY5), p_v5); + + iter = db_->NewIterator(ropt); + iter->SeekToFirst(); + count = 0; + for (; iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + key = (iter->key()).ToString(false); + value = (iter->value()).ToString(false); + if (key.compare(KEY3) == 0) + ASSERT_EQ(value, p_v3b); + else if (key.compare(KEY4) == 0) + ASSERT_EQ(value, p_v4); + else if (key.compare(KEY5) == 0) + ASSERT_EQ(value, p_v5); + else + ASSERT_EQ(value, NOT_FOUND); + count++; + } + + // Expected count here is 3: KEY3, KEY4, KEY5. + ASSERT_EQ(count, EXPECTED_COUNT_FORLOOP); + if (iter) { + delete iter; + } + } + + // Check that there was at least one mempurge + const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1; + // Check that there was no flush to storage. + const uint32_t EXPECTED_FLUSH_COUNT = 0; + + if (atLeastOneFlush) { + EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT); + } else { + // Note that there isn't enough values added to + // automatically trigger a flush/MemPurge in the background. + // Therefore we can make the assumption that if we never + // called "Flush()", no mempurge happened. + EXPECT_EQ(mempurge_count, EXPECTED_FLUSH_COUNT); + } + EXPECT_EQ(flush_count, EXPECTED_FLUSH_COUNT); + + // Additional test for the iterator+memPurge. + ASSERT_OK(Put(KEY2, p_v2)); + iter = db_->NewIterator(ropt); + iter->SeekToFirst(); + ASSERT_OK(Put(KEY4, p_v4)); + count = 0; + for (; iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + key = (iter->key()).ToString(false); + value = (iter->value()).ToString(false); + if (key.compare(KEY2) == 0) + ASSERT_EQ(value, p_v2); + else if (key.compare(KEY3) == 0) + ASSERT_EQ(value, p_v3b); + else if (key.compare(KEY4) == 0) + ASSERT_EQ(value, p_v4); + else if (key.compare(KEY5) == 0) + ASSERT_EQ(value, p_v5); + else + ASSERT_EQ(value, NOT_FOUND); + count++; + } + // Expected count here is 4: KEY2, KEY3, KEY4, KEY5. + ASSERT_EQ(count, EXPECTED_COUNT_END); + if (iter) delete iter; + + Close(); +} + +// Create a Compaction Fitler that will be invoked +// at flush time and will update the value of a KV pair +// if the key string is "lower" than the filter_key_ string. +class ConditionalUpdateFilter : public CompactionFilter { + public: + explicit ConditionalUpdateFilter(const std::string* filtered_key) + : filtered_key_(filtered_key) {} + bool Filter(int /*level*/, const Slice& key, const Slice& /*value*/, + std::string* new_value, bool* value_changed) const override { + // If key CreateCompactionFilter( + const CompactionFilter::Context& /*context*/) override { + return std::unique_ptr( + new ConditionalUpdateFilter(&filtered_key_)); + } + + const char* Name() const override { return "ConditionalUpdateFilterFactory"; } + + bool ShouldFilterTableFileCreation( + TableFileCreationReason reason) const override { + // This compaction filter will be invoked + // at flush time (and therefore at MemPurge time). + return (reason == TableFileCreationReason::kFlush); + } + + private: + std::string filtered_key_; +}; + +TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) { + Options options = CurrentOptions(); + + std::string KEY1 = "ThisIsKey1"; + std::string KEY2 = "ThisIsKey2"; + std::string KEY3 = "ThisIsKey3"; + std::string KEY4 = "ThisIsKey4"; + std::string KEY5 = "ThisIsKey5"; + const std::string NOT_FOUND = "NOT_FOUND"; + + options.statistics = CreateDBStatistics(); + options.statistics->set_stats_level(StatsLevel::kAll); + options.create_if_missing = true; + options.compression = kNoCompression; + options.inplace_update_support = false; + options.allow_concurrent_memtable_write = true; + + // Create a ConditionalUpdate compaction filter + // that will update all the values of the KV pairs + // where the keys are "lower" than KEY4. + options.compaction_filter_factory = + std::make_shared(KEY4); + + // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). + options.write_buffer_size = 64 << 20; + // Activate the MemPurge prototype. + options.experimental_allow_mempurge = true; + ASSERT_OK(TryReopen(options)); + + Random rnd(53); + const size_t NUM_REPEAT = 25; + const size_t RAND_VALUES_LENGTH = 128; + std::string p_v1, p_v2, p_v3, p_v4, p_v5; + + // Insertion of of K-V pairs, multiple times. + // Also insert DeleteRange + for (size_t i = 0; i < NUM_REPEAT; i++) { + // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes. + p_v1 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v2 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v3 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v4 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v5 = rnd.RandomString(RAND_VALUES_LENGTH); + ASSERT_OK(Put(KEY1, p_v1)); + ASSERT_OK(Put(KEY2, p_v2)); + ASSERT_OK(Put(KEY3, p_v3)); + ASSERT_OK(Put(KEY4, p_v4)); + ASSERT_OK(Put(KEY5, p_v5)); + + ASSERT_OK(Delete(KEY1)); + + ASSERT_OK(Flush()); + + // Verify that the ConditionalUpdateCompactionFilter + // updated the values of KEY2 and KEY3, and not KEY4 and KEY5. + ASSERT_EQ(Get(KEY1), NOT_FOUND); + ASSERT_EQ(Get(KEY2), NEW_VALUE); + ASSERT_EQ(Get(KEY3), NEW_VALUE); + ASSERT_EQ(Get(KEY4), p_v4); + ASSERT_EQ(Get(KEY5), p_v5); + } +} + TEST_P(DBFlushDirectIOTest, DirectIO) { Options options; options.create_if_missing = true; diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 3d4d0a2e7..89e1719a6 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -20,6 +20,7 @@ #include #include "db/column_family.h" +#include "db/compaction/compaction_iterator.h" #include "db/compaction/compaction_job.h" #include "db/dbformat.h" #include "db/error_handler.h" @@ -53,6 +54,7 @@ #include "rocksdb/trace_reader_writer.h" #include "rocksdb/transaction_log.h" #include "rocksdb/write_buffer_manager.h" +#include "table/merging_iterator.h" #include "table/scoped_arena_iterator.h" #include "util/autovector.h" #include "util/hash.h" @@ -1610,6 +1612,23 @@ class DBImpl : public DB { Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context); + // Memtable Garbage Collection algorithm: a MemPurge takes the memtable + // and filters (or "purge") the outdated bytes out of it. The output + // (the filtered bytes, or "useful payload") is then transfered into + // the new memtable "new_mem". This process is typically intended for + // workloads with heavy overwrites to save on IO cost resulting from + // expensive flush operations. + // "MemPurge" is an experimental feature still at a very early stage + // of development. At the moment it is only compatible with the Get, Put, + // Delete operations as well as Iterators and CompactionFilters. + // For this early version, "MemPurge" is called by setting the + // options.experimental_allow_mempurge flag as "true". When this is + // the case, ALL flush operations will be replaced by MemPurge operations. + // (for prototype stress-testing purposes). Therefore, we strongly + // recommend all users not to set this flag as true given that the MemPurge + // process has not matured yet. + Status MemPurge(ColumnFamilyData* cfd, MemTable* new_mem); + void SelectColumnFamiliesForAtomicFlush(autovector* cfds); // Force current memtable contents to be flushed. @@ -1835,7 +1854,7 @@ class DBImpl : public DB { // state needs flush or compaction. void InstallSuperVersionAndScheduleWork( ColumnFamilyData* cfd, SuperVersionContext* sv_context, - const MutableCFOptions& mutable_cf_options); + const MutableCFOptions& mutable_cf_options, bool fromMemPurge = false); bool GetIntPropertyInternal(ColumnFamilyData* cfd, const DBPropertyInfo& property_info, diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 9c85aa773..69a98c934 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -151,7 +151,6 @@ Status DBImpl::FlushMemTableToOutputFile( assert(cfd); assert(cfd->imm()->NumNotFlushed() != 0); assert(cfd->imm()->IsFlushPending()); - FlushJob flush_job( dbname_, cfd, immutable_db_options_, mutable_cf_options, port::kMaxUint64 /* memtable_id */, file_options_for_compaction_, @@ -3437,7 +3436,7 @@ void DBImpl::BuildCompactionJobInfo( void DBImpl::InstallSuperVersionAndScheduleWork( ColumnFamilyData* cfd, SuperVersionContext* sv_context, - const MutableCFOptions& mutable_cf_options) { + const MutableCFOptions& mutable_cf_options, bool fromMemPurge) { mutex_.AssertHeld(); // Update max_total_in_memory_state_ @@ -3452,7 +3451,8 @@ void DBImpl::InstallSuperVersionAndScheduleWork( if (UNLIKELY(sv_context->new_superversion == nullptr)) { sv_context->NewSuperVersion(); } - cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options); + cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options, + fromMemPurge); // There may be a small data race here. The snapshot tricking bottommost // compaction may already be released here. But assuming there will always be diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 2bd6f7124..acccbff23 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1737,6 +1737,184 @@ void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/, } #endif // ROCKSDB_LITE +Status DBImpl::MemPurge(ColumnFamilyData* cfd, MemTable* new_mem) { + Status s; + assert(new_mem != nullptr); + + JobContext job_context(next_job_id_.fetch_add(1), true); + std::vector snapshot_seqs; + SequenceNumber earliest_write_conflict_snapshot; + SnapshotChecker* snapshot_checker; + GetSnapshotContext(&job_context, &snapshot_seqs, + &earliest_write_conflict_snapshot, &snapshot_checker); + + // Grab current memtable + MemTable* m = cfd->mem(); + SequenceNumber first_seqno = m->GetFirstSequenceNumber(); + SequenceNumber earliest_seqno = m->GetEarliestSequenceNumber(); + + // Create two iterators, one for the memtable data (contains + // info from puts + deletes), and one for the memtable + // Range Tombstones (from DeleteRanges). + ReadOptions ro; + ro.total_order_seek = true; + Arena arena; + std::vector memtables(1, m->NewIterator(ro, &arena)); + std::vector> + range_del_iters; + auto* range_del_iter = m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber); + if (range_del_iter != nullptr) { + range_del_iters.emplace_back(range_del_iter); + } + ScopedArenaIterator iter( + NewMergingIterator(&(cfd->internal_comparator()), memtables.data(), + static_cast(memtables.size()), &arena)); + + auto* ioptions = cfd->ioptions(); + + // Place iterator at the First (meaning most recent) key node. + iter->SeekToFirst(); + + std::unique_ptr range_del_agg( + new CompactionRangeDelAggregator(&(cfd->internal_comparator()), + snapshot_seqs)); + for (auto& rd_iter : range_del_iters) { + range_del_agg->AddTombstones(std::move(rd_iter)); + } + + // If there is valid data in the memtable, + // or at least range tombstones, copy over the info + // to the new memtable. + if (iter->Valid() || !range_del_agg->IsEmpty()) { + std::unique_ptr compaction_filter; + if (ioptions->compaction_filter_factory != nullptr && + ioptions->compaction_filter_factory->ShouldFilterTableFileCreation( + TableFileCreationReason::kFlush)) { + CompactionFilter::Context ctx; + ctx.is_full_compaction = false; + ctx.is_manual_compaction = false; + ctx.column_family_id = cfd->GetID(); + ctx.reason = TableFileCreationReason::kFlush; + compaction_filter = + ioptions->compaction_filter_factory->CreateCompactionFilter(ctx); + if (compaction_filter != nullptr && + !compaction_filter->IgnoreSnapshots()) { + s = Status::NotSupported( + "CompactionFilter::IgnoreSnapshots() = false is not supported " + "anymore."); + return s; + } + } + + Env* env = immutable_db_options_.env; + assert(env); + MergeHelper merge( + env, (cfd->internal_comparator()).user_comparator(), + (ioptions->merge_operator).get(), compaction_filter.get(), + ioptions->logger, true /* internal key corruption is not ok */, + snapshot_seqs.empty() ? 0 : snapshot_seqs.back(), snapshot_checker); + CompactionIterator c_iter( + iter.get(), (cfd->internal_comparator()).user_comparator(), &merge, + kMaxSequenceNumber, &snapshot_seqs, earliest_write_conflict_snapshot, + snapshot_checker, env, ShouldReportDetailedTime(env, ioptions->stats), + true /* internal key corruption is not ok */, range_del_agg.get(), + nullptr, ioptions->allow_data_in_errors, + /*compaction=*/nullptr, compaction_filter.get(), + /*shutting_down=*/nullptr, + /*preserve_deletes_seqnum=*/0, /*manual_compaction_paused=*/nullptr, + /*manual_compaction_canceled=*/nullptr, immutable_db_options_.info_log, + &(cfd->GetFullHistoryTsLow())); + + c_iter.SeekToFirst(); + + mutex_.AssertHeld(); + + // Set earliest sequence number in the new memtable + // to be equal to the earliest sequence number of the + // memtable being flushed (See later if there is a need + // to update this number!). + new_mem->SetEarliestSequenceNumber(earliest_seqno); + // Likewise for first seq number. + new_mem->SetFirstSequenceNumber(first_seqno); + SequenceNumber new_first_seqno = kMaxSequenceNumber; + + // Key transfer + for (; c_iter.Valid(); c_iter.Next()) { + const ParsedInternalKey ikey = c_iter.ikey(); + const Slice value = c_iter.value(); + new_first_seqno = + ikey.sequence < new_first_seqno ? ikey.sequence : new_first_seqno; + + // Should we update "OldestKeyTime" ???? + s = new_mem->Add( + ikey.sequence, ikey.type, ikey.user_key, value, + nullptr, // KV protection info set as nullptr since it + // should only be useful for the first add to + // the original memtable. + false, // : allow concurrent_memtable_writes_ + // Not seen as necessary for now. + nullptr, // get_post_process_info(m) must be nullptr + // when concurrent_memtable_writes is switched off. + nullptr); // hint, only used when concurrent_memtable_writes_ + // is switched on. + if (!s.ok()) { + break; + } + } + + // Check status and propagate + // potential error status from c_iter + if (!s.ok()) { + c_iter.status().PermitUncheckedError(); + } else if (!c_iter.status().ok()) { + s = c_iter.status(); + } + + // Range tombstone transfer. + if (s.ok()) { + auto range_del_it = range_del_agg->NewIterator(); + for (range_del_it->SeekToFirst(); range_del_it->Valid(); + range_del_it->Next()) { + auto tombstone = range_del_it->Tombstone(); + new_first_seqno = + tombstone.seq_ < new_first_seqno ? tombstone.seq_ : new_first_seqno; + s = new_mem->Add( + tombstone.seq_, // Sequence number + kTypeRangeDeletion, // KV type + tombstone.start_key_, // Key is start key. + tombstone.end_key_, // Value is end key. + nullptr, // KV protection info set as nullptr since it + // should only be useful for the first add to + // the original memtable. + false, // : allow concurrent_memtable_writes_ + // Not seen as necessary for now. + nullptr, // get_post_process_info(m) must be nullptr + // when concurrent_memtable_writes is switched off. + nullptr); // hint, only used when concurrent_memtable_writes_ + // is switched on. + + if (!s.ok()) { + break; + } + } + } + // Rectify the first sequence number, which (unlike the earliest seq + // number) needs to be present in the new memtable. + new_mem->SetFirstSequenceNumber(new_first_seqno); + } + // Note: if the mempurge was ineffective, meaning that there was no + // garbage to remove, and this new_mem needs to be flushed again, + // the new_mem->Add would have updated the flush status when it + // called "UpdateFlushState()" internally at the last Add() call. + // Therefore if the new mem needs to be flushed again, we mark + // the return status as "aborted", which will trigger the regular + // flush operation. + if (s.ok() && new_mem->ShouldScheduleFlush()) { + s = Status::Aborted(Slice("No garbage collected.")); + } + return s; +} + // REQUIRES: mutex_ is held // REQUIRES: this thread is currently at the front of the writer queue // REQUIRES: this thread is currently at the front of the 2nd writer queue if @@ -1911,6 +2089,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { for (auto cf : empty_cfs) { if (cf->IsEmpty()) { cf->SetLogNumber(logfile_number_); + // MEMPURGE: No need to change this, because new adds + // should still receive new sequence numbers. cf->mem()->SetCreationSeq(versions_->LastSequence()); } // cf may become non-empty. } @@ -1933,11 +2113,51 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { } cfd->mem()->SetNextLogNumber(logfile_number_); - cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_); + + // By default, it is assumed that the 'old' memtable + // will be added to the Imm memtable list and will therefore + // contribute to the Imm memory footprint. + bool noImmMemoryContribution = false; + // If MemPurge activated, purge and delete current memtable. + if (immutable_db_options_.experimental_allow_mempurge && + (new_mem != nullptr) && + ((cfd->GetFlushReason() == FlushReason::kOthers) || + (cfd->GetFlushReason() == FlushReason::kManualFlush))) { + Status mempurge_s = MemPurge(cfd, new_mem); + if (mempurge_s.ok()) { + // If mempurge worked successfully, + // create sync point and decrement current memtable reference. + TEST_SYNC_POINT("DBImpl::MemPurge"); + cfd->mem()->Unref(); + // If the MemPurge is successful, the 'old' (purged) memtable + // is not added to the Imm memtable list and therefore + // does not contribute to the Imm memory cost anymore. + noImmMemoryContribution = true; + } else { + // If mempurge failed, go back to regular mem->imm->flush workflow. + if (new_mem) { + delete new_mem; + } + SuperVersion* new_superversion = + context->superversion_context.new_superversion.release(); + if (new_superversion != nullptr) { + delete new_superversion; + } + SequenceNumber seq = versions_->LastSequence(); + new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq); + context->superversion_context.NewSuperVersion(); + cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_); + } + } else { + // Else make the memtable immutable and proceed as usual. + cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_); + } new_mem->Ref(); cfd->SetMemtable(new_mem); InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context, - mutable_cf_options); + mutable_cf_options, + noImmMemoryContribution); + #ifndef ROCKSDB_LITE mutex_.Unlock(); // Notify client that memtable is sealed, now that we have successfully diff --git a/db/flush_job.cc b/db/flush_job.cc index 10d6ed108..409024a08 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -440,6 +440,7 @@ Status FlushJob::WriteLevel0Table() { } } if (tboptions.reason == TableFileCreationReason::kFlush) { + TEST_SYNC_POINT("DBImpl::FlushJob:Flush"); RecordTick(stats_, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH, memtable_payload_bytes); RecordTick(stats_, MEMTABLE_GARBAGE_BYTES_AT_FLUSH, diff --git a/db/memtable.h b/db/memtable.h index 54155f9b5..6f908ae5b 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -341,6 +341,14 @@ class MemTable { return first_seqno_.load(std::memory_order_relaxed); } + // Returns the sequence number of the first element that was inserted + // into the memtable. + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable (unless this Memtable is immutable). + void SetFirstSequenceNumber(SequenceNumber first_seqno) { + return first_seqno_.store(first_seqno, std::memory_order_relaxed); + } + // Returns the sequence number that is guaranteed to be smaller than or equal // to the sequence number of any key that could be inserted into this // memtable. It can then be assumed that any write with a larger(or equal) @@ -352,6 +360,15 @@ class MemTable { return earliest_seqno_.load(std::memory_order_relaxed); } + // Sets the sequence number that is guaranteed to be smaller than or equal + // to the sequence number of any key that could be inserted into this + // memtable. It can then be assumed that any write with a larger(or equal) + // sequence number will be present in this memtable or a later memtable. + // Used only for MemPurge operation + void SetEarliestSequenceNumber(SequenceNumber earliest_seqno) { + return earliest_seqno_.store(earliest_seqno, std::memory_order_relaxed); + } + // DB's latest sequence ID when the memtable is created. This number // may be updated to a more recent one before any key is inserted. SequenceNumber GetCreationSeq() const { return creation_seq_; } diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index affbb7aa0..30dd2b95d 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -778,6 +778,10 @@ struct DBOptions { // Default: true bool advise_random_on_open = true; + // If true, allows for memtable purge instead of flush to storage. + // (experimental). + bool experimental_allow_mempurge = false; + // Amount of data to build up in memtables across all column // families before writing to disk. // diff --git a/options/db_options.cc b/options/db_options.cc index 1e6113d37..17f96a553 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -192,6 +192,10 @@ static std::unordered_map {offsetof(struct ImmutableDBOptions, error_if_exists), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"experimental_allow_mempurge", + {offsetof(struct ImmutableDBOptions, experimental_allow_mempurge), + OptionType::kBoolean, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, {"is_fd_close_on_exec", {offsetof(struct ImmutableDBOptions, is_fd_close_on_exec), OptionType::kBoolean, OptionVerificationType::kNormal, @@ -541,6 +545,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) allow_fallocate(options.allow_fallocate), is_fd_close_on_exec(options.is_fd_close_on_exec), advise_random_on_open(options.advise_random_on_open), + experimental_allow_mempurge(options.experimental_allow_mempurge), db_write_buffer_size(options.db_write_buffer_size), write_buffer_manager(options.write_buffer_manager), access_hint_on_compaction_start(options.access_hint_on_compaction_start), @@ -674,6 +679,9 @@ void ImmutableDBOptions::Dump(Logger* log) const { is_fd_close_on_exec); ROCKS_LOG_HEADER(log, " Options.advise_random_on_open: %d", advise_random_on_open); + ROCKS_LOG_HEADER(log, + " Options.experimental_allow_mempurge: %d", + experimental_allow_mempurge); ROCKS_LOG_HEADER( log, " Options.db_write_buffer_size: %" ROCKSDB_PRIszt, db_write_buffer_size); diff --git a/options/db_options.h b/options/db_options.h index edbdbe6a2..7ff90318c 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -54,6 +54,7 @@ struct ImmutableDBOptions { bool allow_fallocate; bool is_fd_close_on_exec; bool advise_random_on_open; + bool experimental_allow_mempurge; size_t db_write_buffer_size; std::shared_ptr write_buffer_manager; DBOptions::AccessHint access_hint_on_compaction_start; diff --git a/options/options_test.cc b/options/options_test.cc index 71921de7c..d3787c475 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -140,6 +140,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"persist_stats_to_disk", "false"}, {"stats_history_buffer_size", "69"}, {"advise_random_on_open", "true"}, + {"experimental_allow_mempurge", "false"}, {"use_adaptive_mutex", "false"}, {"new_table_reader_for_compaction_inputs", "true"}, {"compaction_readahead_size", "100"}, @@ -298,6 +299,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.persist_stats_to_disk, false); ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U); ASSERT_EQ(new_db_opt.advise_random_on_open, true); + ASSERT_EQ(new_db_opt.experimental_allow_mempurge, false); ASSERT_EQ(new_db_opt.use_adaptive_mutex, false); ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true); ASSERT_EQ(new_db_opt.compaction_readahead_size, 100); @@ -1981,6 +1983,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { {"persist_stats_to_disk", "false"}, {"stats_history_buffer_size", "69"}, {"advise_random_on_open", "true"}, + {"experimental_allow_mempurge", "false"}, {"use_adaptive_mutex", "false"}, {"new_table_reader_for_compaction_inputs", "true"}, {"compaction_readahead_size", "100"}, @@ -2133,6 +2136,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.persist_stats_to_disk, false); ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U); ASSERT_EQ(new_db_opt.advise_random_on_open, true); + ASSERT_EQ(new_db_opt.experimental_allow_mempurge, false); ASSERT_EQ(new_db_opt.use_adaptive_mutex, false); ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true); ASSERT_EQ(new_db_opt.compaction_readahead_size, 100);