diff --git a/db/column_family.cc b/db/column_family.cc index 45ea22c23..8bc989179 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -298,6 +298,87 @@ Compaction* ColumnFamilyData::CompactRange(int input_level, int output_level, begin, end, compaction_end); } +SuperVersion* ColumnFamilyData::GetReferencedSuperVersion( + port::Mutex* db_mutex) { + SuperVersion* sv = nullptr; + if (LIKELY(column_family_set_->db_options_->allow_thread_local)) { + sv = GetThreadLocalSuperVersion(db_mutex); + sv->Ref(); + if (!ReturnThreadLocalSuperVersion(sv)) { + sv->Unref(); + } + } else { + db_mutex->Lock(); + sv = super_version_->Ref(); + db_mutex->Unlock(); + } + return sv; +} + +SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion( + port::Mutex* db_mutex) { + SuperVersion* sv = nullptr; + // The SuperVersion is cached in thread local storage to avoid acquiring + // mutex when SuperVersion does not change since the last use. When a new + // SuperVersion is installed, the compaction or flush thread cleans up + // cached SuperVersion in all existing thread local storage. To avoid + // acquiring mutex for this operation, we use atomic Swap() on the thread + // local pointer to guarantee exclusive access. If the thread local pointer + // is being used while a new SuperVersion is installed, the cached + // SuperVersion can become stale. In that case, the background thread would + // have swapped in kSVObsolete. We re-check the value at when returning + // SuperVersion back to thread local, with an atomic compare and swap. + // The superversion will need to be released if detected to be stale. + void* ptr = local_sv_->Swap(SuperVersion::kSVInUse); + // Invariant: + // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage + // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage + // should only keep kSVInUse before ReturnThreadLocalSuperVersion call + // (if no Scrape happens). + assert(ptr != SuperVersion::kSVInUse); + sv = static_cast(ptr); + if (sv == SuperVersion::kSVObsolete || + sv->version_number != super_version_number_.load()) { + RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_ACQUIRES); + SuperVersion* sv_to_delete = nullptr; + + if (sv && sv->Unref()) { + RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_CLEANUPS); + db_mutex->Lock(); + // NOTE: underlying resources held by superversion (sst files) might + // not be released until the next background job. + sv->Cleanup(); + sv_to_delete = sv; + } else { + db_mutex->Lock(); + } + sv = super_version_->Ref(); + db_mutex->Unlock(); + + delete sv_to_delete; + } + assert(sv != nullptr); + return sv; +} + +bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) { + assert(sv != nullptr); + // Put the SuperVersion back + void* expected = SuperVersion::kSVInUse; + if (local_sv_->CompareAndSwap(static_cast(sv), expected)) { + // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal + // storage has not been altered and no Scrape has happend. The + // SuperVersion is still current. + return true; + } else { + // ThreadLocal scrape happened in the process of this GetImpl call (after + // thread local Swap() at the beginning and before CompareAndSwap()). + // This means the SuperVersion it holds is obsolete. + assert(expected == SuperVersion::kSVObsolete); + } + return false; +} + SuperVersion* ColumnFamilyData::InstallSuperVersion( SuperVersion* new_superversion, port::Mutex* db_mutex) { new_superversion->db_mutex = db_mutex; @@ -306,6 +387,10 @@ SuperVersion* ColumnFamilyData::InstallSuperVersion( super_version_ = new_superversion; ++super_version_number_; super_version_->version_number = super_version_number_; + // Reset SuperVersions cached in thread local storage + if (column_family_set_->db_options_->allow_thread_local) { + ResetThreadLocalSuperVersions(); + } if (old_superversion != nullptr && old_superversion->Unref()) { old_superversion->Cleanup(); return old_superversion; // will let caller delete outside of mutex diff --git a/db/column_family.h b/db/column_family.h index 42e02f07d..9843e4125 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -199,7 +199,16 @@ class ColumnFamilyData { SuperVersion* GetSuperVersion() { return super_version_; } // thread-safe - ThreadLocalPtr* GetThreadLocalSuperVersion() const { return local_sv_.get(); } + // Return a already referenced SuperVersion to be used safely. + SuperVersion* GetReferencedSuperVersion(port::Mutex* db_mutex); + // thread-safe + // Get SuperVersion stored in thread local storage. If it does not exist, + // get a reference from a current SuperVersion. + SuperVersion* GetThreadLocalSuperVersion(port::Mutex* db_mutex); + // Try to return SuperVersion back to thread local storage. Retrun true on + // success and false on failure. It fails when the thread local storage + // contains anything other than SuperVersion::kSVInUse flag. + bool ReturnThreadLocalSuperVersion(SuperVersion* sv); // thread-safe uint64_t GetSuperVersionNumber() const { return super_version_number_.load(); diff --git a/db/db_impl.cc b/db/db_impl.cc index 3aed6b7b0..440685508 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3179,8 +3179,7 @@ struct IterState { static void CleanupIteratorState(void* arg1, void* arg2) { IterState* state = reinterpret_cast(arg1); - bool need_cleanup = state->super_version->Unref(); - if (need_cleanup) { + if (state->super_version->Unref()) { DBImpl::DeletionState deletion_state; state->mu->Lock(); @@ -3318,10 +3317,6 @@ void DBImpl::InstallSuperVersion(ColumnFamilyData* cfd, cfd->InstallSuperVersion(new_superversion, &mutex_); deletion_state.new_superversion = nullptr; deletion_state.superversions_to_free.push_back(old_superversion); - // Reset SuperVersions cached in thread local storage - if (options_.allow_thread_local) { - cfd->ResetThreadLocalSuperVersions(); - } } Status DBImpl::GetImpl(const ReadOptions& options, @@ -3342,47 +3337,9 @@ Status DBImpl::GetImpl(const ReadOptions& options, // Acquire SuperVersion SuperVersion* sv = nullptr; - ThreadLocalPtr* thread_local_sv = nullptr; + // TODO(ljin): consider using GetReferencedSuperVersion() directly if (LIKELY(options_.allow_thread_local)) { - // The SuperVersion is cached in thread local storage to avoid acquiring - // mutex when SuperVersion does not change since the last use. When a new - // SuperVersion is installed, the compaction or flush thread cleans up - // cached SuperVersion in all existing thread local storage. To avoid - // acquiring mutex for this operation, we use atomic Swap() on the thread - // local pointer to guarantee exclusive access. If the thread local pointer - // is being used while a new SuperVersion is installed, the cached - // SuperVersion can become stale. In that case, the background thread would - // have swapped in kSVObsolete. We re-check the value at the end of - // Get, with an atomic compare and swap. The superversion will be released - // if detected to be stale. - thread_local_sv = cfd->GetThreadLocalSuperVersion(); - void* ptr = thread_local_sv->Swap(SuperVersion::kSVInUse); - // Invariant: - // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage - // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage - // should only keep kSVInUse during a GetImpl. - assert(ptr != SuperVersion::kSVInUse); - sv = static_cast(ptr); - if (sv == SuperVersion::kSVObsolete || - sv->version_number != cfd->GetSuperVersionNumber()) { - RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_ACQUIRES); - SuperVersion* sv_to_delete = nullptr; - - if (sv && sv->Unref()) { - RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_CLEANUPS); - mutex_.Lock(); - // TODO underlying resources held by superversion (sst files) might - // not be released until the next background job. - sv->Cleanup(); - sv_to_delete = sv; - } else { - mutex_.Lock(); - } - sv = cfd->GetSuperVersion()->Ref(); - mutex_.Unlock(); - - delete sv_to_delete; - } + sv = cfd->GetThreadLocalSuperVersion(&mutex_); } else { mutex_.Lock(); sv = cfd->GetSuperVersion()->Ref(); @@ -3429,19 +3386,7 @@ Status DBImpl::GetImpl(const ReadOptions& options, bool unref_sv = true; if (LIKELY(options_.allow_thread_local)) { - // Put the SuperVersion back - void* expected = SuperVersion::kSVInUse; - if (thread_local_sv->CompareAndSwap(static_cast(sv), expected)) { - // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal - // storage has not been altered and no Scrape has happend. The - // SuperVersion is still current. - unref_sv = false; - } else { - // ThreadLocal scrape happened in the process of this GetImpl call (after - // thread local Swap() at the beginning and before CompareAndSwap()). - // This means the SuperVersion it holds is obsolete. - assert(expected == SuperVersion::kSVObsolete); - } + unref_sv = !cfd->ReturnThreadLocalSuperVersion(sv); } if (unref_sv) { @@ -3678,22 +3623,18 @@ bool DBImpl::KeyMayExist(const ReadOptions& options, Iterator* DBImpl::NewIterator(const ReadOptions& options, ColumnFamilyHandle* column_family) { - SequenceNumber latest_snapshot = 0; - SuperVersion* super_version = nullptr; auto cfh = reinterpret_cast(column_family); auto cfd = cfh->cfd(); - if (!options.tailing) { - mutex_.Lock(); - super_version = cfd->GetSuperVersion()->Ref(); - latest_snapshot = versions_->LastSequence(); - mutex_.Unlock(); - } Iterator* iter; if (options.tailing) { iter = new TailingIterator(this, options, cfd); } else { - iter = NewInternalIterator(options, cfd, super_version); + SequenceNumber latest_snapshot = versions_->LastSequence(); + SuperVersion* sv = nullptr; + sv = cfd->GetReferencedSuperVersion(&mutex_); + + iter = NewInternalIterator(options, cfd, sv); auto snapshot = options.snapshot != nullptr