diff --git a/db/column_family.cc b/db/column_family.cc index de90f6b6f..47389a670 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -1059,17 +1059,20 @@ uint64_t ColumnFamilyData::GetLiveSstFilesSize() const { } MemTable* ColumnFamilyData::ConstructNewMemtable( - const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) { + const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq, + uint64_t log_number) { return new MemTable(internal_comparator_, ioptions_, mutable_cf_options, - write_buffer_manager_, earliest_seq, id_); + write_buffer_manager_, earliest_seq, id_, log_number); } void ColumnFamilyData::CreateNewMemtable( - const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) { + const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq, + uint64_t log_number) { if (mem_ != nullptr) { delete mem_->Unref(); } - SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq)); + SetMemtable( + ConstructNewMemtable(mutable_cf_options, earliest_seq, log_number)); mem_->Ref(); } diff --git a/db/column_family.h b/db/column_family.h index 48fb9abc1..d73c3dda9 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -371,9 +371,10 @@ class ColumnFamilyData { // See Memtable constructor for explanation of earliest_seq param. 
MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options, - SequenceNumber earliest_seq); + SequenceNumber earliest_seq, + uint64_t log_number = 0); void CreateNewMemtable(const MutableCFOptions& mutable_cf_options, - SequenceNumber earliest_seq); + SequenceNumber earliest_seq, uint64_t log_number = 0); TableCache* table_cache() const { return table_cache_.get(); } BlobFileCache* blob_file_cache() const { return blob_file_cache_.get(); } diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 1e8dac7f1..32fa2f17e 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -1106,6 +1106,176 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) { ASSERT_EQ(Get(KEY5), p_v5); } +TEST_F(DBFlushTest, MemPurgeWALSupport) { + Options options = CurrentOptions(); + + options.statistics = CreateDBStatistics(); + options.statistics->set_stats_level(StatsLevel::kAll); + options.create_if_missing = true; + options.compression = kNoCompression; + options.inplace_update_support = false; + options.allow_concurrent_memtable_write = true; + + // Enforce size of a single MemTable to 128KB (128 << 10 bytes). + options.write_buffer_size = 128 << 10; + // Activate the MemPurge prototype. 
+ options.experimental_allow_mempurge = true; + ASSERT_OK(TryReopen(options)); + + const size_t KVSIZE = 10; + + do { + CreateAndReopenWithCF({"pikachu"}, options); + ASSERT_OK(Put(1, "foo", "v1")); + ASSERT_OK(Put(1, "baz", "v5")); + + ReopenWithColumnFamilies({"default", "pikachu"}, options); + ASSERT_EQ("v1", Get(1, "foo")); + + ASSERT_EQ("v1", Get(1, "foo")); + ASSERT_EQ("v5", Get(1, "baz")); + ASSERT_OK(Put(0, "bar", "v2")); + ASSERT_OK(Put(1, "bar", "v2")); + ASSERT_OK(Put(1, "foo", "v3")); + uint32_t mempurge_count = 0; + uint32_t sst_count = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushJob:MemPurgeSuccessful", + [&](void* /*arg*/) { mempurge_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + std::vector keys; + for (size_t k = 0; k < KVSIZE; k++) { + keys.push_back("IamKey" + std::to_string(k)); + } + + std::string RNDKEY, RNDVALUE; + const std::string NOT_FOUND = "NOT_FOUND"; + + // Heavy overwrite workload, + // more than would fit in maximum allowed memtables. + Random rnd(719); + const size_t NUM_REPEAT = 100; + const size_t RAND_KEY_LENGTH = 8192; + const size_t RAND_VALUES_LENGTH = 1024; + std::vector values_default(KVSIZE), values_pikachu(KVSIZE); + + // Insert a very first set of keys that will be + // mempurged at least once. + for (size_t k = 0; k < KVSIZE / 2; k++) { + values_default[k] = rnd.RandomString(RAND_VALUES_LENGTH); + values_pikachu[k] = rnd.RandomString(RAND_VALUES_LENGTH); + } + + // Insert keys[0:KVSIZE/2] to + // both 'default' and 'pikachu' CFs. + for (size_t k = 0; k < KVSIZE / 2; k++) { + ASSERT_OK(Put(0, keys[k], values_default[k])); + ASSERT_OK(Put(1, keys[k], values_pikachu[k])); + } + + // Check that the insertion was seamless. 
+ for (size_t k = 0; k < KVSIZE / 2; k++) { + ASSERT_EQ(Get(0, keys[k]), values_default[k]); + ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]); + } + + // Insertion of K-V pairs, multiple times (overwrites) + // into 'default' CF. Will trigger mempurge. + for (size_t j = 0; j < NUM_REPEAT; j++) { + // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes. + for (size_t k = KVSIZE / 2; k < KVSIZE; k++) { + values_default[k] = rnd.RandomString(RAND_VALUES_LENGTH); + } + + // Insert K-V into default CF. + for (size_t k = KVSIZE / 2; k < KVSIZE; k++) { + ASSERT_OK(Put(0, keys[k], values_default[k])); + } + + // Check key validity, for all keys, both in + // default and pikachu CFs. + for (size_t k = 0; k < KVSIZE; k++) { + ASSERT_EQ(Get(0, keys[k]), values_default[k]); + } + // Note that at this point, only keys[0:KVSIZE/2] + // have been inserted into Pikachu. + for (size_t k = 0; k < KVSIZE / 2; k++) { + ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]); + } + } + + // Insertion of K-V pairs, multiple times (overwrites) + // into 'pikachu' CF. Will trigger mempurge. + // Check that we keep the older logs for 'default' imm(). + for (size_t j = 0; j < NUM_REPEAT; j++) { + // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes. + for (size_t k = KVSIZE / 2; k < KVSIZE; k++) { + values_pikachu[k] = rnd.RandomString(RAND_VALUES_LENGTH); + } + + // Insert K-V into pikachu CF. + for (size_t k = KVSIZE / 2; k < KVSIZE; k++) { + ASSERT_OK(Put(1, keys[k], values_pikachu[k])); + } + + // Check key validity, for all keys, + // both in default and pikachu. + for (size_t k = 0; k < KVSIZE; k++) { + ASSERT_EQ(Get(0, keys[k]), values_default[k]); + ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]); + } + } + + // Check that there was at least one mempurge + const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1; + // Check that there were no SST files created during flush. 
+ const uint32_t EXPECTED_SST_COUNT = 0; + + EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT); + EXPECT_EQ(sst_count, EXPECTED_SST_COUNT); + + ReopenWithColumnFamilies({"default", "pikachu"}, options); + // Check that there was no data corruption anywhere, + // not in 'default' nor in 'Pikachu' CFs. + ASSERT_EQ("v3", Get(1, "foo")); + ASSERT_OK(Put(1, "foo", "v4")); + ASSERT_EQ("v4", Get(1, "foo")); + ASSERT_EQ("v2", Get(1, "bar")); + ASSERT_EQ("v5", Get(1, "baz")); + // Check keys in 'Default' and 'Pikachu'. + // keys[0:KVSIZE/2] were for sure contained + // in the imm() at Reopen/recovery time. + for (size_t k = 0; k < KVSIZE; k++) { + ASSERT_EQ(Get(0, keys[k]), values_default[k]); + ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]); + } + // Insertion of random K-V pairs to trigger + // a flush in the Pikachu CF. + for (size_t j = 0; j < NUM_REPEAT; j++) { + RNDKEY = rnd.RandomString(RAND_KEY_LENGTH); + RNDVALUE = rnd.RandomString(RAND_VALUES_LENGTH); + ASSERT_OK(Put(1, RNDKEY, RNDVALUE)); + } + // Assert that there was at least one flush to storage. + EXPECT_GT(sst_count, EXPECTED_SST_COUNT); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + ASSERT_EQ("v4", Get(1, "foo")); + ASSERT_EQ("v2", Get(1, "bar")); + ASSERT_EQ("v5", Get(1, "baz")); + // Since values in default are held in mutable mem() + // and imm(), check if the flush in pikachu didn't + // affect these values. 
+ for (size_t k = 0; k < KVSIZE; k++) { + ASSERT_EQ(Get(0, keys[k]), values_default[k]); + ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]); + } + ASSERT_EQ(Get(1, RNDKEY), RNDVALUE); + } while (ChangeWalOptions()); +} + TEST_P(DBFlushDirectIOTest, DirectIO) { Options options; options.create_if_missing = true; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index ec921227e..2b771e360 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -2397,6 +2397,17 @@ void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req, assert(flush_req.size() == 1); ColumnFamilyData* cfd = flush_req[0].first; assert(cfd); + // Note: SchedulePendingFlush is always preceded + // by an imm()->FlushRequested() call. However, + // we want to make this code snippet more resilient to + // future changes. Therefore, we add the following if + // statement - note that calling it twice (or more) + // doesn't break anything. + if (immutable_db_options_.experimental_allow_mempurge) { + // If imm() contains silent memtables, + // requesting a flush will mark the imm_needed as true. + cfd->imm()->FlushRequested(); + } if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) { cfd->Ref(); cfd->set_queued_for_flush(true); @@ -2538,6 +2549,11 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, for (const auto& iter : flush_req) { ColumnFamilyData* cfd = iter.first; + if (immutable_db_options_.experimental_allow_mempurge) { + // If imm() contains silent memtables, + // requesting a flush will mark the imm_needed as true. 
+ cfd->imm()->FlushRequested(); + } if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) { // can't flush this CF, try next one column_families_not_to_flush.push_back(cfd); diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index b19210724..fcca59ae6 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -630,7 +630,7 @@ Status DBImpl::Recover( // Clear memtables if recovery failed for (auto cfd : *versions_->GetColumnFamilySet()) { cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(), - kMaxSequenceNumber); + kMaxSequenceNumber, cfd->GetLogNumber()); } } } @@ -1066,7 +1066,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, flushed = true; cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(), - *next_sequence); + *next_sequence, cfd->GetLogNumber()); } } } @@ -1204,7 +1204,8 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, flushed = true; cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(), - versions_->LastSequence()); + versions_->LastSequence(), + cfd->GetLogNumber()); } data_seen = true; } diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index dae004cdd..fcd162494 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -253,8 +253,8 @@ Status DBImplSecondary::RecoverLogFiles( curr_log_num != log_number)) { const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions(); - MemTable* new_mem = - cfd->ConstructNewMemtable(mutable_cf_options, seq_of_batch); + MemTable* new_mem = cfd->ConstructNewMemtable( + mutable_cf_options, seq_of_batch, log_number); cfd->mem()->SetNextLogNumber(log_number); cfd->imm()->Add(cfd->mem(), &job_context->memtables_to_free); new_mem->Ref(); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index c2e8c0dc6..9191b6ea6 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1805,7 +1805,8 @@ Status 
DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { } if (s.ok()) { SequenceNumber seq = versions_->LastSequence(); - new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq); + new_mem = + cfd->ConstructNewMemtable(mutable_cf_options, seq, new_log_number); context->superversion_context.NewSuperVersion(); } ROCKS_LOG_INFO(immutable_db_options_.info_log, diff --git a/db/flush_job.cc b/db/flush_job.cc index b58cca864..9b9385d07 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -272,7 +272,10 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, s = cfd_->imm()->TryInstallMemtableFlushResults( cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_, meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_, - log_buffer_, &committed_flush_jobs_info_, &tmp_io_s); + log_buffer_, &committed_flush_jobs_info_, &tmp_io_s, + !(mempurge_s.ok()) /* write_edit : true if no mempurge happened (or if aborted), + but 'false' if mempurge successful: no new min log number + or new level 0 file path to write to manifest. */); if (!tmp_io_s.ok()) { io_status_ = tmp_io_s; } @@ -330,6 +333,17 @@ void FlushJob::Cancel() { base_->Unref(); } +uint64_t FlushJob::ExtractEarliestLogFileNumber() { + uint64_t earliest_logno = 0; + for (MemTable* m : mems_) { + uint64_t logno = m->GetEarliestLogFileNumber(); + if (logno > 0 && (earliest_logno == 0 || logno < earliest_logno)) { + earliest_logno = logno; + } + } + return earliest_logno; +} + Status FlushJob::MemPurge() { Status s; db_mutex_->AssertHeld(); @@ -356,12 +370,25 @@ Status FlushJob::MemPurge() { } assert(!memtables.empty()); - SequenceNumber first_seqno = mems_[0]->GetFirstSequenceNumber(); - SequenceNumber earliest_seqno = mems_[0]->GetEarliestSequenceNumber(); + SequenceNumber first_seqno = kMaxSequenceNumber; + SequenceNumber earliest_seqno = kMaxSequenceNumber; + // Pick first and earliest seqno as min of all first_seqno + // and earliest_seqno of the mempurged memtables. 
+ for (const auto& mem : mems_) { + first_seqno = mem->GetFirstSequenceNumber() < first_seqno + ? mem->GetFirstSequenceNumber() + : first_seqno; + earliest_seqno = mem->GetEarliestSequenceNumber() < earliest_seqno + ? mem->GetEarliestSequenceNumber() + : earliest_seqno; + } + ScopedArenaIterator iter( NewMergingIterator(&(cfd_->internal_comparator()), memtables.data(), static_cast(memtables.size()), &arena)); + uint64_t earliest_logno = ExtractEarliestLogFileNumber(); + auto* ioptions = cfd_->ioptions(); // Place iterator at the First (meaning most recent) key node. @@ -400,12 +427,9 @@ Status FlushJob::MemPurge() { } } - // mems are ordered by increasing ID, so mems_[0]->GetID - // returns the smallest memtable ID. - new_mem = - new MemTable((cfd_->internal_comparator()), *(cfd_->ioptions()), - mutable_cf_options_, cfd_->write_buffer_mgr(), - mems_[0]->GetEarliestSequenceNumber(), cfd_->GetID()); + new_mem = new MemTable((cfd_->internal_comparator()), *(cfd_->ioptions()), + mutable_cf_options_, cfd_->write_buffer_mgr(), + earliest_seqno, cfd_->GetID(), earliest_logno); assert(new_mem != nullptr); Env* env = db_options_.env; @@ -530,8 +554,12 @@ Status FlushJob::MemPurge() { // only if it filled at less than 60% capacity (arbitrary heuristic). if (new_mem->ApproximateMemoryUsage() < maxSize) { db_mutex_->Lock(); - cfd_->imm() - ->Add(new_mem, &job_context_->memtables_to_free, false /* trigger_flush. Adding this memtable will not trigger any flush */); + cfd_->imm()->Add(new_mem, + &job_context_->memtables_to_free, + false /* -> trigger_flush=false: + * adding this memtable + * will not trigger a flush. + */); new_mem->Ref(); db_mutex_->Unlock(); } else { diff --git a/db/flush_job.h b/db/flush_job.h index b0a6bf2de..488ef2c7c 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -123,6 +123,7 @@ class FlushJob { // recommend all users not to set this flag as true given that the MemPurge // process has not matured yet. 
Status MemPurge(); + uint64_t ExtractEarliestLogFileNumber(); #ifndef ROCKSDB_LITE std::unique_ptr GetFlushJobInfo() const; #endif // !ROCKSDB_LITE diff --git a/db/memtable.cc b/db/memtable.cc index 2b2598658..5c872ec5f 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -67,7 +67,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp, const ImmutableOptions& ioptions, const MutableCFOptions& mutable_cf_options, WriteBufferManager* write_buffer_manager, - SequenceNumber latest_seq, uint32_t column_family_id) + SequenceNumber latest_seq, uint32_t column_family_id, + uint64_t current_logfile_number) : comparator_(cmp), moptions_(ioptions, mutable_cf_options), refs_(0), @@ -98,6 +99,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, earliest_seqno_(latest_seq), creation_seq_(latest_seq), mem_next_logfile_number_(0), + mem_min_logfile_number_(current_logfile_number), min_prep_log_referenced_(0), locks_(moptions_.inplace_update_support ? moptions_.inplace_update_num_locks diff --git a/db/memtable.h b/db/memtable.h index 6f908ae5b..2ee08fc97 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -106,7 +106,8 @@ class MemTable { const ImmutableOptions& ioptions, const MutableCFOptions& mutable_cf_options, WriteBufferManager* write_buffer_manager, - SequenceNumber earliest_seq, uint32_t column_family_id); + SequenceNumber earliest_seq, uint32_t column_family_id, + uint64_t current_logfile_number = 0); // No copying allowed MemTable(const MemTable&) = delete; MemTable& operator=(const MemTable&) = delete; @@ -387,6 +388,16 @@ class MemTable { // operations on the same MemTable. void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; } + // Set the earliest log file number that (possibly) + // contains entries from this memtable. + void SetEarliestLogFileNumber(uint64_t logno) { + mem_min_logfile_number_ = logno; + } + + // Return the earliest log file number that (possibly) + // contains entries from this memtable. 
+ uint64_t GetEarliestLogFileNumber() { return mem_min_logfile_number_; } + // if this memtable contains data from a committed // two phase transaction we must take note of the // log which contains that data so we can know @@ -517,6 +528,10 @@ class MemTable { // The log files earlier than this number can be deleted. uint64_t mem_next_logfile_number_; + // The earliest log containing entries inserted into + // this memtable. + uint64_t mem_min_logfile_number_; + // the earliest log containing a prepared section // which has been inserted into this memtable. std::atomic min_prep_log_referenced_; diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 94cb9497d..eb56f205b 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -392,7 +392,7 @@ Status MemTableList::TryInstallMemtableFlushResults( autovector* to_delete, FSDirectory* db_directory, LogBuffer* log_buffer, std::list>* committed_flush_jobs_info, - IOStatus* io_s) { + IOStatus* io_s, bool write_edits) { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS); mu->AssertHeld(); @@ -476,8 +476,14 @@ Status MemTableList::TryInstallMemtableFlushResults( uint64_t min_wal_number_to_keep = 0; if (vset->db_options()->allow_2pc) { assert(edit_list.size() > 0); + // Note that if mempurge is successful, the edit_list will + // not be applicable (contains info of new min_log number to keep, + // and level 0 file path of SST file created during normal flush, + // so both pieces of information are irrelevant after a successful + // mempurge operation). min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC( vset, *cfd, edit_list, memtables_to_flush, prep_tracker); + // We piggyback the information of earliest log file to keep in the // manifest entry for the last file flushed. 
edit_list.back()->SetMinLogNumberToKeep(min_wal_number_to_keep); @@ -502,13 +508,25 @@ Status MemTableList::TryInstallMemtableFlushResults( RemoveMemTablesOrRestoreFlags(status, cfd, batch_count, log_buffer, to_delete, mu); }; - - // this can release and reacquire the mutex. - s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu, - db_directory, /*new_descriptor_log=*/false, - /*column_family_options=*/nullptr, - manifest_write_cb); - *io_s = vset->io_status(); + if (write_edits) { + // this can release and reacquire the mutex. + s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu, + db_directory, /*new_descriptor_log=*/false, + /*column_family_options=*/nullptr, + manifest_write_cb); + *io_s = vset->io_status(); + } else { + // If write_edit is false (e.g: successful mempurge), + // then remove old memtables, wake up manifest write queue threads, + // and don't commit anything to the manifest file. + RemoveMemTablesOrRestoreFlags(s, cfd, batch_count, log_buffer, + to_delete, mu); + // Notify new head of manifest write queue. + // wake up all the waiting writers + // TODO(bjlemaire): explain full reason needed or investigate more. + vset->WakeUpWaitingManifestWriters(); + *io_s = IOStatus::OK(); + } } } commit_in_progress_ = false; @@ -676,6 +694,21 @@ void MemTableList::RemoveMemTablesOrRestoreFlags( } } +// Returns the earliest log that possibly contain entries +// from one of the memtables of this memtable_list. 
+uint64_t MemTableList::EarliestLogContainingData() { + uint64_t min_log = 0; + + for (auto& m : current_->memlist_) { + uint64_t log = m->GetEarliestLogFileNumber(); + if (log > 0 && (min_log == 0 || log < min_log)) { + min_log = log; + } + } + + return min_log; +} + uint64_t MemTableList::PrecomputeMinLogContainingPrepSection( const std::unordered_set* memtables_to_flush) { uint64_t min_log = 0; diff --git a/db/memtable_list.h b/db/memtable_list.h index fde5e8a90..88d9ad017 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -268,7 +268,7 @@ class MemTableList { autovector* to_delete, FSDirectory* db_directory, LogBuffer* log_buffer, std::list>* committed_flush_jobs_info, - IOStatus* io_s); + IOStatus* io_s, bool write_edits = true); // New memtables are inserted at the front of the list. // Takes ownership of the referenced held on *m by the caller of Add(). @@ -312,7 +312,18 @@ class MemTableList { // non-empty (regardless of the min_write_buffer_number_to_merge // parameter). This flush request will persist until the next time // PickMemtablesToFlush() is called. - void FlushRequested() { flush_requested_ = true; } + void FlushRequested() { + flush_requested_ = true; + // If there are some memtables stored in imm() that don't trigger + // flush (e.g., mempurge output memtable), then update imm_flush_needed. + // Note: if race condition and imm_flush_needed is set to true + // when there is num_flush_not_started_==0, then there is no + // impact whatsoever. Imm_flush_needed is only used in an assert + // in IsFlushPending(). + if (num_flush_not_started_ > 0) { + imm_flush_needed.store(true, std::memory_order_release); + } + } bool HasFlushRequested() { return flush_requested_; } @@ -336,6 +347,10 @@ class MemTableList { size_t* current_memory_usage() { return &current_memory_usage_; } + // Returns the earliest log that possibly contains entries + // from one of the memtables of this memtable_list. 
+ uint64_t EarliestLogContainingData(); + // Returns the min log containing the prep section after memtables listsed in // `memtables_to_flush` are flushed and their status is persisted in manifest. uint64_t PrecomputeMinLogContainingPrepSection( diff --git a/db/repair.cc b/db/repair.cc index 1ebd47402..4a710db96 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -387,7 +387,7 @@ class Repairer { // Initialize per-column family memtables for (auto* cfd : *vset_.GetColumnFamilySet()) { cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(), - kMaxSequenceNumber); + kMaxSequenceNumber, cfd->GetLogNumber()); } auto cf_mems = new ColumnFamilyMemTablesImpl(vset_.GetColumnFamilySet()); diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index 7a2996a59..4206a19e5 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -548,6 +548,11 @@ Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, "records NOT monotonically increasing"); } else { cfd->SetLogNumber(edit.log_number_); + if (version_set_->db_options()->experimental_allow_mempurge && + edit.log_number_ > 0 && + (cfd->mem()->GetEarliestLogFileNumber() == 0)) { + cfd->mem()->SetEarliestLogFileNumber(edit.log_number_); + } version_edit_params_.SetLogNumber(edit.log_number_); } } diff --git a/db/version_set.cc b/db/version_set.cc index e80bf67d6..8880c2903 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -4414,6 +4414,14 @@ Status VersionSet::ProcessManifestWrites( return s; } +void VersionSet::WakeUpWaitingManifestWriters() { + // wake up all the waiting writers + // Notify new head of manifest write queue. + if (!manifest_writers_.empty()) { + manifest_writers_.front()->cv.Signal(); + } +} + // 'datas' is grammatically incorrect. We still use this notation to indicate // that this variable represents a collection of column_family_data. 
Status VersionSet::LogAndApply( @@ -5639,7 +5647,7 @@ ColumnFamilyData* VersionSet::CreateColumnFamily( // GetLatestMutableCFOptions() is safe here without mutex since the // cfd is not available to client new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions(), - LastSequence()); + LastSequence(), edit->log_number_); new_cfd->SetLogNumber(edit->log_number_); return new_cfd; } diff --git a/db/version_set.h b/db/version_set.h index 22c0f4a97..d4fc17baa 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1014,6 +1014,7 @@ class VersionSet { FileSystem* fs, std::string* manifest_filename, uint64_t* manifest_file_number); + void WakeUpWaitingManifestWriters(); // Recover the last saved descriptor from persistent storage. // If read_only == true, Recover() will not complain if some column families @@ -1160,6 +1161,15 @@ class VersionSet { if (min_log_num > num && !cfd->IsDropped()) { min_log_num = num; } + // If mempurge is activated, there may be an immutable memtable + // that has data not flushed to any SST file. + if (db_options_->experimental_allow_mempurge && !(cfd->IsEmpty()) && + !(cfd->IsDropped())) { + num = cfd->imm()->EarliestLogContainingData(); + if ((num > 0) && (min_log_num > num)) { + min_log_num = num; + } + } } return min_log_num; } @@ -1177,6 +1187,15 @@ class VersionSet { if (min_log_num > cfd->GetLogNumber() && !cfd->IsDropped()) { min_log_num = cfd->GetLogNumber(); } + // If mempurge is activated, there may be an immutable memtable + // that has data not flushed to any SST file. + if (db_options_->experimental_allow_mempurge && !(cfd->IsEmpty()) && + !(cfd->IsDropped())) { + uint64_t num = cfd->imm()->EarliestLogContainingData(); + if ((num > 0) && (min_log_num > num)) { + min_log_num = num; + } + } } return min_log_num; }