From 837705ad8011e249d5e96ff5ae98f6e9efa61ecb Mon Sep 17 00:00:00 2001
From: Baptiste Lemaire
Date: Fri, 9 Jul 2021 17:16:00 -0700
Subject: [PATCH] Make mempurge a background process (equivalent to in-memory compaction). (#8505)

Summary:
In https://github.com/facebook/rocksdb/issues/8454, I introduced a new process
baptized `MemPurge` (memtable garbage collection). This PR builds upon that
past mempurge prototype.
In this PR, I made the `mempurge` process a background task, which provides
superior performance since the mempurge process no longer holds the db_mutex,
and addresses severe restrictions of the past iteration (including a scenario
where the past mempurge failed because a memtable was mempurged while still
referred to by an iterator/snapshot/...).
Now the mempurge process resembles an in-memory compaction process: the stack
of immutable memtables is filtered, and the useful payload is used to populate
an output memtable. If the output memtable is filled to more than 60% capacity
(an arbitrary heuristic), the mempurge process is aborted and a regular flush
process takes place; otherwise the output memtable is kept in the immutable
memtable stack. Note that adding this output memtable to the `imm()` memtable
stack does not trigger another flush process, so the flush thread can go to
sleep at the end of a successful mempurge.
MemPurge is activated by setting the `experimental_allow_mempurge` flag to
`true`. When activated, the `MemPurge` process will always run when the flush
reason is `kWriteBufferFull`.
The 3 unit tests confirm that this process supports the `Put`, `Get`, `Delete`,
and `DeleteRange` operators, and is compatible with `Iterators` and
`CompactionFilters`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8505

Reviewed By: pdillinger

Differential Revision: D29619283

Pulled By: bjlemaire

fbshipit-source-id: 8a99bee76b63a8211bff1a00e0ae32360aaece95
---
 db/column_family.cc                    |  21 +-
 db/column_family.h                     |   9 +-
 db/db_flush_test.cc                    | 255 +++++++++++++++---
 db/db_impl/db_impl.cc                  |  33 ++++
 db/db_impl/db_impl.h                   |  19 +-
 db/db_impl/db_impl_compaction_flush.cc |   5 +-
 db/db_impl/db_impl_write.cc            | 219 +--------------------
 db/flush_job.cc                        | 256 ++++++++++++++++++++++++-
 db/flush_job.h                         |  22 +++
 db/memtable_list.cc                    |   6 +-
 db/memtable_list.h                     |   6 +-
 11 files changed, 496 insertions(+), 355 deletions(-)

diff --git a/db/column_family.cc b/db/column_family.cc
index 178186379..de90f6b6f 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -443,7 +443,7 @@ bool SuperVersion::Unref() {
   return previous_refs == 1;
 }
 
-void SuperVersion::Cleanup(const bool noImmMemoryContribution) {
+void SuperVersion::Cleanup() {
   assert(refs.load(std::memory_order_relaxed) == 0);
   // Since this SuperVersion object is being deleted,
   // decrement reference to the immutable MemtableList
@@ -451,18 +451,9 @@ void SuperVersion::Cleanup(const bool noImmMemoryContribution) {
   imm->Unref(&to_delete);
   MemTable* m = mem->Unref();
   if (m != nullptr) {
-    // Typically, if the m memtable was not made
-    // immutable, and therefore was not added to the
-    // imm list, it does not contribute to the imm
-    // memory footprint (and actually is not part of
-    // the 'imm' MemtableList at all).
-    // At the moment, noImmMemoryContribution is only
-    // used by the experimental 'MemPurge' prototype.
- if (!noImmMemoryContribution) { - auto* memory_usage = current->cfd()->imm()->current_memory_usage(); - assert(*memory_usage >= m->ApproximateMemoryUsage()); - *memory_usage -= m->ApproximateMemoryUsage(); - } + auto* memory_usage = current->cfd()->imm()->current_memory_usage(); + assert(*memory_usage >= m->ApproximateMemoryUsage()); + *memory_usage -= m->ApproximateMemoryUsage(); to_delete.push_back(m); } current->Unref(); @@ -1272,7 +1263,7 @@ void ColumnFamilyData::InstallSuperVersion( void ColumnFamilyData::InstallSuperVersion( SuperVersionContext* sv_context, InstrumentedMutex* db_mutex, - const MutableCFOptions& mutable_cf_options, bool noImmMemoryContribution) { + const MutableCFOptions& mutable_cf_options) { SuperVersion* new_superversion = sv_context->new_superversion.release(); new_superversion->db_mutex = db_mutex; new_superversion->mutable_cf_options = mutable_cf_options; @@ -1302,7 +1293,7 @@ void ColumnFamilyData::InstallSuperVersion( new_superversion->write_stall_condition, GetName(), ioptions()); } if (old_superversion->Unref()) { - old_superversion->Cleanup(noImmMemoryContribution); + old_superversion->Cleanup(); sv_context->superversions_to_free.push_back(old_superversion); } } diff --git a/db/column_family.h b/db/column_family.h index cfba13171..48fb9abc1 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -222,10 +222,7 @@ struct SuperVersion { // Cleanup unrefs mem, imm and current. Also, it stores all memtables // that needs to be deleted in to_delete vector. Unrefing those // objects needs to be done in the mutex - // The 'noImmMemoryContribution' is set to true if the memtable being - // dereferenced in this SuperVersion was not added to the Immutable - // memtable list. - void Cleanup(bool noImmMemoryContribution = false); + void Cleanup(); void Init(ColumnFamilyData* new_cfd, MemTable* new_mem, MemTableListVersion* new_imm, Version* new_current); @@ -457,8 +454,7 @@ class ColumnFamilyData { // IMPORTANT: Only call this from DBImpl::InstallSuperVersion() void InstallSuperVersion(SuperVersionContext* sv_context, InstrumentedMutex* db_mutex, - const MutableCFOptions& mutable_cf_options, - bool noImmMemoryContribution = false); + const MutableCFOptions& mutable_cf_options); void InstallSuperVersion(SuperVersionContext* sv_context, InstrumentedMutex* db_mutex); @@ -524,6 +520,7 @@ class ColumnFamilyData { } ThreadLocalPtr* TEST_GetLocalSV() { return local_sv_.get(); } + WriteBufferManager* write_buffer_mgr() { return write_buffer_manager_; } private: friend class ColumnFamilySet; diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index ef062f510..1e8dac7f1 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -692,16 +692,17 @@ TEST_F(DBFlushTest, MemPurgeBasic) { options.allow_concurrent_memtable_write = true; // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). - options.write_buffer_size = 64 << 20; + options.write_buffer_size = 1 << 20; // Activate the MemPurge prototype. 
options.experimental_allow_mempurge = true; ASSERT_OK(TryReopen(options)); uint32_t mempurge_count = 0; - uint32_t flush_count = 0; + uint32_t sst_count = 0; ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::MemPurge", [&](void* /*arg*/) { mempurge_count++; }); + "DBImpl::FlushJob:MemPurgeSuccessful", + [&](void* /*arg*/) { mempurge_count++; }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::FlushJob:Flush", [&](void* /*arg*/) { flush_count++; }); + "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); std::string KEY1 = "IamKey1"; @@ -709,62 +710,120 @@ TEST_F(DBFlushTest, MemPurgeBasic) { std::string KEY3 = "IamKey3"; std::string KEY4 = "IamKey4"; std::string KEY5 = "IamKey5"; - std::string VALUE1 = "IamValue1"; - std::string VALUE2 = "IamValue2"; + std::string KEY6 = "IamKey6"; + std::string KEY7 = "IamKey7"; + std::string KEY8 = "IamKey8"; + std::string KEY9 = "IamKey9"; + std::string RNDKEY1, RNDKEY2, RNDKEY3; const std::string NOT_FOUND = "NOT_FOUND"; - // Check simple operations (put-delete). - ASSERT_OK(Put(KEY1, VALUE1)); - ASSERT_OK(Put(KEY2, VALUE2)); - ASSERT_OK(Delete(KEY1)); - ASSERT_OK(Put(KEY2, VALUE1)); - ASSERT_OK(Put(KEY1, VALUE2)); - ASSERT_OK(Flush()); - - ASSERT_EQ(Get(KEY1), VALUE2); - ASSERT_EQ(Get(KEY2), VALUE1); - - ASSERT_OK(Delete(KEY1)); - ASSERT_EQ(Get(KEY1), NOT_FOUND); - ASSERT_OK(Flush()); - ASSERT_EQ(Get(KEY1), NOT_FOUND); - // Heavy overwrite workload, // more than would fit in maximum allowed memtables. Random rnd(719); - const size_t NUM_REPEAT = 100000; - const size_t RAND_VALUES_LENGTH = 512; - std::string p_v1, p_v2, p_v3, p_v4, p_v5; - // Insertion of of K-V pairs, multiple times. - // Also insert DeleteRange + const size_t NUM_REPEAT = 100; + const size_t RAND_KEYS_LENGTH = 57; + const size_t RAND_VALUES_LENGTH = 10240; + std::string p_v1, p_v2, p_v3, p_v4, p_v5, p_v6, p_v7, p_v8, p_v9, p_rv1, + p_rv2, p_rv3; + + // Insert a very first set of keys that will be + // mempurged at least once. + p_v1 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v2 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v3 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v4 = rnd.RandomString(RAND_VALUES_LENGTH); + ASSERT_OK(Put(KEY1, p_v1)); + ASSERT_OK(Put(KEY2, p_v2)); + ASSERT_OK(Put(KEY3, p_v3)); + ASSERT_OK(Put(KEY4, p_v4)); + ASSERT_EQ(Get(KEY1), p_v1); + ASSERT_EQ(Get(KEY2), p_v2); + ASSERT_EQ(Get(KEY3), p_v3); + ASSERT_EQ(Get(KEY4), p_v4); + + // Insertion of of K-V pairs, multiple times (overwrites). for (size_t i = 0; i < NUM_REPEAT; i++) { // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes. 
- p_v1 = rnd.RandomString(RAND_VALUES_LENGTH); - p_v2 = rnd.RandomString(RAND_VALUES_LENGTH); - p_v3 = rnd.RandomString(RAND_VALUES_LENGTH); - p_v4 = rnd.RandomString(RAND_VALUES_LENGTH); p_v5 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v6 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v7 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v8 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v9 = rnd.RandomString(RAND_VALUES_LENGTH); - ASSERT_OK(Put(KEY1, p_v1)); - ASSERT_OK(Put(KEY2, p_v2)); - ASSERT_OK(Put(KEY3, p_v3)); - ASSERT_OK(Put(KEY4, p_v4)); ASSERT_OK(Put(KEY5, p_v5)); + ASSERT_OK(Put(KEY6, p_v6)); + ASSERT_OK(Put(KEY7, p_v7)); + ASSERT_OK(Put(KEY8, p_v8)); + ASSERT_OK(Put(KEY9, p_v9)); ASSERT_EQ(Get(KEY1), p_v1); ASSERT_EQ(Get(KEY2), p_v2); ASSERT_EQ(Get(KEY3), p_v3); ASSERT_EQ(Get(KEY4), p_v4); ASSERT_EQ(Get(KEY5), p_v5); + ASSERT_EQ(Get(KEY6), p_v6); + ASSERT_EQ(Get(KEY7), p_v7); + ASSERT_EQ(Get(KEY8), p_v8); + ASSERT_EQ(Get(KEY9), p_v9); } // Check that there was at least one mempurge const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1; - // Check that there was no flush to storage. - const uint32_t EXPECTED_FLUSH_COUNT = 0; + // Check that there was no SST files created during flush. + const uint32_t EXPECTED_SST_COUNT = 0; EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT); - EXPECT_EQ(flush_count, EXPECTED_FLUSH_COUNT); + EXPECT_EQ(sst_count, EXPECTED_SST_COUNT); + + const uint32_t mempurge_count_record = mempurge_count; + + // Insertion of of K-V pairs, no overwrites. + for (size_t i = 0; i < NUM_REPEAT; i++) { + // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes. + RNDKEY1 = rnd.RandomString(RAND_KEYS_LENGTH); + RNDKEY2 = rnd.RandomString(RAND_KEYS_LENGTH); + RNDKEY3 = rnd.RandomString(RAND_KEYS_LENGTH); + p_rv1 = rnd.RandomString(RAND_VALUES_LENGTH); + p_rv2 = rnd.RandomString(RAND_VALUES_LENGTH); + p_rv3 = rnd.RandomString(RAND_VALUES_LENGTH); + + ASSERT_OK(Put(RNDKEY1, p_rv1)); + ASSERT_OK(Put(RNDKEY2, p_rv2)); + ASSERT_OK(Put(RNDKEY3, p_rv3)); + + ASSERT_EQ(Get(KEY1), p_v1); + ASSERT_EQ(Get(KEY2), p_v2); + ASSERT_EQ(Get(KEY3), p_v3); + ASSERT_EQ(Get(KEY4), p_v4); + ASSERT_EQ(Get(KEY5), p_v5); + ASSERT_EQ(Get(KEY6), p_v6); + ASSERT_EQ(Get(KEY7), p_v7); + ASSERT_EQ(Get(KEY8), p_v8); + ASSERT_EQ(Get(KEY9), p_v9); + ASSERT_EQ(Get(RNDKEY1), p_rv1); + ASSERT_EQ(Get(RNDKEY2), p_rv2); + ASSERT_EQ(Get(RNDKEY3), p_rv3); + } + + // Assert that at least one flush to storage has been performed + ASSERT_GT(sst_count, EXPECTED_SST_COUNT); + // (which will consequently increase the number of mempurges recorded too). + ASSERT_EQ(mempurge_count, mempurge_count_record); + + // Assert that there is no data corruption, even with + // a flush to storage. + ASSERT_EQ(Get(KEY1), p_v1); + ASSERT_EQ(Get(KEY2), p_v2); + ASSERT_EQ(Get(KEY3), p_v3); + ASSERT_EQ(Get(KEY4), p_v4); + ASSERT_EQ(Get(KEY5), p_v5); + ASSERT_EQ(Get(KEY6), p_v6); + ASSERT_EQ(Get(KEY7), p_v7); + ASSERT_EQ(Get(KEY8), p_v8); + ASSERT_EQ(Get(KEY9), p_v9); + ASSERT_EQ(Get(RNDKEY1), p_rv1); + ASSERT_EQ(Get(RNDKEY2), p_rv2); + ASSERT_EQ(Get(RNDKEY3), p_rv3); Close(); } @@ -780,17 +839,18 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) { options.allow_concurrent_memtable_write = true; // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). - options.write_buffer_size = 64 << 20; + options.write_buffer_size = 1 << 20; // Activate the MemPurge prototype. 
options.experimental_allow_mempurge = true; ASSERT_OK(TryReopen(options)); uint32_t mempurge_count = 0; - uint32_t flush_count = 0; + uint32_t sst_count = 0; ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::MemPurge", [&](void* /*arg*/) { mempurge_count++; }); + "DBImpl::FlushJob:MemPurgeSuccessful", + [&](void* /*arg*/) { mempurge_count++; }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::FlushJob:Flush", [&](void* /*arg*/) { flush_count++; }); + "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); std::string KEY1 = "ThisIsKey1"; @@ -801,9 +861,9 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) { const std::string NOT_FOUND = "NOT_FOUND"; Random rnd(117); - const size_t NUM_REPEAT = 200; - const size_t RAND_VALUES_LENGTH = 512; - bool atLeastOneFlush = false; + const size_t NUM_REPEAT = 100; + const size_t RAND_VALUES_LENGTH = 10240; + std::string key, value, p_v1, p_v2, p_v3, p_v3b, p_v4, p_v5; int count = 0; const int EXPECTED_COUNT_FORLOOP = 3; @@ -813,6 +873,7 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) { ropt.pin_data = true; ropt.total_order_seek = true; Iterator* iter = nullptr; + // Insertion of of K-V pairs, multiple times. // Also insert DeleteRange for (size_t i = 0; i < NUM_REPEAT; i++) { @@ -836,12 +897,6 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) { KEY3)); ASSERT_OK(Delete(KEY1)); - // Flush (MemPurge) with a probability of 50%. - if (rnd.OneIn(2)) { - ASSERT_OK(Flush()); - atLeastOneFlush = true; - } - ASSERT_EQ(Get(KEY1), NOT_FOUND); ASSERT_EQ(Get(KEY2), NOT_FOUND); ASSERT_EQ(Get(KEY3), p_v3b); @@ -875,19 +930,11 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) { // Check that there was at least one mempurge const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1; - // Check that there was no flush to storage. - const uint32_t EXPECTED_FLUSH_COUNT = 0; + // Check that there was no SST files created during flush. + const uint32_t EXPECTED_SST_COUNT = 0; - if (atLeastOneFlush) { - EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT); - } else { - // Note that there isn't enough values added to - // automatically trigger a flush/MemPurge in the background. - // Therefore we can make the assumption that if we never - // called "Flush()", no mempurge happened. - EXPECT_EQ(mempurge_count, EXPECTED_FLUSH_COUNT); - } - EXPECT_EQ(flush_count, EXPECTED_FLUSH_COUNT); + EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT); + EXPECT_EQ(sst_count, EXPECTED_SST_COUNT); // Additional test for the iterator+memPurge. ASSERT_OK(Put(KEY2, p_v2)); @@ -911,6 +958,7 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) { ASSERT_EQ(value, NOT_FOUND); count++; } + // Expected count here is 4: KEY2, KEY3, KEY4, KEY5. ASSERT_EQ(count, EXPECTED_COUNT_END); if (iter) delete iter; @@ -974,6 +1022,10 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) { std::string KEY3 = "ThisIsKey3"; std::string KEY4 = "ThisIsKey4"; std::string KEY5 = "ThisIsKey5"; + std::string KEY6 = "ThisIsKey6"; + std::string KEY7 = "ThisIsKey7"; + std::string KEY8 = "ThisIsKey8"; + std::string KEY9 = "ThisIsKey9"; const std::string NOT_FOUND = "NOT_FOUND"; options.statistics = CreateDBStatistics(); @@ -990,43 +1042,68 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) { std::make_shared(KEY4); // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). - options.write_buffer_size = 64 << 20; + options.write_buffer_size = 1 << 20; // Activate the MemPurge prototype. 
options.experimental_allow_mempurge = true; ASSERT_OK(TryReopen(options)); + uint32_t mempurge_count = 0; + uint32_t sst_count = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushJob:MemPurgeSuccessful", + [&](void* /*arg*/) { mempurge_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + Random rnd(53); - const size_t NUM_REPEAT = 25; - const size_t RAND_VALUES_LENGTH = 128; - std::string p_v1, p_v2, p_v3, p_v4, p_v5; + const size_t NUM_REPEAT = 1000; + const size_t RAND_VALUES_LENGTH = 10240; + std::string p_v1, p_v2, p_v3, p_v4, p_v5, p_v6, p_v7, p_v8, p_v9; + + p_v1 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v2 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v3 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v4 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v5 = rnd.RandomString(RAND_VALUES_LENGTH); + ASSERT_OK(Put(KEY1, p_v1)); + ASSERT_OK(Put(KEY2, p_v2)); + ASSERT_OK(Put(KEY3, p_v3)); + ASSERT_OK(Put(KEY4, p_v4)); + ASSERT_OK(Put(KEY5, p_v5)); + ASSERT_OK(Delete(KEY1)); // Insertion of of K-V pairs, multiple times. - // Also insert DeleteRange for (size_t i = 0; i < NUM_REPEAT; i++) { - // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes. - p_v1 = rnd.RandomString(RAND_VALUES_LENGTH); - p_v2 = rnd.RandomString(RAND_VALUES_LENGTH); - p_v3 = rnd.RandomString(RAND_VALUES_LENGTH); - p_v4 = rnd.RandomString(RAND_VALUES_LENGTH); - p_v5 = rnd.RandomString(RAND_VALUES_LENGTH); - ASSERT_OK(Put(KEY1, p_v1)); - ASSERT_OK(Put(KEY2, p_v2)); - ASSERT_OK(Put(KEY3, p_v3)); - ASSERT_OK(Put(KEY4, p_v4)); - ASSERT_OK(Put(KEY5, p_v5)); + // Create value strings of arbitrary + // length RAND_VALUES_LENGTH bytes. + p_v6 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v7 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v8 = rnd.RandomString(RAND_VALUES_LENGTH); + p_v9 = rnd.RandomString(RAND_VALUES_LENGTH); + ASSERT_OK(Put(KEY6, p_v6)); + ASSERT_OK(Put(KEY7, p_v7)); + ASSERT_OK(Put(KEY8, p_v8)); + ASSERT_OK(Put(KEY9, p_v9)); + + ASSERT_OK(Delete(KEY7)); + } - ASSERT_OK(Delete(KEY1)); + // Check that there was at least one mempurge + const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1; + // Check that there was no SST files created during flush. + const uint32_t EXPECTED_SST_COUNT = 0; - ASSERT_OK(Flush()); + EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT); + EXPECT_EQ(sst_count, EXPECTED_SST_COUNT); - // Verify that the ConditionalUpdateCompactionFilter - // updated the values of KEY2 and KEY3, and not KEY4 and KEY5. - ASSERT_EQ(Get(KEY1), NOT_FOUND); - ASSERT_EQ(Get(KEY2), NEW_VALUE); - ASSERT_EQ(Get(KEY3), NEW_VALUE); - ASSERT_EQ(Get(KEY4), p_v4); - ASSERT_EQ(Get(KEY5), p_v5); - } + // Verify that the ConditionalUpdateCompactionFilter + // updated the values of KEY2 and KEY3, and not KEY4 and KEY5. + ASSERT_EQ(Get(KEY1), NOT_FOUND); + ASSERT_EQ(Get(KEY2), NEW_VALUE); + ASSERT_EQ(Get(KEY3), NEW_VALUE); + ASSERT_EQ(Get(KEY4), p_v4); + ASSERT_EQ(Get(KEY5), p_v5); } TEST_P(DBFlushDirectIOTest, DirectIO) { diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 1e0663bc7..1b3742201 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -548,12 +548,45 @@ Status DBImpl::CloseHelper() { flush_scheduler_.Clear(); trim_history_scheduler_.Clear(); + // For now, simply trigger a manual flush at close time + // on all the column families. + // TODO(bjlemaire): Check if this is needed. 
Also, in the + // future we can contemplate doing a more fine-grained + // flushing by first checking if there is a need for + // flushing (but need to implement something + // else than imm()->IsFlushPending() because the output + // memtables added to imm() dont trigger flushes). + if (immutable_db_options_.experimental_allow_mempurge) { + Status flush_ret; + mutex_.Unlock(); + for (ColumnFamilyData* cf : *versions_->GetColumnFamilySet()) { + if (immutable_db_options_.atomic_flush) { + flush_ret = AtomicFlushMemTables({cf}, FlushOptions(), + FlushReason::kManualFlush); + if (!flush_ret.ok()) { + ROCKS_LOG_INFO( + immutable_db_options_.info_log, + "Atomic flush memtables failed upon closing (mempurge)."); + } + } else { + flush_ret = + FlushMemTable(cf, FlushOptions(), FlushReason::kManualFlush); + if (!flush_ret.ok()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Flush memtables failed upon closing (mempurge)."); + } + } + } + mutex_.Lock(); + } + while (!flush_queue_.empty()) { const FlushRequest& flush_req = PopFirstFromFlushQueue(); for (const auto& iter : flush_req) { iter.first->UnrefAndTryDelete(); } } + while (!compaction_queue_.empty()) { auto cfd = PopFirstFromCompactionQueue(); cfd->UnrefAndTryDelete(); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 8f28b4af0..b38b6d14f 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1612,23 +1612,6 @@ class DBImpl : public DB { Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context); - // Memtable Garbage Collection algorithm: a MemPurge takes the memtable - // and filters (or "purge") the outdated bytes out of it. The output - // (the filtered bytes, or "useful payload") is then transfered into - // the new memtable "new_mem". This process is typically intended for - // workloads with heavy overwrites to save on IO cost resulting from - // expensive flush operations. - // "MemPurge" is an experimental feature still at a very early stage - // of development. At the moment it is only compatible with the Get, Put, - // Delete operations as well as Iterators and CompactionFilters. - // For this early version, "MemPurge" is called by setting the - // options.experimental_allow_mempurge flag as "true". When this is - // the case, ALL flush operations will be replaced by MemPurge operations. - // (for prototype stress-testing purposes). Therefore, we strongly - // recommend all users not to set this flag as true given that the MemPurge - // process has not matured yet. - Status MemPurge(ColumnFamilyData* cfd, MemTable* new_mem); - void SelectColumnFamiliesForAtomicFlush(autovector* cfds); // Force current memtable contents to be flushed. @@ -1854,7 +1837,7 @@ class DBImpl : public DB { // state needs flush or compaction. 
void InstallSuperVersionAndScheduleWork( ColumnFamilyData* cfd, SuperVersionContext* sv_context, - const MutableCFOptions& mutable_cf_options, bool fromMemPurge = false); + const MutableCFOptions& mutable_cf_options); bool GetIntPropertyInternal(ColumnFamilyData* cfd, const DBPropertyInfo& property_info, diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 69a98c934..ec921227e 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -3436,7 +3436,7 @@ void DBImpl::BuildCompactionJobInfo( void DBImpl::InstallSuperVersionAndScheduleWork( ColumnFamilyData* cfd, SuperVersionContext* sv_context, - const MutableCFOptions& mutable_cf_options, bool fromMemPurge) { + const MutableCFOptions& mutable_cf_options) { mutex_.AssertHeld(); // Update max_total_in_memory_state_ @@ -3451,8 +3451,7 @@ void DBImpl::InstallSuperVersionAndScheduleWork( if (UNLIKELY(sv_context->new_superversion == nullptr)) { sv_context->NewSuperVersion(); } - cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options, - fromMemPurge); + cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options); // There may be a small data race here. The snapshot tricking bottommost // compaction may already be released here. But assuming there will always be diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index bbed637c1..6dcef6208 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1737,184 +1737,6 @@ void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/, } #endif // ROCKSDB_LITE -Status DBImpl::MemPurge(ColumnFamilyData* cfd, MemTable* new_mem) { - Status s; - assert(new_mem != nullptr); - - JobContext job_context(next_job_id_.fetch_add(1), true); - std::vector snapshot_seqs; - SequenceNumber earliest_write_conflict_snapshot; - SnapshotChecker* snapshot_checker; - GetSnapshotContext(&job_context, &snapshot_seqs, - &earliest_write_conflict_snapshot, &snapshot_checker); - - // Grab current memtable - MemTable* m = cfd->mem(); - SequenceNumber first_seqno = m->GetFirstSequenceNumber(); - SequenceNumber earliest_seqno = m->GetEarliestSequenceNumber(); - - // Create two iterators, one for the memtable data (contains - // info from puts + deletes), and one for the memtable - // Range Tombstones (from DeleteRanges). - ReadOptions ro; - ro.total_order_seek = true; - Arena arena; - std::vector memtables(1, m->NewIterator(ro, &arena)); - std::vector> - range_del_iters; - auto* range_del_iter = m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber); - if (range_del_iter != nullptr) { - range_del_iters.emplace_back(range_del_iter); - } - ScopedArenaIterator iter( - NewMergingIterator(&(cfd->internal_comparator()), memtables.data(), - static_cast(memtables.size()), &arena)); - - auto* ioptions = cfd->ioptions(); - - // Place iterator at the First (meaning most recent) key node. - iter->SeekToFirst(); - - std::unique_ptr range_del_agg( - new CompactionRangeDelAggregator(&(cfd->internal_comparator()), - snapshot_seqs)); - for (auto& rd_iter : range_del_iters) { - range_del_agg->AddTombstones(std::move(rd_iter)); - } - - // If there is valid data in the memtable, - // or at least range tombstones, copy over the info - // to the new memtable. 
- if (iter->Valid() || !range_del_agg->IsEmpty()) { - std::unique_ptr compaction_filter; - if (ioptions->compaction_filter_factory != nullptr && - ioptions->compaction_filter_factory->ShouldFilterTableFileCreation( - TableFileCreationReason::kFlush)) { - CompactionFilter::Context ctx; - ctx.is_full_compaction = false; - ctx.is_manual_compaction = false; - ctx.column_family_id = cfd->GetID(); - ctx.reason = TableFileCreationReason::kFlush; - compaction_filter = - ioptions->compaction_filter_factory->CreateCompactionFilter(ctx); - if (compaction_filter != nullptr && - !compaction_filter->IgnoreSnapshots()) { - s = Status::NotSupported( - "CompactionFilter::IgnoreSnapshots() = false is not supported " - "anymore."); - return s; - } - } - - Env* env = immutable_db_options_.env; - assert(env); - MergeHelper merge( - env, (cfd->internal_comparator()).user_comparator(), - (ioptions->merge_operator).get(), compaction_filter.get(), - ioptions->logger, true /* internal key corruption is not ok */, - snapshot_seqs.empty() ? 0 : snapshot_seqs.back(), snapshot_checker); - CompactionIterator c_iter( - iter.get(), (cfd->internal_comparator()).user_comparator(), &merge, - kMaxSequenceNumber, &snapshot_seqs, earliest_write_conflict_snapshot, - snapshot_checker, env, ShouldReportDetailedTime(env, ioptions->stats), - true /* internal key corruption is not ok */, range_del_agg.get(), - nullptr, ioptions->allow_data_in_errors, - /*compaction=*/nullptr, compaction_filter.get(), - /*shutting_down=*/nullptr, - /*preserve_deletes_seqnum=*/0, /*manual_compaction_paused=*/nullptr, - /*manual_compaction_canceled=*/nullptr, immutable_db_options_.info_log, - &(cfd->GetFullHistoryTsLow())); - - c_iter.SeekToFirst(); - - mutex_.AssertHeld(); - - // Set earliest sequence number in the new memtable - // to be equal to the earliest sequence number of the - // memtable being flushed (See later if there is a need - // to update this number!). - new_mem->SetEarliestSequenceNumber(earliest_seqno); - // Likewise for first seq number. - new_mem->SetFirstSequenceNumber(first_seqno); - SequenceNumber new_first_seqno = kMaxSequenceNumber; - - // Key transfer - for (; c_iter.Valid(); c_iter.Next()) { - const ParsedInternalKey ikey = c_iter.ikey(); - const Slice value = c_iter.value(); - new_first_seqno = - ikey.sequence < new_first_seqno ? ikey.sequence : new_first_seqno; - - // Should we update "OldestKeyTime" ???? - s = new_mem->Add( - ikey.sequence, ikey.type, ikey.user_key, value, - nullptr, // KV protection info set as nullptr since it - // should only be useful for the first add to - // the original memtable. - false, // : allow concurrent_memtable_writes_ - // Not seen as necessary for now. - nullptr, // get_post_process_info(m) must be nullptr - // when concurrent_memtable_writes is switched off. - nullptr); // hint, only used when concurrent_memtable_writes_ - // is switched on. - if (!s.ok()) { - break; - } - } - - // Check status and propagate - // potential error status from c_iter - if (!s.ok()) { - c_iter.status().PermitUncheckedError(); - } else if (!c_iter.status().ok()) { - s = c_iter.status(); - } - - // Range tombstone transfer. - if (s.ok()) { - auto range_del_it = range_del_agg->NewIterator(); - for (range_del_it->SeekToFirst(); range_del_it->Valid(); - range_del_it->Next()) { - auto tombstone = range_del_it->Tombstone(); - new_first_seqno = - tombstone.seq_ < new_first_seqno ? 
tombstone.seq_ : new_first_seqno; - s = new_mem->Add( - tombstone.seq_, // Sequence number - kTypeRangeDeletion, // KV type - tombstone.start_key_, // Key is start key. - tombstone.end_key_, // Value is end key. - nullptr, // KV protection info set as nullptr since it - // should only be useful for the first add to - // the original memtable. - false, // : allow concurrent_memtable_writes_ - // Not seen as necessary for now. - nullptr, // get_post_process_info(m) must be nullptr - // when concurrent_memtable_writes is switched off. - nullptr); // hint, only used when concurrent_memtable_writes_ - // is switched on. - - if (!s.ok()) { - break; - } - } - } - // Rectify the first sequence number, which (unlike the earliest seq - // number) needs to be present in the new memtable. - new_mem->SetFirstSequenceNumber(new_first_seqno); - } - // Note: if the mempurge was ineffective, meaning that there was no - // garbage to remove, and this new_mem needs to be flushed again, - // the new_mem->Add would have updated the flush status when it - // called "UpdateFlushState()" internally at the last Add() call. - // Therefore if the new mem needs to be flushed again, we mark - // the return status as "aborted", which will trigger the regular - // flush operation. - if (s.ok() && new_mem->ShouldScheduleFlush()) { - s = Status::Aborted(Slice("No garbage collected.")); - } - return s; -} - // REQUIRES: mutex_ is held // REQUIRES: this thread is currently at the front of the writer queue // REQUIRES: this thread is currently at the front of the 2nd writer queue if @@ -2114,48 +1936,11 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { cfd->mem()->SetNextLogNumber(logfile_number_); assert(new_mem != nullptr); - // By default, it is assumed that the 'old' memtable - // will be added to the Imm memtable list and will therefore - // contribute to the Imm memory footprint. - bool noImmMemoryContribution = false; - // If MemPurge activated, purge and delete current memtable. - if (immutable_db_options_.experimental_allow_mempurge && - ((cfd->GetFlushReason() == FlushReason::kOthers) || - (cfd->GetFlushReason() == FlushReason::kManualFlush))) { - Status mempurge_s = MemPurge(cfd, new_mem); - if (mempurge_s.ok()) { - // If mempurge worked successfully, - // create sync point and decrement current memtable reference. - TEST_SYNC_POINT("DBImpl::MemPurge"); - cfd->mem()->Unref(); - // If the MemPurge is successful, the 'old' (purged) memtable - // is not added to the Imm memtable list and therefore - // does not contribute to the Imm memory cost anymore. - noImmMemoryContribution = true; - } else { - // If mempurge failed, go back to regular mem->imm->flush workflow. - assert(new_mem != nullptr); - delete new_mem; - SuperVersion* new_superversion = - context->superversion_context.new_superversion.release(); - if (new_superversion != nullptr) { - delete new_superversion; - } - SequenceNumber seq = versions_->LastSequence(); - new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq); - assert(new_mem != nullptr); - context->superversion_context.NewSuperVersion(); - cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_); - } - } else { - // Else make the memtable immutable and proceed as usual. 
- cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_); - } + cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_); new_mem->Ref(); cfd->SetMemtable(new_mem); InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context, - mutable_cf_options, - noImmMemoryContribution); + mutable_cf_options); #ifndef ROCKSDB_LITE mutex_.Unlock(); diff --git a/db/flush_job.cc b/db/flush_job.cc index 409024a08..2cc306fbd 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -227,9 +227,25 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, prev_cpu_write_nanos = IOSTATS(cpu_write_nanos); prev_cpu_read_nanos = IOSTATS(cpu_read_nanos); } - - // This will release and re-acquire the mutex. - Status s = WriteLevel0Table(); + Status mempurge_s = Status::NotFound("No MemPurge."); + if (db_options_.experimental_allow_mempurge && + (cfd_->GetFlushReason() == FlushReason::kWriteBufferFull) && + (!mems_.empty())) { + mempurge_s = MemPurge(); + if (!mempurge_s.ok()) { + ROCKS_LOG_INFO(db_options_.info_log, + "Mempurge process unsuccessful: %s\n", + mempurge_s.ToString().c_str()); + } + } + Status s; + if (mempurge_s.ok()) { + base_->Unref(); + s = Status::OK(); + } else { + // This will release and re-acquire the mutex. + s = WriteLevel0Table(); + } if (s.ok() && cfd_->IsDropped()) { s = Status::ColumnFamilyDropped("Column family dropped during compaction"); @@ -306,6 +322,237 @@ void FlushJob::Cancel() { base_->Unref(); } +Status FlushJob::MemPurge() { + Status s; + db_mutex_->AssertHeld(); + db_mutex_->Unlock(); + assert(!mems_.empty()); + + MemTable* new_mem = nullptr; + + // Create two iterators, one for the memtable data (contains + // info from puts + deletes), and one for the memtable + // Range Tombstones (from DeleteRanges). + ReadOptions ro; + ro.total_order_seek = true; + Arena arena; + std::vector memtables; + std::vector> + range_del_iters; + for (MemTable* m : mems_) { + memtables.push_back(m->NewIterator(ro, &arena)); + auto* range_del_iter = m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber); + if (range_del_iter != nullptr) { + range_del_iters.emplace_back(range_del_iter); + } + } + + assert(!memtables.empty()); + SequenceNumber first_seqno = mems_[0]->GetFirstSequenceNumber(); + SequenceNumber earliest_seqno = mems_[0]->GetEarliestSequenceNumber(); + ScopedArenaIterator iter( + NewMergingIterator(&(cfd_->internal_comparator()), memtables.data(), + static_cast(memtables.size()), &arena)); + + auto* ioptions = cfd_->ioptions(); + + // Place iterator at the First (meaning most recent) key node. + iter->SeekToFirst(); + + std::unique_ptr range_del_agg( + new CompactionRangeDelAggregator(&(cfd_->internal_comparator()), + existing_snapshots_)); + for (auto& rd_iter : range_del_iters) { + range_del_agg->AddTombstones(std::move(rd_iter)); + } + + // If there is valid data in the memtable, + // or at least range tombstones, copy over the info + // to the new memtable. + if (iter->Valid() || !range_del_agg->IsEmpty()) { + // Arbitrary heuristic: maxSize is 60% cpacity. 
+ size_t maxSize = ((mutable_cf_options_.write_buffer_size + 6U) / 10U); + std::unique_ptr compaction_filter; + if (ioptions->compaction_filter_factory != nullptr && + ioptions->compaction_filter_factory->ShouldFilterTableFileCreation( + TableFileCreationReason::kFlush)) { + CompactionFilter::Context ctx; + ctx.is_full_compaction = false; + ctx.is_manual_compaction = false; + ctx.column_family_id = cfd_->GetID(); + ctx.reason = TableFileCreationReason::kFlush; + compaction_filter = + ioptions->compaction_filter_factory->CreateCompactionFilter(ctx); + if (compaction_filter != nullptr && + !compaction_filter->IgnoreSnapshots()) { + s = Status::NotSupported( + "CompactionFilter::IgnoreSnapshots() = false is not supported " + "anymore."); + return s; + } + } + + // mems are ordered by increasing ID, so mems_[0]->GetID + // returns the smallest memtable ID. + new_mem = + new MemTable((cfd_->internal_comparator()), *(cfd_->ioptions()), + mutable_cf_options_, cfd_->write_buffer_mgr(), + mems_[0]->GetEarliestSequenceNumber(), cfd_->GetID()); + assert(new_mem != nullptr); + + Env* env = db_options_.env; + assert(env); + MergeHelper merge( + env, (cfd_->internal_comparator()).user_comparator(), + (ioptions->merge_operator).get(), compaction_filter.get(), + ioptions->logger, true /* internal key corruption is not ok */, + existing_snapshots_.empty() ? 0 : existing_snapshots_.back(), + snapshot_checker_); + CompactionIterator c_iter( + iter.get(), (cfd_->internal_comparator()).user_comparator(), &merge, + kMaxSequenceNumber, &existing_snapshots_, + earliest_write_conflict_snapshot_, snapshot_checker_, env, + ShouldReportDetailedTime(env, ioptions->stats), + true /* internal key corruption is not ok */, range_del_agg.get(), + nullptr, ioptions->allow_data_in_errors, + /*compaction=*/nullptr, compaction_filter.get(), + /*shutting_down=*/nullptr, + /*preserve_deletes_seqnum=*/0, /*manual_compaction_paused=*/nullptr, + /*manual_compaction_canceled=*/nullptr, ioptions->info_log, + &(cfd_->GetFullHistoryTsLow())); + + // Set earliest sequence number in the new memtable + // to be equal to the earliest sequence number of the + // memtable being flushed (See later if there is a need + // to update this number!). + new_mem->SetEarliestSequenceNumber(earliest_seqno); + // Likewise for first seq number. + new_mem->SetFirstSequenceNumber(first_seqno); + SequenceNumber new_first_seqno = kMaxSequenceNumber; + + c_iter.SeekToFirst(); + + // Key transfer + for (; c_iter.Valid(); c_iter.Next()) { + const ParsedInternalKey ikey = c_iter.ikey(); + const Slice value = c_iter.value(); + new_first_seqno = + ikey.sequence < new_first_seqno ? ikey.sequence : new_first_seqno; + + // Should we update "OldestKeyTime" ???? -> timestamp appear + // to still be an "experimental" feature. + s = new_mem->Add( + ikey.sequence, ikey.type, ikey.user_key, value, + nullptr, // KV protection info set as nullptr since it + // should only be useful for the first add to + // the original memtable. + false, // : allow concurrent_memtable_writes_ + // Not seen as necessary for now. + nullptr, // get_post_process_info(m) must be nullptr + // when concurrent_memtable_writes is switched off. + nullptr); // hint, only used when concurrent_memtable_writes_ + // is switched on. + if (!s.ok()) { + break; + } + + // If new_mem has size greater than maxSize, + // then rollback to regular flush operation, + // and destroy new_mem. 
+ if (new_mem->ApproximateMemoryUsage() > maxSize) { + s = Status::Aborted("Mempurge filled more than one memtable."); + break; + } + } + + // Check status and propagate + // potential error status from c_iter + if (!s.ok()) { + c_iter.status().PermitUncheckedError(); + } else if (!c_iter.status().ok()) { + s = c_iter.status(); + } + + // Range tombstone transfer. + if (s.ok()) { + auto range_del_it = range_del_agg->NewIterator(); + for (range_del_it->SeekToFirst(); range_del_it->Valid(); + range_del_it->Next()) { + auto tombstone = range_del_it->Tombstone(); + new_first_seqno = + tombstone.seq_ < new_first_seqno ? tombstone.seq_ : new_first_seqno; + s = new_mem->Add( + tombstone.seq_, // Sequence number + kTypeRangeDeletion, // KV type + tombstone.start_key_, // Key is start key. + tombstone.end_key_, // Value is end key. + nullptr, // KV protection info set as nullptr since it + // should only be useful for the first add to + // the original memtable. + false, // : allow concurrent_memtable_writes_ + // Not seen as necessary for now. + nullptr, // get_post_process_info(m) must be nullptr + // when concurrent_memtable_writes is switched off. + nullptr); // hint, only used when concurrent_memtable_writes_ + // is switched on. + + if (!s.ok()) { + break; + } + + // If new_mem has size greater than maxSize, + // then rollback to regular flush operation, + // and destroy new_mem. + if (new_mem->ApproximateMemoryUsage() > maxSize) { + s = Status::Aborted(Slice("Mempurge filled more than one memtable.")); + break; + } + } + } + + // If everything happened smoothly and new_mem contains valid data, + // decide if it is flushed to storage or kept in the imm() + // memtable list (memory). + if (s.ok() && (new_first_seqno != kMaxSequenceNumber)) { + // Rectify the first sequence number, which (unlike the earliest seq + // number) needs to be present in the new memtable. + new_mem->SetFirstSequenceNumber(new_first_seqno); + + // The new_mem is added to the list of immutable memtables + // only if it filled at less than 60% capacity (arbitrary heuristic). + if (new_mem->ApproximateMemoryUsage() < maxSize) { + db_mutex_->Lock(); + cfd_->imm() + ->Add(new_mem, &job_context_->memtables_to_free, false /* trigger_flush. Adding this memtable will not trigger any flush */); + new_mem->Ref(); + db_mutex_->Unlock(); + } else { + s = Status::Aborted(Slice("Mempurge filled more than one memtable.")); + if (new_mem) { + job_context_->memtables_to_free.push_back(new_mem); + } + } + } else { + // In this case, the newly allocated new_mem is empty. + assert(new_mem != nullptr); + job_context_->memtables_to_free.push_back(new_mem); + } + } + + // Reacquire the mutex for WriteLevel0 function. + db_mutex_->Lock(); + + // If mempurge successful, don't write input tables to level0, + // but write any full output table to level0. 
+  if (s.ok()) {
+    TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeSuccessful");
+  } else {
+    TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeUnsuccessful");
+  }
+
+  return s;
+}
+
 Status FlushJob::WriteLevel0Table() {
   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_FLUSH_WRITE_L0);
@@ -362,7 +609,7 @@ Status FlushJob::WriteLevel0Table() {
   {
     ScopedArenaIterator iter(
-        NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
+        NewMergingIterator(&cfd_->internal_comparator(), memtables.data(),
                            static_cast<int>(memtables.size()), &arena));
     ROCKS_LOG_INFO(db_options_.info_log,
                    "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
@@ -470,6 +717,7 @@ Status FlushJob::WriteLevel0Table() {
   const bool has_output = meta_.fd.GetFileSize() > 0;
 
   if (s.ok() && has_output) {
+    TEST_SYNC_POINT("DBImpl::FlushJob:SSTFileCreated");
     // if we have more than 1 background thread, then we cannot
     // insert files directly into higher levels because some other
     // threads could be concurrently producing compacted files for
diff --git a/db/flush_job.h b/db/flush_job.h
index ff2ad85bc..b0a6bf2de 100644
--- a/db/flush_job.h
+++ b/db/flush_job.h
@@ -101,6 +101,28 @@ class FlushJob {
   void ReportFlushInputSize(const autovector<MemTable*>& mems);
   void RecordFlushIOStats();
   Status WriteLevel0Table();
+
+  // Memtable Garbage Collection algorithm: a MemPurge takes the list
+  // of immutable memtables and filters out (or "purges") the outdated bytes.
+  // The output (the filtered bytes, or "useful payload") is then
+  // transferred into a new memtable. If this memtable is filled, the
+  // mempurge is aborted and rerouted to a regular flush process. Else,
+  // depending on the heuristics, it is placed onto the immutable memtable
+  // list. The addition to the imm list will not trigger a flush operation.
+  // The flush of the imm list will instead be triggered once the mutable
+  // memtable is added to the imm list.
+  // This process is typically intended for workloads with heavy overwrites
+  // when we want to avoid SSD writes (and reads) as much as possible.
+  // "MemPurge" is an experimental feature still at a very early stage
+  // of development. At the moment it is only compatible with the Get, Put,
+  // Delete operations as well as Iterators and CompactionFilters.
+  // For this early version, "MemPurge" is called by setting the
+  // options.experimental_allow_mempurge flag as "true". When this is
+  // the case, ALL automatic flush operations (kWriteBufferFull) will
+  // first go through the MemPurge process. Therefore, we strongly
+  // recommend all users not to set this flag as true given that the MemPurge
+  // process has not matured yet.
+  Status MemPurge();
 #ifndef ROCKSDB_LITE
   std::unique_ptr<FlushJobInfo> GetFlushJobInfo() const;
 #endif  // !ROCKSDB_LITE
diff --git a/db/memtable_list.cc b/db/memtable_list.cc
index 97d076b03..94cb9497d 100644
--- a/db/memtable_list.cc
+++ b/db/memtable_list.cc
@@ -516,7 +516,8 @@ Status MemTableList::TryInstallMemtableFlushResults(
 }
 
 // New memtables are inserted at the front of the list.
-void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
+void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete,
+                       bool trigger_flush) {
   assert(static_cast<int>(current_->memlist_.size()) >= num_flush_not_started_);
   InstallNewVersion();
   // this method is used to move mutable memtable into an immutable list.
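To make the new trigger_flush contract concrete, here is a small illustrative sketch (not part of the patch; the variable names are made up, and the two calls mirror the ones in db_impl_write.cc and db/flush_job.cc above):

    // Regular write path (SwitchMemtable): default trigger_flush = true.
    // The newly immutable memtable raises imm_flush_needed, so a background
    // flush is scheduled for it.
    cfd->imm()->Add(switched_out_memtable, &to_delete);

    // MemPurge path (FlushJob::MemPurge): trigger_flush = false.
    // The purged output memtable is parked on the immutable list without
    // waking the flush thread; a flush is only scheduled later, when the
    // next mutable memtable is switched out and added with the default.
    cfd->imm()->Add(mempurge_output_memtable, &to_delete,
                    false /* trigger_flush */);
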
@@ -527,7 +528,8 @@ void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
   current_->Add(m, to_delete);
   m->MarkImmutable();
   num_flush_not_started_++;
-  if (num_flush_not_started_ == 1) {
+
+  if (num_flush_not_started_ > 0 && trigger_flush) {
     imm_flush_needed.store(true, std::memory_order_release);
   }
   UpdateCachedValuesFromMemTableListVersion();
diff --git a/db/memtable_list.h b/db/memtable_list.h
index 493a54d40..fde5e8a90 100644
--- a/db/memtable_list.h
+++ b/db/memtable_list.h
@@ -272,7 +272,11 @@ class MemTableList {
   // New memtables are inserted at the front of the list.
   // Takes ownership of the referenced held on *m by the caller of Add().
-  void Add(MemTable* m, autovector<MemTable*>* to_delete);
+  // By default, adding memtables will flag that the memtable list needs to be
+  // flushed, but in certain situations, like after a mempurge, we may want to
+  // avoid flushing the memtable list upon addition of a memtable.
+  void Add(MemTable* m, autovector<MemTable*>* to_delete,
+           bool trigger_flush = true);
 
   // Returns an estimate of the number of bytes of data in use.
   size_t ApproximateMemoryUsage();
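
For completeness, a minimal usage sketch (not part of the patch): it enables the flag the same way the new unit tests do and drives a heavy-overwrite workload so that kWriteBufferFull flushes go through the mempurge path. The database path, key count, and value sizes below are arbitrary placeholders.

    #include <cassert>
    #include <string>

    #include "rocksdb/db.h"
    #include "rocksdb/options.h"

    int main() {
      rocksdb::Options options;
      options.create_if_missing = true;
      // Small write buffer (1MB) so heavy overwrites fill memtables quickly,
      // mirroring the unit tests above.
      options.write_buffer_size = 1 << 20;
      // Enable the experimental mempurge prototype introduced by this patch.
      options.experimental_allow_mempurge = true;

      rocksdb::DB* db = nullptr;
      // "/tmp/mempurge_demo" is a placeholder path for this sketch.
      rocksdb::Status s =
          rocksdb::DB::Open(options, "/tmp/mempurge_demo", &db);
      assert(s.ok());

      // Overwrite a small set of keys with large values: the kind of workload
      // mempurge is designed to absorb without writing SST files.
      for (int i = 0; i < 100000; ++i) {
        s = db->Put(rocksdb::WriteOptions(), "key" + std::to_string(i % 9),
                    std::string(10 * 1024, 'x'));
        assert(s.ok());
      }

      delete db;
      return 0;
    }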