diff --git a/db/db_impl.cc b/db/db_impl.cc index 6c57a986d..ece08db8b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -241,6 +241,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) mem_(new MemTable(internal_comparator_, mem_rep_factory_, NumberLevels(), options_)), logfile_number_(0), + super_version_(nullptr), tmp_batch_(), bg_compaction_scheduled_(0), bg_flush_scheduled_(0), @@ -316,6 +317,13 @@ DBImpl::~DBImpl() { bg_logstats_scheduled_) { bg_cv_.Wait(); } + if (super_version_ != nullptr) { + bool is_last_reference __attribute__((unused)); + is_last_reference = super_version_->Unref(); + assert(is_last_reference); + super_version_->Cleanup(); + delete super_version_; + } mutex_.Unlock(); if (db_lock_ != nullptr) { @@ -345,6 +353,13 @@ void DBImpl::TEST_Destroy_DBImpl() { bg_logstats_scheduled_) { bg_cv_.Wait(); } + if (super_version_ != nullptr) { + bool is_last_reference __attribute__((unused)); + is_last_reference = super_version_->Unref(); + assert(is_last_reference); + super_version_->Cleanup(); + delete super_version_; + } // Prevent new compactions from occuring. bg_work_gate_closed_ = true; @@ -443,6 +458,49 @@ void DBImpl::MaybeDumpStats() { } } +// DBImpl::SuperVersion methods +DBImpl::SuperVersion::SuperVersion(const int num_memtables) { + to_delete.resize(num_memtables); +} + +DBImpl::SuperVersion::~SuperVersion() { + for (auto td : to_delete) { + delete td; + } +} + +DBImpl::SuperVersion* DBImpl::SuperVersion::Ref() { + refs.fetch_add(1, std::memory_order_relaxed); + return this; +} + +bool DBImpl::SuperVersion::Unref() { + assert(refs > 0); + // fetch_sub returns the previous value of ref + return refs.fetch_sub(1, std::memory_order_relaxed) == 1; +} + +void DBImpl::SuperVersion::Cleanup() { + assert(refs.load(std::memory_order_relaxed) == 0); + imm.UnrefAll(&to_delete); + MemTable* m = mem->Unref(); + if (m != nullptr) { + to_delete.push_back(m); + } + current->Unref(); +} + +void DBImpl::SuperVersion::Init(MemTable* new_mem, const MemTableList& new_imm, + Version* new_current) { + mem = new_mem; + imm = new_imm; + current = new_current; + mem->Ref(); + imm.RefAll(); + current->Ref(); + refs.store(1, std::memory_order_relaxed); +} + // Returns the list of live files in 'sst_live' and the list // of all files in the filesystem in 'all_files'. // no_full_scan = true -- never do the full scan using GetChildren() @@ -518,11 +576,6 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state, // It is not necessary to hold the mutex when invoking this method. void DBImpl::PurgeObsoleteFiles(DeletionState& state) { - // free pending memtables - for (auto m : state.memtables_to_free) { - delete m; - } - // check if there is anything to do if (!state.all_files.size() && !state.sst_delete_files.size() && @@ -1188,6 +1241,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, file_number, pending_outputs_, &deletion_state.memtables_to_free); if (s.ok()) { + InstallSuperVersion(deletion_state); if (madeProgress) { *madeProgress = 1; } @@ -1247,11 +1301,17 @@ int DBImpl::FindMinimumEmptyLevelFitting(int level) { void DBImpl::ReFitLevel(int level, int target_level) { assert(level < NumberLevels()); - MutexLock l(&mutex_); + SuperVersion* superversion_to_free = nullptr; + SuperVersion* new_superversion = + new SuperVersion(options_.max_write_buffer_number); + + mutex_.Lock(); // only allow one thread refitting if (refitting_level_) { + mutex_.Unlock(); Log(options_.info_log, "ReFitLevel: another thread is refitting"); + delete new_superversion; return; } refitting_level_ = true; @@ -1287,6 +1347,8 @@ void DBImpl::ReFitLevel(int level, int target_level) { edit.DebugString().data()); auto status = versions_->LogAndApply(&edit, &mutex_); + superversion_to_free = InstallSuperVersion(new_superversion); + new_superversion = nullptr; Log(options_.info_log, "LogAndApply: %s\n", status.ToString().data()); @@ -1298,6 +1360,10 @@ void DBImpl::ReFitLevel(int level, int target_level) { refitting_level_ = false; bg_work_gate_closed_ = false; + + mutex_.Unlock(); + delete superversion_to_free; + delete new_superversion; } int DBImpl::NumberLevels() { @@ -1671,7 +1737,7 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, void DBImpl::BackgroundCallFlush() { bool madeProgress = false; - DeletionState deletion_state(options_.max_write_buffer_number); + DeletionState deletion_state(options_.max_write_buffer_number, true); assert(bg_flush_scheduled_); MutexLock l(&mutex_); @@ -1717,7 +1783,7 @@ void DBImpl::TEST_PurgeObsoleteteWAL() { void DBImpl::BackgroundCallCompaction() { bool madeProgress = false; - DeletionState deletion_state(options_.max_write_buffer_number); + DeletionState deletion_state(options_.max_write_buffer_number, true); MaybeDumpStats(); @@ -1770,7 +1836,7 @@ void DBImpl::BackgroundCallCompaction() { } Status DBImpl::BackgroundCompaction(bool* madeProgress, - DeletionState& deletion_state) { + DeletionState& deletion_state) { *madeProgress = false; mutex_.AssertHeld(); @@ -1823,6 +1889,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, f->smallest, f->largest, f->smallest_seqno, f->largest_seqno); status = versions_->LogAndApply(c->edit(), &mutex_); + InstallSuperVersion(deletion_state); VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", static_cast(f->number), @@ -2484,6 +2551,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, if (status.ok()) { status = InstallCompactionResults(compact); + InstallSuperVersion(deletion_state); } VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, @@ -2588,6 +2656,44 @@ Status DBImpl::Get(const ReadOptions& options, return GetImpl(options, key, value); } +// DeletionState gets created and destructed outside of the lock -- we +// use this convinently to: +// * malloc one SuperVersion() outside of the lock -- new_superversion +// * delete one SuperVersion() outside of the lock -- superversion_to_free +// +// However, if InstallSuperVersion() gets called twice with the same, +// deletion_state, we can't reuse the SuperVersion() that got malloced because +// first call already used it. In that rare case, we take a hit and create a +// new SuperVersion() inside of the mutex. We do similar thing +// for superversion_to_free +void DBImpl::InstallSuperVersion(DeletionState& deletion_state) { + // if new_superversion == nullptr, it means somebody already used it + SuperVersion* new_superversion = + (deletion_state.new_superversion != nullptr) ? + deletion_state.new_superversion : new SuperVersion(); + SuperVersion* old_superversion = InstallSuperVersion(new_superversion); + deletion_state.new_superversion = nullptr; + if (deletion_state.superversion_to_free != nullptr) { + // somebody already put it there + delete old_superversion; + } else { + deletion_state.superversion_to_free = old_superversion; + } +} + +DBImpl::SuperVersion* DBImpl::InstallSuperVersion( + SuperVersion* new_superversion) { + mutex_.AssertHeld(); + new_superversion->Init(mem_, imm_, versions_->current()); + SuperVersion* old_superversion = super_version_; + super_version_ = new_superversion; + if (old_superversion != nullptr && old_superversion->Unref()) { + old_superversion->Cleanup(); + return old_superversion; // will let caller delete outside of mutex + } + return nullptr; +} + Status DBImpl::GetImpl(const ReadOptions& options, const Slice& key, std::string* value, @@ -2596,27 +2702,20 @@ Status DBImpl::GetImpl(const ReadOptions& options, StopWatch sw(env_, options_.statistics.get(), DB_GET); SequenceNumber snapshot; - std::vector to_delete; - mutex_.Lock(); if (options.snapshot != nullptr) { snapshot = reinterpret_cast(options.snapshot)->number_; } else { snapshot = versions_->LastSequence(); } - MemTable* mem = mem_; - MemTableList imm = imm_; - Version* current = versions_->current(); - mem->Ref(); - imm.RefAll(); - current->Ref(); - - // Unlock while reading from files and memtables + // This can be replaced by using atomics and spinlock instead of big mutex + mutex_.Lock(); + SuperVersion* get_version = super_version_->Ref(); mutex_.Unlock(); + bool have_stat_update = false; Version::GetStats stats; - // Prepare to store a list of merge operations if merge occurs. MergeContext merge_context; @@ -2624,32 +2723,41 @@ Status DBImpl::GetImpl(const ReadOptions& options, // s is both in/out. When in, s could either be OK or MergeInProgress. // merge_operands will contain the sequence of merges in the latter case. LookupKey lkey(key, snapshot); - if (mem->Get(lkey, value, &s, merge_context, options_)) { + if (get_version->mem->Get(lkey, value, &s, merge_context, options_)) { // Done RecordTick(options_.statistics.get(), MEMTABLE_HIT); - } else if (imm.Get(lkey, value, &s, merge_context, options_)) { + } else if (get_version->imm.Get(lkey, value, &s, merge_context, options_)) { // Done RecordTick(options_.statistics.get(), MEMTABLE_HIT); } else { - current->Get(options, lkey, value, &s, &merge_context, &stats, - options_, value_found); + get_version->current->Get(options, lkey, value, &s, &merge_context, &stats, + options_, value_found); have_stat_update = true; RecordTick(options_.statistics.get(), MEMTABLE_MISS); } - mutex_.Lock(); - if (!options_.disable_seek_compaction && - have_stat_update && current->UpdateStats(stats)) { - MaybeScheduleFlushOrCompaction(); + bool delete_get_version = false; + if (!options_.disable_seek_compaction && have_stat_update) { + mutex_.Lock(); + if (get_version->current->UpdateStats(stats)) { + MaybeScheduleFlushOrCompaction(); + } + if (get_version->Unref()) { + get_version->Cleanup(); + delete_get_version = true; + } + mutex_.Unlock(); + } else { + if (get_version->Unref()) { + mutex_.Lock(); + get_version->Cleanup(); + mutex_.Unlock(); + delete_get_version = true; + } + } + if (delete_get_version) { + delete get_version; } - MemTable* m = mem->Unref(); - imm.UnrefAll(&to_delete); - current->Unref(); - mutex_.Unlock(); - - // free up all obsolete memtables outside the mutex - delete m; - for (MemTable* v: to_delete) delete v; // Note, tickers are atomic now - no lock protection needed any more. RecordTick(options_.statistics.get(), NUMBER_KEYS_READ); @@ -2813,7 +2921,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { w.done = false; StopWatch sw(env_, options_.statistics.get(), DB_WRITE); - MutexLock l(&mutex_); + mutex_.Lock(); writers_.push_back(&w); while (!w.done && &w != writers_.front()) { w.cv.Wait(); @@ -2824,6 +2932,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { } if (w.done) { + mutex_.Unlock(); RecordTick(options_.statistics.get(), WRITE_DONE_BY_OTHER, 1); return w.status; } else { @@ -2831,7 +2940,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { } // May temporarily unlock and wait. - Status status = MakeRoomForWrite(my_batch == nullptr); + SuperVersion* superversion_to_free = nullptr; + Status status = MakeRoomForWrite(my_batch == nullptr, &superversion_to_free); uint64_t last_sequence = versions_->LastSequence(); Writer* last_writer = &w; if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions @@ -2919,6 +3029,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { if (!writers_.empty()) { writers_.front()->cv.Signal(); } + mutex_.Unlock(); + delete superversion_to_free; return status; } @@ -3011,7 +3123,8 @@ uint64_t DBImpl::SlowdownAmount(int n, int top, int bottom) { // REQUIRES: mutex_ is held // REQUIRES: this thread is currently at the front of the writer queue -Status DBImpl::MakeRoomForWrite(bool force) { +Status DBImpl::MakeRoomForWrite(bool force, + SuperVersion** superversion_to_free) { mutex_.AssertHeld(); assert(!writers_.empty()); bool allow_delay = !force; @@ -3020,6 +3133,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { uint64_t rate_limit_delay_millis = 0; Status s; double score; + *superversion_to_free = nullptr; while (true) { if (!bg_error_.ok()) { @@ -3146,6 +3260,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { // Do this without holding the dbmutex lock. assert(versions_->PrevLogNumber() == 0); uint64_t new_log_number = versions_->NewFileNumber(); + SuperVersion* new_superversion = nullptr; mutex_.Unlock(); { EnvOptions soptions(storage_options_); @@ -3162,6 +3277,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { lfile->SetPreallocationBlockSize(1.1 * options_.write_buffer_size); memtmp = new MemTable( internal_comparator_, mem_rep_factory_, NumberLevels(), options_); + new_superversion = new SuperVersion(options_.max_write_buffer_number); } } mutex_.Lock(); @@ -3186,6 +3302,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { mem_->SetLogNumber(logfile_number_); force = false; // Do not force another compaction if have room MaybeScheduleFlushOrCompaction(); + *superversion_to_free = InstallSuperVersion(new_superversion); } } return s; @@ -3541,7 +3658,7 @@ Status DBImpl::DeleteFile(std::string name) { FileMetaData metadata; int maxlevel = NumberLevels(); VersionEdit edit(maxlevel); - DeletionState deletion_state; + DeletionState deletion_state(0, true); { MutexLock l(&mutex_); status = versions_->GetMetadataForFile(number, &level, &metadata); @@ -3571,14 +3688,14 @@ Status DBImpl::DeleteFile(std::string name) { } edit.DeleteFile(level, number); status = versions_->LogAndApply(&edit, &mutex_); + if (status.ok()) { + InstallSuperVersion(deletion_state); + } FindObsoleteFiles(deletion_state, false); } // lock released here LogFlush(options_.info_log); - - if (status.ok()) { - // remove files outside the db-lock - PurgeObsoleteFiles(deletion_state); - } + // remove files outside the db-lock + PurgeObsoleteFiles(deletion_state); return status; } @@ -3678,6 +3795,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { s = impl->versions_->LogAndApply(&edit, &impl->mutex_); } if (s.ok()) { + delete impl->InstallSuperVersion(new DBImpl::SuperVersion()); impl->mem_->SetLogNumber(impl->logfile_number_); impl->DeleteObsoleteFiles(); impl->MaybeScheduleFlushOrCompaction(); diff --git a/db/db_impl.h b/db/db_impl.h index 39e132979..2447b31fa 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -128,12 +128,38 @@ class DBImpl : public DB { default_interval_to_delete_obsolete_WAL_ = default_interval_to_delete_obsolete_WAL; } - // needed for CleanupIteratorState + // holds references to memtable, all immutable memtables and version + struct SuperVersion { + MemTable* mem; + MemTableList imm; + Version* current; + std::atomic refs; + // We need to_delete because during Cleanup(), imm.UnrefAll() returns + // all memtables that we need to free through this vector. We then + // delete all those memtables outside of mutex, during destruction + std::vector to_delete; + + // should be called outside the mutex + explicit SuperVersion(const int num_memtables = 0); + ~SuperVersion(); + SuperVersion* Ref(); + // Returns true if this was the last reference and caller should + // call Clenaup() and delete the object + bool Unref(); + + // call these two methods with db mutex held + // Cleanup unrefs mem, imm and current. Also, it stores all memtables + // that needs to be deleted in to_delete vector. Unrefing those + // objects needs to be done in the mutex + void Cleanup(); + void Init(MemTable* new_mem, const MemTableList& new_imm, + Version* new_current); + }; + // needed for CleanupIteratorState struct DeletionState { inline bool HaveSomethingToDelete() const { - return memtables_to_free.size() || - all_files.size() || + return all_files.size() || sst_delete_files.size() || log_delete_files.size(); } @@ -155,15 +181,35 @@ class DBImpl : public DB { // a list of memtables to be free std::vector memtables_to_free; + SuperVersion* superversion_to_free; // if nullptr nothing to free + + SuperVersion* new_superversion; // if nullptr no new superversion + // the current manifest_file_number, log_number and prev_log_number // that corresponds to the set of files in 'live'. uint64_t manifest_file_number, log_number, prev_log_number; - explicit DeletionState(const int num_memtables = 0) { + explicit DeletionState(const int num_memtables = 0, + bool create_superversion = false) { manifest_file_number = 0; log_number = 0; prev_log_number = 0; memtables_to_free.reserve(num_memtables); + superversion_to_free = nullptr; + new_superversion = + create_superversion ? new SuperVersion(num_memtables) : nullptr; + } + + ~DeletionState() { + // free pending memtables + for (auto m : memtables_to_free) { + delete m; + } + // free superversion. if nullptr, this will be noop + delete superversion_to_free; + // if new_superversion was not used, it will be non-nullptr and needs + // to be freed here + delete new_superversion; } }; @@ -240,7 +286,11 @@ class DBImpl : public DB { uint64_t* filenumber); uint64_t SlowdownAmount(int n, int top, int bottom); - Status MakeRoomForWrite(bool force /* compact even if there is room? */); + // MakeRoomForWrite will return superversion_to_free through an arugment, + // which the caller needs to delete. We do it because caller can delete + // the superversion outside of mutex + Status MakeRoomForWrite(bool force /* compact even if there is room? */, + SuperVersion** superversion_to_free); WriteBatch* BuildBatchGroup(Writer** last_writer); // Force current memtable contents to be flushed. @@ -324,6 +374,8 @@ class DBImpl : public DB { uint64_t logfile_number_; unique_ptr log_; + SuperVersion* super_version_; + std::string host_name_; // Queue of writers. @@ -491,6 +543,18 @@ class DBImpl : public DB { std::vector& snapshots, SequenceNumber* prev_snapshot); + // will return a pointer to SuperVersion* if previous SuperVersion + // if its reference count is zero and needs deletion or nullptr if not + // As argument takes a pointer to allocated SuperVersion + // Foreground threads call this function directly (they don't carry + // deletion state and have to handle their own creation and deletion + // of SuperVersion) + SuperVersion* InstallSuperVersion(SuperVersion* new_superversion); + // Background threads call this function, which is just a wrapper around + // the InstallSuperVersion() function above. Background threads carry + // deletion_state which can have new_superversion already allocated. + void InstallSuperVersion(DeletionState& deletion_state); + // Function that Get and KeyMayExist call with no_io true or false // Note: 'value_found' from KeyMayExist propagates here Status GetImpl(const ReadOptions& options, diff --git a/db/version_set.h b/db/version_set.h index bf466a932..75b529942 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -272,12 +272,14 @@ class VersionSet { int64_t NumLevelBytes(int level) const; // Return the last sequence number. - uint64_t LastSequence() const { return last_sequence_; } + uint64_t LastSequence() const { + return last_sequence_.load(std::memory_order_acquire); + } // Set the last sequence number to s. void SetLastSequence(uint64_t s) { assert(s >= last_sequence_); - last_sequence_ = s; + last_sequence_.store(s, std::memory_order_release); } // Mark the specified file number as used. @@ -476,7 +478,7 @@ class VersionSet { const InternalKeyComparator icmp_; uint64_t next_file_number_; uint64_t manifest_file_number_; - uint64_t last_sequence_; + std::atomic last_sequence_; uint64_t log_number_; uint64_t prev_log_number_; // 0 or backing store for memtable being compacted