From 1fdb3f7dc60e96394e3e5b69a46ede5d67fb976c Mon Sep 17 00:00:00 2001
From: Igor Canadi
Date: Fri, 20 Dec 2013 09:57:58 -0800
Subject: [PATCH] [RocksDB] Optimize locking for Get

Summary:
Instead of locking and saving a DB state, we can cache a DB state and update
it only when it changes. This change reduces lock contention and speeds up
read operations on the DB.

Performance improvements are substantial, although there is some cost in
no-read workloads. I ran the regression tests on my devserver and here are
the numbers:

  overwrite                     56345 ->   63001
  fillseq                      193730 ->  185296
  readrandom                   771301 -> 1219803 (58% improvement!)
  readrandom_smallblockcache   677609 ->  862850
  readrandom_memtable_sst      710440 -> 1109223
  readrandom_fillunique_random 221589 ->  247869
  memtablefillrandom           105286 ->   92643
  memtablereadrandom           763033 -> 1288862

Test Plan:
make asan_check
I am also running db_stress

Reviewers: dhruba, haobo, sdong, kailiu

Reviewed By: haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D14679
---
 db/db_impl.cc    | 208 +++++++++++++++++++++++++++++++++++++----
 db/db_impl.h     |  74 +++++++++++++++--
 db/version_set.h |   8 +-
 3 files changed, 237 insertions(+), 53 deletions(-)

diff --git a/db/db_impl.cc b/db/db_impl.cc
index 6c57a986d..ece08db8b 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -241,6 +241,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
       mem_(new MemTable(internal_comparator_, mem_rep_factory_,
                         NumberLevels(), options_)),
       logfile_number_(0),
+      super_version_(nullptr),
       tmp_batch_(),
       bg_compaction_scheduled_(0),
       bg_flush_scheduled_(0),
@@ -316,6 +317,13 @@ DBImpl::~DBImpl() {
          bg_logstats_scheduled_) {
     bg_cv_.Wait();
   }
+  if (super_version_ != nullptr) {
+    bool is_last_reference __attribute__((unused));
+    is_last_reference = super_version_->Unref();
+    assert(is_last_reference);
+    super_version_->Cleanup();
+    delete super_version_;
+  }
   mutex_.Unlock();
 
   if (db_lock_ != nullptr) {
@@ -345,6 +353,13 @@ void DBImpl::TEST_Destroy_DBImpl() {
          bg_logstats_scheduled_) {
     bg_cv_.Wait();
   }
+  if (super_version_ != nullptr) {
+    bool is_last_reference __attribute__((unused));
+    is_last_reference = super_version_->Unref();
+    assert(is_last_reference);
+    super_version_->Cleanup();
+    delete super_version_;
+  }
 
   // Prevent new compactions from occurring.
   bg_work_gate_closed_ = true;
@@ -443,6 +458,49 @@ void DBImpl::MaybeDumpStats() {
   }
 }
 
+// DBImpl::SuperVersion methods
+DBImpl::SuperVersion::SuperVersion(const int num_memtables) {
+  to_delete.resize(num_memtables);
+}
+
+DBImpl::SuperVersion::~SuperVersion() {
+  for (auto td : to_delete) {
+    delete td;
+  }
+}
+
+DBImpl::SuperVersion* DBImpl::SuperVersion::Ref() {
+  refs.fetch_add(1, std::memory_order_relaxed);
+  return this;
+}
+
+bool DBImpl::SuperVersion::Unref() {
+  assert(refs > 0);
+  // fetch_sub returns the previous value of refs
+  return refs.fetch_sub(1, std::memory_order_relaxed) == 1;
+}
+
+void DBImpl::SuperVersion::Cleanup() {
+  assert(refs.load(std::memory_order_relaxed) == 0);
+  imm.UnrefAll(&to_delete);
+  MemTable* m = mem->Unref();
+  if (m != nullptr) {
+    to_delete.push_back(m);
+  }
+  current->Unref();
+}
+
+void DBImpl::SuperVersion::Init(MemTable* new_mem, const MemTableList& new_imm,
+                                Version* new_current) {
+  mem = new_mem;
+  imm = new_imm;
+  current = new_current;
+  mem->Ref();
+  imm.RefAll();
+  current->Ref();
+  refs.store(1, std::memory_order_relaxed);
+}
+
 // Returns the list of live files in 'sst_live' and the list
 // of all files in the filesystem in 'all_files'.
 // no_full_scan = true -- never do the full scan using GetChildren()
@@ -518,11 +576,6 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
 // It is not necessary to hold the mutex when invoking this method.
 void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
-  // free pending memtables
-  for (auto m : state.memtables_to_free) {
-    delete m;
-  }
-
   // check if there is anything to do
   if (!state.all_files.size() &&
       !state.sst_delete_files.size() &&
@@ -1188,6 +1241,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
                                  file_number, pending_outputs_,
                                  &deletion_state.memtables_to_free);
   if (s.ok()) {
+    InstallSuperVersion(deletion_state);
     if (madeProgress) {
       *madeProgress = 1;
     }
@@ -1247,11 +1301,17 @@ int DBImpl::FindMinimumEmptyLevelFitting(int level) {
 void DBImpl::ReFitLevel(int level, int target_level) {
   assert(level < NumberLevels());
 
-  MutexLock l(&mutex_);
+  SuperVersion* superversion_to_free = nullptr;
+  SuperVersion* new_superversion =
+      new SuperVersion(options_.max_write_buffer_number);
+
+  mutex_.Lock();
 
   // only allow one thread refitting
   if (refitting_level_) {
+    mutex_.Unlock();
     Log(options_.info_log, "ReFitLevel: another thread is refitting");
+    delete new_superversion;
     return;
   }
   refitting_level_ = true;
@@ -1287,6 +1347,8 @@ void DBImpl::ReFitLevel(int level, int target_level) {
         edit.DebugString().data());
 
     auto status = versions_->LogAndApply(&edit, &mutex_);
+    superversion_to_free = InstallSuperVersion(new_superversion);
+    new_superversion = nullptr;
 
     Log(options_.info_log, "LogAndApply: %s\n", status.ToString().data());
@@ -1298,6 +1360,10 @@
 
   refitting_level_ = false;
   bg_work_gate_closed_ = false;
+
+  mutex_.Unlock();
+  delete superversion_to_free;
+  delete new_superversion;
 }
 
 int DBImpl::NumberLevels() {
@@ -1671,7 +1737,7 @@ Status DBImpl::BackgroundFlush(bool* madeProgress,
 
 void DBImpl::BackgroundCallFlush() {
   bool madeProgress = false;
-  DeletionState deletion_state(options_.max_write_buffer_number);
+  DeletionState deletion_state(options_.max_write_buffer_number, true);
   assert(bg_flush_scheduled_);
   MutexLock l(&mutex_);
@@ -1717,7 +1783,7 @@ void DBImpl::TEST_PurgeObsoleteteWAL() {
 
 void DBImpl::BackgroundCallCompaction() {
   bool madeProgress = false;
-  DeletionState deletion_state(options_.max_write_buffer_number);
+  DeletionState deletion_state(options_.max_write_buffer_number, true);
 
   MaybeDumpStats();
@@ -1770,7 +1836,7 @@ void DBImpl::BackgroundCallCompaction() {
 }
 
 Status DBImpl::BackgroundCompaction(bool* madeProgress,
-                                   DeletionState& deletion_state) {
+                                    DeletionState& deletion_state) {
   *madeProgress = false;
   mutex_.AssertHeld();
@@ -1823,6 +1889,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
                        f->smallest, f->largest,
                        f->smallest_seqno, f->largest_seqno);
     status = versions_->LogAndApply(c->edit(), &mutex_);
+    InstallSuperVersion(deletion_state);
     VersionSet::LevelSummaryStorage tmp;
     Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
         static_cast<unsigned long long>(f->number),
@@ -2484,6 +2551,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
 
   if (status.ok()) {
     status = InstallCompactionResults(compact);
+    InstallSuperVersion(deletion_state);
   }
   VersionSet::LevelSummaryStorage tmp;
   Log(options_.info_log,
@@ -2588,6 +2656,44 @@ Status DBImpl::Get(const ReadOptions& options,
   return GetImpl(options, key, value);
 }
 
+// DeletionState gets created and destructed outside of the lock -- we
+// use this conveniently to:
+// * malloc one SuperVersion() outside of the lock -- new_superversion
+// * delete one SuperVersion() outside of the lock -- superversion_to_free
+//
+// However, if InstallSuperVersion() gets called twice with the same
+// deletion_state, we can't reuse the SuperVersion() that got malloced, because
+// the first call already used it. In that rare case, we take a hit and create
+// a new SuperVersion() inside of the mutex. We do a similar thing for
+// superversion_to_free.
+void DBImpl::InstallSuperVersion(DeletionState& deletion_state) {
+  // if new_superversion == nullptr, it means somebody already used it
+  SuperVersion* new_superversion =
+      (deletion_state.new_superversion != nullptr) ?
+      deletion_state.new_superversion : new SuperVersion();
+  SuperVersion* old_superversion = InstallSuperVersion(new_superversion);
+  deletion_state.new_superversion = nullptr;
+  if (deletion_state.superversion_to_free != nullptr) {
+    // somebody already put it there
+    delete old_superversion;
+  } else {
+    deletion_state.superversion_to_free = old_superversion;
+  }
+}
+
+DBImpl::SuperVersion* DBImpl::InstallSuperVersion(
+    SuperVersion* new_superversion) {
+  mutex_.AssertHeld();
+  new_superversion->Init(mem_, imm_, versions_->current());
+  SuperVersion* old_superversion = super_version_;
+  super_version_ = new_superversion;
+  if (old_superversion != nullptr && old_superversion->Unref()) {
+    old_superversion->Cleanup();
+    return old_superversion;  // will let caller delete outside of mutex
+  }
+  return nullptr;
+}
+
 Status DBImpl::GetImpl(const ReadOptions& options,
                        const Slice& key,
                        std::string* value,
@@ -2596,27 +2702,20 @@ Status DBImpl::GetImpl(const ReadOptions& options,
   StopWatch sw(env_, options_.statistics.get(), DB_GET);
   SequenceNumber snapshot;
-  std::vector<MemTable*> to_delete;
-  mutex_.Lock();
   if (options.snapshot != nullptr) {
     snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
   } else {
     snapshot = versions_->LastSequence();
   }
-
-  MemTable* mem = mem_;
-  MemTableList imm = imm_;
-  Version* current = versions_->current();
-  mem->Ref();
-  imm.RefAll();
-  current->Ref();
-
-  // Unlock while reading from files and memtables
+  // This can be replaced by using atomics and spinlock instead of big mutex
+  mutex_.Lock();
+  SuperVersion* get_version = super_version_->Ref();
   mutex_.Unlock();
+
   bool have_stat_update = false;
   Version::GetStats stats;
-
   // Prepare to store a list of merge operations if merge occurs.
   MergeContext merge_context;
@@ -2624,32 +2723,41 @@ Status DBImpl::GetImpl(const ReadOptions& options,
   // s is both in/out. When in, s could either be OK or MergeInProgress.
   // merge_operands will contain the sequence of merges in the latter case.
   LookupKey lkey(key, snapshot);
-  if (mem->Get(lkey, value, &s, merge_context, options_)) {
+  if (get_version->mem->Get(lkey, value, &s, merge_context, options_)) {
     // Done
     RecordTick(options_.statistics.get(), MEMTABLE_HIT);
-  } else if (imm.Get(lkey, value, &s, merge_context, options_)) {
+  } else if (get_version->imm.Get(lkey, value, &s, merge_context, options_)) {
     // Done
     RecordTick(options_.statistics.get(), MEMTABLE_HIT);
   } else {
-    current->Get(options, lkey, value, &s, &merge_context, &stats,
-                 options_, value_found);
+    get_version->current->Get(options, lkey, value, &s, &merge_context, &stats,
+                              options_, value_found);
     have_stat_update = true;
     RecordTick(options_.statistics.get(), MEMTABLE_MISS);
   }
 
-  mutex_.Lock();
-  if (!options_.disable_seek_compaction &&
-      have_stat_update && current->UpdateStats(stats)) {
-    MaybeScheduleFlushOrCompaction();
+  bool delete_get_version = false;
+  if (!options_.disable_seek_compaction && have_stat_update) {
+    mutex_.Lock();
+    if (get_version->current->UpdateStats(stats)) {
+      MaybeScheduleFlushOrCompaction();
+    }
+    if (get_version->Unref()) {
+      get_version->Cleanup();
+      delete_get_version = true;
+    }
+    mutex_.Unlock();
+  } else {
+    if (get_version->Unref()) {
+      mutex_.Lock();
+      get_version->Cleanup();
+      mutex_.Unlock();
+      delete_get_version = true;
+    }
+  }
+  if (delete_get_version) {
+    delete get_version;
   }
-  MemTable* m = mem->Unref();
-  imm.UnrefAll(&to_delete);
-  current->Unref();
-  mutex_.Unlock();
-
-  // free up all obsolete memtables outside the mutex
-  delete m;
-  for (MemTable* v: to_delete) delete v;
 
   // Note, tickers are atomic now - no lock protection needed any more.
   RecordTick(options_.statistics.get(), NUMBER_KEYS_READ);
@@ -2813,7 +2921,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
   w.done = false;
 
   StopWatch sw(env_, options_.statistics.get(), DB_WRITE);
-  MutexLock l(&mutex_);
+  mutex_.Lock();
   writers_.push_back(&w);
   while (!w.done && &w != writers_.front()) {
     w.cv.Wait();
   }
@@ -2824,6 +2932,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
   }
 
   if (w.done) {
+    mutex_.Unlock();
     RecordTick(options_.statistics.get(), WRITE_DONE_BY_OTHER, 1);
     return w.status;
   } else {
   }
 
   // May temporarily unlock and wait.
-  Status status = MakeRoomForWrite(my_batch == nullptr);
+  SuperVersion* superversion_to_free = nullptr;
+  Status status = MakeRoomForWrite(my_batch == nullptr, &superversion_to_free);
   uint64_t last_sequence = versions_->LastSequence();
   Writer* last_writer = &w;
   if (status.ok() && my_batch != nullptr) {  // nullptr batch is for compactions
@@ -2919,6 +3029,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
   if (!writers_.empty()) {
     writers_.front()->cv.Signal();
   }
+  mutex_.Unlock();
+  delete superversion_to_free;
   return status;
 }
@@ -3011,7 +3123,8 @@ uint64_t DBImpl::SlowdownAmount(int n, int top, int bottom) {
 
 // REQUIRES: mutex_ is held
 // REQUIRES: this thread is currently at the front of the writer queue
-Status DBImpl::MakeRoomForWrite(bool force) {
+Status DBImpl::MakeRoomForWrite(bool force,
+                                SuperVersion** superversion_to_free) {
   mutex_.AssertHeld();
   assert(!writers_.empty());
   bool allow_delay = !force;
@@ -3020,6 +3133,7 @@
   uint64_t rate_limit_delay_millis = 0;
   Status s;
   double score;
+  *superversion_to_free = nullptr;
 
   while (true) {
     if (!bg_error_.ok()) {
@@ -3146,6 +3260,7 @@
       // Do this without holding the dbmutex lock.
       assert(versions_->PrevLogNumber() == 0);
       uint64_t new_log_number = versions_->NewFileNumber();
+      SuperVersion* new_superversion = nullptr;
      mutex_.Unlock();
       {
         EnvOptions soptions(storage_options_);
@@ -3162,6 +3277,7 @@
           lfile->SetPreallocationBlockSize(1.1 * options_.write_buffer_size);
           memtmp = new MemTable(
             internal_comparator_, mem_rep_factory_, NumberLevels(), options_);
+          new_superversion = new SuperVersion(options_.max_write_buffer_number);
         }
       }
       mutex_.Lock();
@@ -3186,6 +3302,7 @@
       mem_->SetLogNumber(logfile_number_);
       force = false;  // Do not force another compaction if have room
       MaybeScheduleFlushOrCompaction();
+      *superversion_to_free = InstallSuperVersion(new_superversion);
     }
   }
   return s;
@@ -3541,7 +3658,7 @@ Status DBImpl::DeleteFile(std::string name) {
   FileMetaData metadata;
   int maxlevel = NumberLevels();
   VersionEdit edit(maxlevel);
-  DeletionState deletion_state;
+  DeletionState deletion_state(0, true);
   {
     MutexLock l(&mutex_);
     status = versions_->GetMetadataForFile(number, &level, &metadata);
@@ -3571,14 +3688,14 @@
     }
     edit.DeleteFile(level, number);
     status = versions_->LogAndApply(&edit, &mutex_);
+    if (status.ok()) {
+      InstallSuperVersion(deletion_state);
+    }
     FindObsoleteFiles(deletion_state, false);
   } // lock released here
   LogFlush(options_.info_log);
-
-  if (status.ok()) {
-    // remove files outside the db-lock
-    PurgeObsoleteFiles(deletion_state);
-  }
+  // remove files outside the db-lock
+  PurgeObsoleteFiles(deletion_state);
   return status;
 }
@@ -3678,6 +3795,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
     s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
   }
   if (s.ok()) {
+    delete impl->InstallSuperVersion(new DBImpl::SuperVersion());
     impl->mem_->SetLogNumber(impl->logfile_number_);
     impl->DeleteObsoleteFiles();
     impl->MaybeScheduleFlushOrCompaction();

diff --git a/db/db_impl.h b/db/db_impl.h
index 39e132979..2447b31fa 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -128,12 +128,38 @@ class DBImpl : public DB {
     default_interval_to_delete_obsolete_WAL_ =
         default_interval_to_delete_obsolete_WAL;
   }
 
-  // needed for CleanupIteratorState
+  // holds references to memtable, all immutable memtables and version
+  struct SuperVersion {
+    MemTable* mem;
+    MemTableList imm;
+    Version* current;
+    std::atomic<uint32_t> refs;
+    // We need to_delete because during Cleanup(), imm.UnrefAll() returns
+    // all memtables that we need to free through this vector. We then
+    // delete all those memtables outside of the mutex, during destruction
+    std::vector<MemTable*> to_delete;
+
+    // should be called outside the mutex
+    explicit SuperVersion(const int num_memtables = 0);
+    ~SuperVersion();
+    SuperVersion* Ref();
+    // Returns true if this was the last reference and the caller should
+    // call Cleanup() and delete the object
+    bool Unref();
+
+    // call these two methods with db mutex held
+    // Cleanup unrefs mem, imm and current. Also, it stores all memtables
+    // that need to be deleted in the to_delete vector. Unrefing those
+    // objects must be done inside the mutex
+    void Cleanup();
+    void Init(MemTable* new_mem, const MemTableList& new_imm,
+              Version* new_current);
+  };
+
+  // needed for CleanupIteratorState
   struct DeletionState {
     inline bool HaveSomethingToDelete() const {
-      return memtables_to_free.size() ||
-        all_files.size() ||
+      return all_files.size() ||
         sst_delete_files.size() ||
         log_delete_files.size();
     }
@@ -155,15 +181,35 @@ class DBImpl : public DB {
 
     // a list of memtables to be freed
     std::vector<MemTable*> memtables_to_free;
 
+    SuperVersion* superversion_to_free;  // if nullptr nothing to free
+
+    SuperVersion* new_superversion;  // if nullptr no new superversion
+
     // the current manifest_file_number, log_number and prev_log_number
     // that corresponds to the set of files in 'live'.
     uint64_t manifest_file_number, log_number, prev_log_number;
 
-    explicit DeletionState(const int num_memtables = 0) {
+    explicit DeletionState(const int num_memtables = 0,
+                           bool create_superversion = false) {
       manifest_file_number = 0;
       log_number = 0;
       prev_log_number = 0;
       memtables_to_free.reserve(num_memtables);
+      superversion_to_free = nullptr;
+      new_superversion =
+          create_superversion ? new SuperVersion(num_memtables) : nullptr;
+    }
+
+    ~DeletionState() {
+      // free pending memtables
+      for (auto m : memtables_to_free) {
+        delete m;
+      }
+      // free superversion. if nullptr, this will be a noop
+      delete superversion_to_free;
+      // if new_superversion was not used, it will be non-nullptr and needs
+      // to be freed here
+      delete new_superversion;
     }
   };
@@ -240,7 +286,11 @@ class DBImpl : public DB {
                    uint64_t* filenumber);
 
   uint64_t SlowdownAmount(int n, int top, int bottom);
-  Status MakeRoomForWrite(bool force /* compact even if there is room? */);
+  // MakeRoomForWrite will return superversion_to_free through an argument,
+  // which the caller needs to delete. We do it this way because the caller
+  // can then delete the superversion outside of the mutex
+  Status MakeRoomForWrite(bool force /* compact even if there is room? */,
+                          SuperVersion** superversion_to_free);
   WriteBatch* BuildBatchGroup(Writer** last_writer);
 
   // Force current memtable contents to be flushed.
@@ -324,6 +374,8 @@ class DBImpl : public DB {
   uint64_t logfile_number_;
   unique_ptr<log::Writer> log_;
 
+  SuperVersion* super_version_;
+
   std::string host_name_;
 
   // Queue of writers.
@@ -491,6 +543,18 @@ class DBImpl : public DB {
                  std::vector<SequenceNumber>& snapshots,
                  SequenceNumber* prev_snapshot);
 
+  // Returns a pointer to the previous SuperVersion if its reference count
+  // dropped to zero and it needs to be deleted, or nullptr if not.
+  // Takes a pointer to an allocated SuperVersion as an argument.
+  // Foreground threads call this function directly (they don't carry
+  // deletion state and have to handle their own creation and deletion
+  // of SuperVersion)
+  SuperVersion* InstallSuperVersion(SuperVersion* new_superversion);
+  // Background threads call this function, which is just a wrapper around
+  // the InstallSuperVersion() function above. Background threads carry
+  // deletion_state which can have new_superversion already allocated.
+  void InstallSuperVersion(DeletionState& deletion_state);
+
   // Function that Get and KeyMayExist call with no_io true or false
   // Note: 'value_found' from KeyMayExist propagates here
   Status GetImpl(const ReadOptions& options,

diff --git a/db/version_set.h b/db/version_set.h
index bf466a932..75b529942 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -272,12 +272,14 @@ class VersionSet {
   int64_t NumLevelBytes(int level) const;
 
   // Return the last sequence number.
-  uint64_t LastSequence() const { return last_sequence_; }
+  uint64_t LastSequence() const {
+    return last_sequence_.load(std::memory_order_acquire);
+  }
 
   // Set the last sequence number to s.
   void SetLastSequence(uint64_t s) {
     assert(s >= last_sequence_);
-    last_sequence_ = s;
+    last_sequence_.store(s, std::memory_order_release);
   }
 
   // Mark the specified file number as used.
@@ -476,7 +478,7 @@ class VersionSet {
   const InternalKeyComparator icmp_;
   uint64_t next_file_number_;
   uint64_t manifest_file_number_;
-  uint64_t last_sequence_;
+  std::atomic<uint64_t> last_sequence_;
   uint64_t log_number_;
   uint64_t prev_log_number_;  // 0 or backing store for memtable being compacted
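
A note for readers of this patch: the mechanism above is a reference-counted,
copy-on-write snapshot of the DB state (memtable, immutable memtable list, and
current version). Readers take the mutex only long enough to bump a reference
count on the cached SuperVersion; writers install a new SuperVersion under the
mutex and defer the actual deletion of the old one until after the mutex is
released. Below is a minimal, self-contained C++ sketch of that pattern under
simplified assumptions: DbState and StateManager are illustrative names
invented here, not RocksDB API, and the real patch's separate Cleanup()/delete
steps are collapsed into a single delete.

#include <atomic>
#include <cassert>
#include <mutex>
#include <string>

// Stand-in for SuperVersion: a ref-counted snapshot of reader-visible state.
struct DbState {
  std::string data;  // stands in for mem/imm/current
  std::atomic<uint32_t> refs{0};

  DbState* Ref() {
    refs.fetch_add(1, std::memory_order_relaxed);
    return this;
  }
  // Returns true if this was the last reference (mirrors SuperVersion::Unref).
  bool Unref() {
    assert(refs.load(std::memory_order_relaxed) > 0);
    return refs.fetch_sub(1, std::memory_order_relaxed) == 1;
  }
};

class StateManager {
 public:
  ~StateManager() {
    if (current_ != nullptr && current_->Unref()) delete current_;
  }

  // Writer path (mirrors InstallSuperVersion): swap in a pre-allocated state
  // under the mutex; return the old state if the caller must delete it, so
  // the delete itself happens outside the lock.
  DbState* Install(DbState* new_state) {
    std::lock_guard<std::mutex> l(mu_);
    new_state->Ref();  // the manager's own reference
    DbState* old = current_;
    current_ = new_state;
    if (old != nullptr && old->Unref()) {
      return old;  // caller deletes outside the lock
    }
    return nullptr;
  }

  // Reader path (mirrors the new GetImpl): lock only to take a reference.
  DbState* Acquire() {
    std::lock_guard<std::mutex> l(mu_);
    return current_->Ref();
  }

  // Reader is done; if it held the last reference, the state is deleted
  // without any lock held.
  static void Release(DbState* s) {
    if (s->Unref()) delete s;
  }

 private:
  std::mutex mu_;
  DbState* current_ = nullptr;
};

int main() {
  StateManager mgr;
  delete mgr.Install(new DbState{"v1"});  // no old state yet: returns nullptr

  DbState* snap = mgr.Acquire();          // reader pins "v1"
  delete mgr.Install(new DbState{"v2"});  // "v1" still pinned: returns nullptr
  // ... read from snap->data with no mutex held ...
  StateManager::Release(snap);            // last reference: "v1" deleted here
  return 0;
}

The point of returning the dead state instead of deleting it in place is the
same as in the patch: freeing memtables can be expensive, so Get() now holds
the mutex only for two reference-count updates, which is what drives the
readrandom gains quoted in the summary above.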