From 206845c05740aabbc548fc664239a876e9114dd0 Mon Sep 17 00:00:00 2001 From: Baptiste Lemaire Date: Thu, 15 Jul 2021 17:48:17 -0700 Subject: [PATCH] Mempurge support for wal (#8528) Summary: In this PR, `mempurge` is made compatible with the Write Ahead Log: in case of recovery, the DB is now capable of recovering the data that was "mempurged" and kept in the `imm()` list of immutable memtables. The twist was to add a uint64_t to the `memtable` struct to store the number of the earliest log file containing entries from the `memtable`. When a `Flush` operation is replaced with a `MemPurge`, the `VersionEdit` (which usually contains the new min log file number to pick up for recovery and the level 0 file path of the newly created SST file) is no longer appended to the manifest log, and every time the `deleteWal` method is called, a check is made on the list of immutable memtables. This PR also includes a unit test that verifies that no data is lost upon Reopening of the database when the mempurge feature is activated. This extensive unit test includes two column families, with valid data contained in the imm() at time of "crash"/reopening (recovery). 
Pull Request resolved: https://github.com/facebook/rocksdb/pull/8528 Reviewed By: pdillinger Differential Revision: D29701097 Pulled By: bjlemaire fbshipit-source-id: 072a900fb6ccc1edcf5eef6caf88f3060238edf9 --- db/column_family.cc | 11 +- db/column_family.h | 5 +- db/db_flush_test.cc | 170 +++++++++++++++++++++++++ db/db_impl/db_impl_compaction_flush.cc | 16 +++ db/db_impl/db_impl_open.cc | 7 +- db/db_impl/db_impl_secondary.cc | 4 +- db/db_impl/db_impl_write.cc | 3 +- db/flush_job.cc | 50 ++++++-- db/flush_job.h | 1 + db/memtable.cc | 4 +- db/memtable.h | 17 ++- db/memtable_list.cc | 49 +++++-- db/memtable_list.h | 19 ++- db/repair.cc | 2 +- db/version_edit_handler.cc | 5 + db/version_set.cc | 10 +- db/version_set.h | 19 +++ 17 files changed, 355 insertions(+), 37 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index de90f6b6f..47389a670 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -1059,17 +1059,20 @@ uint64_t ColumnFamilyData::GetLiveSstFilesSize() const { } MemTable* ColumnFamilyData::ConstructNewMemtable( - const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) { + const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq, + uint64_t log_number) { return new MemTable(internal_comparator_, ioptions_, mutable_cf_options, - write_buffer_manager_, earliest_seq, id_); + write_buffer_manager_, earliest_seq, id_, log_number); } void ColumnFamilyData::CreateNewMemtable( - const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) { + const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq, + uint64_t log_number) { if (mem_ != nullptr) { delete mem_->Unref(); } - SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq)); + SetMemtable( + ConstructNewMemtable(mutable_cf_options, earliest_seq, log_number)); mem_->Ref(); } diff --git a/db/column_family.h b/db/column_family.h index 48fb9abc1..d73c3dda9 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ 
-371,9 +371,10 @@ class ColumnFamilyData { // See Memtable constructor for explanation of earliest_seq param. MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options, - SequenceNumber earliest_seq); + SequenceNumber earliest_seq, + uint64_t log_number = 0); void CreateNewMemtable(const MutableCFOptions& mutable_cf_options, - SequenceNumber earliest_seq); + SequenceNumber earliest_seq, uint64_t log_number = 0); TableCache* table_cache() const { return table_cache_.get(); } BlobFileCache* blob_file_cache() const { return blob_file_cache_.get(); } diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 1e8dac7f1..32fa2f17e 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -1106,6 +1106,176 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) { ASSERT_EQ(Get(KEY5), p_v5); } +TEST_F(DBFlushTest, MemPurgeWALSupport) { + Options options = CurrentOptions(); + + options.statistics = CreateDBStatistics(); + options.statistics->set_stats_level(StatsLevel::kAll); + options.create_if_missing = true; + options.compression = kNoCompression; + options.inplace_update_support = false; + options.allow_concurrent_memtable_write = true; + + // Enforce size of a single MemTable to 1MB. + options.write_buffer_size = 128 << 10; + // Activate the MemPurge prototype. 
+ options.experimental_allow_mempurge = true; + ASSERT_OK(TryReopen(options)); + + const size_t KVSIZE = 10; + + do { + CreateAndReopenWithCF({"pikachu"}, options); + ASSERT_OK(Put(1, "foo", "v1")); + ASSERT_OK(Put(1, "baz", "v5")); + + ReopenWithColumnFamilies({"default", "pikachu"}, options); + ASSERT_EQ("v1", Get(1, "foo")); + + ASSERT_EQ("v1", Get(1, "foo")); + ASSERT_EQ("v5", Get(1, "baz")); + ASSERT_OK(Put(0, "bar", "v2")); + ASSERT_OK(Put(1, "bar", "v2")); + ASSERT_OK(Put(1, "foo", "v3")); + uint32_t mempurge_count = 0; + uint32_t sst_count = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushJob:MemPurgeSuccessful", + [&](void* /*arg*/) { mempurge_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + std::vector keys; + for (size_t k = 0; k < KVSIZE; k++) { + keys.push_back("IamKey" + std::to_string(k)); + } + + std::string RNDKEY, RNDVALUE; + const std::string NOT_FOUND = "NOT_FOUND"; + + // Heavy overwrite workload, + // more than would fit in maximum allowed memtables. + Random rnd(719); + const size_t NUM_REPEAT = 100; + const size_t RAND_KEY_LENGTH = 8192; + const size_t RAND_VALUES_LENGTH = 1024; + std::vector values_default(KVSIZE), values_pikachu(KVSIZE); + + // Insert a very first set of keys that will be + // mempurged at least once. + for (size_t k = 0; k < KVSIZE / 2; k++) { + values_default[k] = rnd.RandomString(RAND_VALUES_LENGTH); + values_pikachu[k] = rnd.RandomString(RAND_VALUES_LENGTH); + } + + // Insert keys[0:KVSIZE/2] to + // both 'default' and 'pikachu' CFs. + for (size_t k = 0; k < KVSIZE / 2; k++) { + ASSERT_OK(Put(0, keys[k], values_default[k])); + ASSERT_OK(Put(1, keys[k], values_pikachu[k])); + } + + // Check that the insertion was seamless. 
+ for (size_t k = 0; k < KVSIZE / 2; k++) { + ASSERT_EQ(Get(0, keys[k]), values_default[k]); + ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]); + } + + // Insertion of K-V pairs, multiple times (overwrites) + // into 'default' CF. Will trigger mempurge. + for (size_t j = 0; j < NUM_REPEAT; j++) { + // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes. + for (size_t k = KVSIZE / 2; k < KVSIZE; k++) { + values_default[k] = rnd.RandomString(RAND_VALUES_LENGTH); + } + + // Insert K-V into default CF. + for (size_t k = KVSIZE / 2; k < KVSIZE; k++) { + ASSERT_OK(Put(0, keys[k], values_default[k])); + } + + // Check key validity, for all keys, both in + // default and pikachu CFs. + for (size_t k = 0; k < KVSIZE; k++) { + ASSERT_EQ(Get(0, keys[k]), values_default[k]); + } + // Note that at this point, only keys[0:KVSIZE/2] + // have been inserted into Pikachu. + for (size_t k = 0; k < KVSIZE / 2; k++) { + ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]); + } + } + + // Insertion of K-V pairs, multiple times (overwrites) + // into 'pikachu' CF. Will trigger mempurge. + // Check that we keep the older logs for 'default' imm(). + for (size_t j = 0; j < NUM_REPEAT; j++) { + // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes. + for (size_t k = KVSIZE / 2; k < KVSIZE; k++) { + values_pikachu[k] = rnd.RandomString(RAND_VALUES_LENGTH); + } + + // Insert K-V into pikachu CF. + for (size_t k = KVSIZE / 2; k < KVSIZE; k++) { + ASSERT_OK(Put(1, keys[k], values_pikachu[k])); + } + + // Check key validity, for all keys, + // both in default and pikachu. + for (size_t k = 0; k < KVSIZE; k++) { + ASSERT_EQ(Get(0, keys[k]), values_default[k]); + ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]); + } + } + + // Check that there was at least one mempurge + const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1; + // Check that there were no SST files created during flush.
+ const uint32_t EXPECTED_SST_COUNT = 0; + + EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT); + EXPECT_EQ(sst_count, EXPECTED_SST_COUNT); + + ReopenWithColumnFamilies({"default", "pikachu"}, options); + // Check that there was no data corruption anywhere, + // not in 'default' nor in 'Pikachu' CFs. + ASSERT_EQ("v3", Get(1, "foo")); + ASSERT_OK(Put(1, "foo", "v4")); + ASSERT_EQ("v4", Get(1, "foo")); + ASSERT_EQ("v2", Get(1, "bar")); + ASSERT_EQ("v5", Get(1, "baz")); + // Check keys in 'Default' and 'Pikachu'. + // keys[0:KVSIZE/2] were for sure contained + // in the imm() at Reopen/recovery time. + for (size_t k = 0; k < KVSIZE; k++) { + ASSERT_EQ(Get(0, keys[k]), values_default[k]); + ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]); + } + // Insertion of random K-V pairs to trigger + // a flush in the Pikachu CF. + for (size_t j = 0; j < NUM_REPEAT; j++) { + RNDKEY = rnd.RandomString(RAND_KEY_LENGTH); + RNDVALUE = rnd.RandomString(RAND_VALUES_LENGTH); + ASSERT_OK(Put(1, RNDKEY, RNDVALUE)); + } + // Assert that there was at least one flush to storage. + EXPECT_GT(sst_count, EXPECTED_SST_COUNT); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + ASSERT_EQ("v4", Get(1, "foo")); + ASSERT_EQ("v2", Get(1, "bar")); + ASSERT_EQ("v5", Get(1, "baz")); + // Since values in default are held in mutable mem() + // and imm(), check if the flush in pikachu didn't + // affect these values.
+ for (size_t k = 0; k < KVSIZE; k++) { + ASSERT_EQ(Get(0, keys[k]), values_default[k]); + ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]); + } + ASSERT_EQ(Get(1, RNDKEY), RNDVALUE); + } while (ChangeWalOptions()); +} + TEST_P(DBFlushDirectIOTest, DirectIO) { Options options; options.create_if_missing = true; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index ec921227e..2b771e360 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -2397,6 +2397,17 @@ void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req, assert(flush_req.size() == 1); ColumnFamilyData* cfd = flush_req[0].first; assert(cfd); + // Note: SchedulePendingFlush is always preceded + // with an imm()->FlushRequested() call. However, + // we want to make this code snippet more resilient to + // future changes. Therefore, we add the following if + // statement - note that calling it twice (or more) + // doesn't break anything. + if (immutable_db_options_.experimental_allow_mempurge) { + // If imm() contains silent memtables, + // requesting a flush will mark the imm_needed as true. + cfd->imm()->FlushRequested(); + } if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) { cfd->Ref(); cfd->set_queued_for_flush(true); @@ -2538,6 +2549,11 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, for (const auto& iter : flush_req) { ColumnFamilyData* cfd = iter.first; + if (immutable_db_options_.experimental_allow_mempurge) { + // If imm() contains silent memtables, + // requesting a flush will mark the imm_needed as true.
+ cfd->imm()->FlushRequested(); + } if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) { // can't flush this CF, try next one column_families_not_to_flush.push_back(cfd); diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index b19210724..fcca59ae6 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -630,7 +630,7 @@ Status DBImpl::Recover( // Clear memtables if recovery failed for (auto cfd : *versions_->GetColumnFamilySet()) { cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(), - kMaxSequenceNumber); + kMaxSequenceNumber, cfd->GetLogNumber()); } } } @@ -1066,7 +1066,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, flushed = true; cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(), - *next_sequence); + *next_sequence, cfd->GetLogNumber()); } } } @@ -1204,7 +1204,8 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, flushed = true; cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(), - versions_->LastSequence()); + versions_->LastSequence(), + cfd->GetLogNumber()); } data_seen = true; } diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index dae004cdd..fcd162494 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -253,8 +253,8 @@ Status DBImplSecondary::RecoverLogFiles( curr_log_num != log_number)) { const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions(); - MemTable* new_mem = - cfd->ConstructNewMemtable(mutable_cf_options, seq_of_batch); + MemTable* new_mem = cfd->ConstructNewMemtable( + mutable_cf_options, seq_of_batch, log_number); cfd->mem()->SetNextLogNumber(log_number); cfd->imm()->Add(cfd->mem(), &job_context->memtables_to_free); new_mem->Ref(); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index c2e8c0dc6..9191b6ea6 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1805,7 +1805,8 @@ Status 
DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { } if (s.ok()) { SequenceNumber seq = versions_->LastSequence(); - new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq); + new_mem = + cfd->ConstructNewMemtable(mutable_cf_options, seq, new_log_number); context->superversion_context.NewSuperVersion(); } ROCKS_LOG_INFO(immutable_db_options_.info_log, diff --git a/db/flush_job.cc b/db/flush_job.cc index b58cca864..9b9385d07 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -272,7 +272,10 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, s = cfd_->imm()->TryInstallMemtableFlushResults( cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_, meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_, - log_buffer_, &committed_flush_jobs_info_, &tmp_io_s); + log_buffer_, &committed_flush_jobs_info_, &tmp_io_s, + !(mempurge_s.ok()) /* write_edit : true if no mempurge happened (or if aborted), + but 'false' if mempurge successful: no new min log number + or new level 0 file path to write to manifest. */); if (!tmp_io_s.ok()) { io_status_ = tmp_io_s; } @@ -330,6 +333,17 @@ void FlushJob::Cancel() { base_->Unref(); } +uint64_t FlushJob::ExtractEarliestLogFileNumber() { + uint64_t earliest_logno = 0; + for (MemTable* m : mems_) { + uint64_t logno = m->GetEarliestLogFileNumber(); + if (logno > 0 && (earliest_logno == 0 || logno < earliest_logno)) { + earliest_logno = logno; + } + } + return earliest_logno; +} + Status FlushJob::MemPurge() { Status s; db_mutex_->AssertHeld(); @@ -356,12 +370,25 @@ Status FlushJob::MemPurge() { } assert(!memtables.empty()); - SequenceNumber first_seqno = mems_[0]->GetFirstSequenceNumber(); - SequenceNumber earliest_seqno = mems_[0]->GetEarliestSequenceNumber(); + SequenceNumber first_seqno = kMaxSequenceNumber; + SequenceNumber earliest_seqno = kMaxSequenceNumber; + // Pick first and earliest seqno as min of all first_seqno + // and earliest_seqno of the mempurged memtables. 
+ for (const auto& mem : mems_) { + first_seqno = mem->GetFirstSequenceNumber() < first_seqno + ? mem->GetFirstSequenceNumber() + : first_seqno; + earliest_seqno = mem->GetEarliestSequenceNumber() < earliest_seqno + ? mem->GetEarliestSequenceNumber() + : earliest_seqno; + } + ScopedArenaIterator iter( NewMergingIterator(&(cfd_->internal_comparator()), memtables.data(), static_cast(memtables.size()), &arena)); + uint64_t earliest_logno = ExtractEarliestLogFileNumber(); + auto* ioptions = cfd_->ioptions(); // Place iterator at the First (meaning most recent) key node. @@ -400,12 +427,9 @@ Status FlushJob::MemPurge() { } } - // mems are ordered by increasing ID, so mems_[0]->GetID - // returns the smallest memtable ID. - new_mem = - new MemTable((cfd_->internal_comparator()), *(cfd_->ioptions()), - mutable_cf_options_, cfd_->write_buffer_mgr(), - mems_[0]->GetEarliestSequenceNumber(), cfd_->GetID()); + new_mem = new MemTable((cfd_->internal_comparator()), *(cfd_->ioptions()), + mutable_cf_options_, cfd_->write_buffer_mgr(), + earliest_seqno, cfd_->GetID(), earliest_logno); assert(new_mem != nullptr); Env* env = db_options_.env; @@ -530,8 +554,12 @@ Status FlushJob::MemPurge() { // only if it filled at less than 60% capacity (arbitrary heuristic). if (new_mem->ApproximateMemoryUsage() < maxSize) { db_mutex_->Lock(); - cfd_->imm() - ->Add(new_mem, &job_context_->memtables_to_free, false /* trigger_flush. Adding this memtable will not trigger any flush */); + cfd_->imm()->Add(new_mem, + &job_context_->memtables_to_free, + false /* -> trigger_flush=false: + * adding this memtable + * will not trigger a flush. + */); new_mem->Ref(); db_mutex_->Unlock(); } else { diff --git a/db/flush_job.h b/db/flush_job.h index b0a6bf2de..488ef2c7c 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -123,6 +123,7 @@ class FlushJob { // recommend all users not to set this flag as true given that the MemPurge // process has not matured yet. 
Status MemPurge(); + uint64_t ExtractEarliestLogFileNumber(); #ifndef ROCKSDB_LITE std::unique_ptr GetFlushJobInfo() const; #endif // !ROCKSDB_LITE diff --git a/db/memtable.cc b/db/memtable.cc index 2b2598658..5c872ec5f 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -67,7 +67,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp, const ImmutableOptions& ioptions, const MutableCFOptions& mutable_cf_options, WriteBufferManager* write_buffer_manager, - SequenceNumber latest_seq, uint32_t column_family_id) + SequenceNumber latest_seq, uint32_t column_family_id, + uint64_t current_logfile_number) : comparator_(cmp), moptions_(ioptions, mutable_cf_options), refs_(0), @@ -98,6 +99,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, earliest_seqno_(latest_seq), creation_seq_(latest_seq), mem_next_logfile_number_(0), + mem_min_logfile_number_(current_logfile_number), min_prep_log_referenced_(0), locks_(moptions_.inplace_update_support ? moptions_.inplace_update_num_locks diff --git a/db/memtable.h b/db/memtable.h index 6f908ae5b..2ee08fc97 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -106,7 +106,8 @@ class MemTable { const ImmutableOptions& ioptions, const MutableCFOptions& mutable_cf_options, WriteBufferManager* write_buffer_manager, - SequenceNumber earliest_seq, uint32_t column_family_id); + SequenceNumber earliest_seq, uint32_t column_family_id, + uint64_t current_logfile_number = 0); // No copying allowed MemTable(const MemTable&) = delete; MemTable& operator=(const MemTable&) = delete; @@ -387,6 +388,16 @@ class MemTable { // operations on the same MemTable. void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; } + // Set the earliest log file number that (possibly) + // contains entries from this memtable. + void SetEarliestLogFileNumber(uint64_t logno) { + mem_min_logfile_number_ = logno; + } + + // Return the earliest log file number that (possibly) + // contains entries from this memtable. 
+ uint64_t GetEarliestLogFileNumber() { return mem_min_logfile_number_; } + // if this memtable contains data from a committed // two phase transaction we must take note of the // log which contains that data so we can know @@ -517,6 +528,10 @@ class MemTable { // The log files earlier than this number can be deleted. uint64_t mem_next_logfile_number_; + // The earliest log containing entries inserted into + // this memtable. + uint64_t mem_min_logfile_number_; + // the earliest log containing a prepared section // which has been inserted into this memtable. std::atomic min_prep_log_referenced_; diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 94cb9497d..eb56f205b 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -392,7 +392,7 @@ Status MemTableList::TryInstallMemtableFlushResults( autovector* to_delete, FSDirectory* db_directory, LogBuffer* log_buffer, std::list>* committed_flush_jobs_info, - IOStatus* io_s) { + IOStatus* io_s, bool write_edits) { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS); mu->AssertHeld(); @@ -476,8 +476,14 @@ Status MemTableList::TryInstallMemtableFlushResults( uint64_t min_wal_number_to_keep = 0; if (vset->db_options()->allow_2pc) { assert(edit_list.size() > 0); + // Note that if mempurge is successful, the edit_list will + // not be applicable (contains info of new min_log number to keep, + // and level 0 file path of SST file created during normal flush, + // so both pieces of information are irrelevant after a successful + // mempurge operation). min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC( vset, *cfd, edit_list, memtables_to_flush, prep_tracker); + // We piggyback the information of earliest log file to keep in the // manifest entry for the last file flushed. 
edit_list.back()->SetMinLogNumberToKeep(min_wal_number_to_keep); @@ -502,13 +508,25 @@ Status MemTableList::TryInstallMemtableFlushResults( RemoveMemTablesOrRestoreFlags(status, cfd, batch_count, log_buffer, to_delete, mu); }; - - // this can release and reacquire the mutex. - s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu, - db_directory, /*new_descriptor_log=*/false, - /*column_family_options=*/nullptr, - manifest_write_cb); - *io_s = vset->io_status(); + if (write_edits) { + // this can release and reacquire the mutex. + s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu, + db_directory, /*new_descriptor_log=*/false, + /*column_family_options=*/nullptr, + manifest_write_cb); + *io_s = vset->io_status(); + } else { + // If write_edit is false (e.g: successful mempurge), + // then remove old memtables, wake up manifest write queue threads, + // and don't commit anything to the manifest file. + RemoveMemTablesOrRestoreFlags(s, cfd, batch_count, log_buffer, + to_delete, mu); + // Notify new head of manifest write queue. + // wake up all the waiting writers + // TODO(bjlemaire): explain full reason needed or investigate more. + vset->WakeUpWaitingManifestWriters(); + *io_s = IOStatus::OK(); + } } } commit_in_progress_ = false; @@ -676,6 +694,21 @@ void MemTableList::RemoveMemTablesOrRestoreFlags( } } +// Returns the earliest log that possibly contain entries +// from one of the memtables of this memtable_list. 
+uint64_t MemTableList::EarliestLogContainingData() { + uint64_t min_log = 0; + + for (auto& m : current_->memlist_) { + uint64_t log = m->GetEarliestLogFileNumber(); + if (log > 0 && (min_log == 0 || log < min_log)) { + min_log = log; + } + } + + return min_log; +} + uint64_t MemTableList::PrecomputeMinLogContainingPrepSection( const std::unordered_set* memtables_to_flush) { uint64_t min_log = 0; diff --git a/db/memtable_list.h b/db/memtable_list.h index fde5e8a90..88d9ad017 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -268,7 +268,7 @@ class MemTableList { autovector* to_delete, FSDirectory* db_directory, LogBuffer* log_buffer, std::list>* committed_flush_jobs_info, - IOStatus* io_s); + IOStatus* io_s, bool write_edits = true); // New memtables are inserted at the front of the list. // Takes ownership of the referenced held on *m by the caller of Add(). @@ -312,7 +312,18 @@ class MemTableList { // non-empty (regardless of the min_write_buffer_number_to_merge // parameter). This flush request will persist until the next time // PickMemtablesToFlush() is called. - void FlushRequested() { flush_requested_ = true; } + void FlushRequested() { + flush_requested_ = true; + // If there are some memtables stored in imm() that don't trigger + // flush (eg: mempurge output memtable), then update imm_flush_needed. + // Note: if race condition and imm_flush_needed is set to true + // when there is num_flush_not_started_==0, then there is no + // impact whatsoever. Imm_flush_needed is only used in an assert + // in IsFlushPending(). + if (num_flush_not_started_ > 0) { + imm_flush_needed.store(true, std::memory_order_release); + } + } bool HasFlushRequested() { return flush_requested_; } @@ -336,6 +347,10 @@ class MemTableList { size_t* current_memory_usage() { return &current_memory_usage_; } + // Returns the earliest log that possibly contains entries + // from one of the memtables of this memtable_list.
+ uint64_t EarliestLogContainingData(); + // Returns the min log containing the prep section after memtables listsed in // `memtables_to_flush` are flushed and their status is persisted in manifest. uint64_t PrecomputeMinLogContainingPrepSection( diff --git a/db/repair.cc b/db/repair.cc index 1ebd47402..4a710db96 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -387,7 +387,7 @@ class Repairer { // Initialize per-column family memtables for (auto* cfd : *vset_.GetColumnFamilySet()) { cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(), - kMaxSequenceNumber); + kMaxSequenceNumber, cfd->GetLogNumber()); } auto cf_mems = new ColumnFamilyMemTablesImpl(vset_.GetColumnFamilySet()); diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index 7a2996a59..4206a19e5 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -548,6 +548,11 @@ Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, "records NOT monotonically increasing"); } else { cfd->SetLogNumber(edit.log_number_); + if (version_set_->db_options()->experimental_allow_mempurge && + edit.log_number_ > 0 && + (cfd->mem()->GetEarliestLogFileNumber() == 0)) { + cfd->mem()->SetEarliestLogFileNumber(edit.log_number_); + } version_edit_params_.SetLogNumber(edit.log_number_); } } diff --git a/db/version_set.cc b/db/version_set.cc index e80bf67d6..8880c2903 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -4414,6 +4414,14 @@ Status VersionSet::ProcessManifestWrites( return s; } +void VersionSet::WakeUpWaitingManifestWriters() { + // wake up all the waiting writers + // Notify new head of manifest write queue. + if (!manifest_writers_.empty()) { + manifest_writers_.front()->cv.Signal(); + } +} + // 'datas' is grammatically incorrect. We still use this notation to indicate // that this variable represents a collection of column_family_data. 
Status VersionSet::LogAndApply( @@ -5639,7 +5647,7 @@ ColumnFamilyData* VersionSet::CreateColumnFamily( // GetLatestMutableCFOptions() is safe here without mutex since the // cfd is not available to client new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions(), - LastSequence()); + LastSequence(), edit->log_number_); new_cfd->SetLogNumber(edit->log_number_); return new_cfd; } diff --git a/db/version_set.h b/db/version_set.h index 22c0f4a97..d4fc17baa 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1014,6 +1014,7 @@ class VersionSet { FileSystem* fs, std::string* manifest_filename, uint64_t* manifest_file_number); + void WakeUpWaitingManifestWriters(); // Recover the last saved descriptor from persistent storage. // If read_only == true, Recover() will not complain if some column families @@ -1160,6 +1161,15 @@ class VersionSet { if (min_log_num > num && !cfd->IsDropped()) { min_log_num = num; } + // If mempurge is activated, there may be an immutable memtable + // that has data not flushed to any SST file. + if (db_options_->experimental_allow_mempurge && !(cfd->IsEmpty()) && + !(cfd->IsDropped())) { + num = cfd->imm()->EarliestLogContainingData(); + if ((num > 0) && (min_log_num > num)) { + min_log_num = num; + } + } } return min_log_num; } @@ -1177,6 +1187,15 @@ class VersionSet { if (min_log_num > cfd->GetLogNumber() && !cfd->IsDropped()) { min_log_num = cfd->GetLogNumber(); } + // If mempurge is activated, there may be an immutable memtable + // that has data not flushed to any SST file. + if (db_options_->experimental_allow_mempurge && !(cfd->IsEmpty()) && + !(cfd->IsDropped())) { + uint64_t num = cfd->imm()->EarliestLogContainingData(); + if ((num > 0) && (min_log_num > num)) { + min_log_num = num; + } + } } return min_log_num; }