using thread local SuperVersion for NewIterator

Summary:
Similar to GetImpl(), use the SuperVersion from thread local storage instead of acquiring the mutex.
I don't expect this change to make a dent in NewIterator() performance,
because the bottleneck seems to be in the rest of the API.

Test Plan:
make asan_check
will post perf numbers

Reviewers: haobo, igor, sdong, dhruba, yhchiang

Reviewed By: sdong

CC: leveldb

Differential Revision: https://reviews.facebook.net/D17643
main
Lei Jin 11 years ago
parent d5e087b6df
commit 539dd207df
  1. 85
      db/column_family.cc
  2. 11
      db/column_family.h
  3. 77
      db/db_impl.cc

@ -298,6 +298,87 @@ Compaction* ColumnFamilyData::CompactRange(int input_level, int output_level,
begin, end, compaction_end); begin, end, compaction_end);
} }
// Returns a SuperVersion on which an extra reference has been taken for the
// caller.
//
// When thread-local caching is enabled, the SuperVersion is borrowed from the
// thread-local slot, referenced once more, and the slot is handed back right
// away. Otherwise the current SuperVersion is referenced while holding
// db_mutex.
//
// NOTE(review): both paths may lock db_mutex (the thread-local path does so
// inside GetThreadLocalSuperVersion), so presumably the caller must not
// already hold it -- confirm against callers.
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
    port::Mutex* db_mutex) {
  if (LIKELY(column_family_set_->db_options_->allow_thread_local)) {
    SuperVersion* const cached = GetThreadLocalSuperVersion(db_mutex);
    // Take the caller's reference before giving the slot back.
    cached->Ref();
    const bool slot_restored = ReturnThreadLocalSuperVersion(cached);
    if (!slot_restored) {
      // The thread-local slot did not take the SuperVersion back, so drop one
      // reference. The result of Unref() is ignored here; presumably the
      // reference taken just above keeps the object alive -- TODO confirm.
      cached->Unref();
    }
    return cached;
  }

  // Thread-local caching disabled: reference the current SuperVersion under
  // the DB mutex.
  db_mutex->Lock();
  SuperVersion* const current = super_version_->Ref();
  db_mutex->Unlock();
  return current;
}
// Takes the SuperVersion out of this thread's local slot, leaving
// SuperVersion::kSVInUse in its place, and returns it. If the slot held
// SuperVersion::kSVObsolete, or a SuperVersion whose version_number no longer
// matches super_version_number_, a reference to the current super_version_ is
// acquired under db_mutex and returned instead.
//
// The result is never null (asserted). The caller is expected to hand the
// SuperVersion back with ReturnThreadLocalSuperVersion() when done.
//
// NOTE(review): this function may lock db_mutex itself, so presumably the
// caller must not already hold it -- confirm against callers.
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
port::Mutex* db_mutex) {
SuperVersion* sv = nullptr;
// The SuperVersion is cached in thread local storage to avoid acquiring
// mutex when SuperVersion does not change since the last use. When a new
// SuperVersion is installed, the compaction or flush thread cleans up
// cached SuperVersion in all existing thread local storage. To avoid
// acquiring mutex for this operation, we use atomic Swap() on the thread
// local pointer to guarantee exclusive access. If the thread local pointer
// is being used while a new SuperVersion is installed, the cached
// SuperVersion can become stale. In that case, the background thread would
// have swapped in kSVObsolete. We re-check the value when returning the
// SuperVersion back to thread local, with an atomic compare and swap.
// The superversion will need to be released if detected to be stale.
void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
// Invariant:
// (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
// (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
// should only keep kSVInUse before ReturnThreadLocalSuperVersion call
// (if no Scrape happens).
assert(ptr != SuperVersion::kSVInUse);
sv = static_cast<SuperVersion*>(ptr);
// Slow path: the slot was marked obsolete, or the cached SuperVersion is
// older than the currently installed one.
if (sv == SuperVersion::kSVObsolete ||
sv->version_number != super_version_number_.load()) {
RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_ACQUIRES);
SuperVersion* sv_to_delete = nullptr;
// sv may still be the kSVObsolete sentinel here; the `sv &&` guard only
// protects the Unref() call if that sentinel is a null pointer.
// NOTE(review): confirm kSVObsolete is nullptr.
// Unref() returning true presumably means this was the last reference,
// so the stale SuperVersion has to be cleaned up -- TODO confirm.
if (sv && sv->Unref()) {
RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_CLEANUPS);
db_mutex->Lock();
// NOTE: underlying resources held by superversion (sst files) might
// not be released until the next background job.
sv->Cleanup();
sv_to_delete = sv;
} else {
db_mutex->Lock();
}
// db_mutex is held on both branches at this point; take a reference on the
// currently installed SuperVersion.
sv = super_version_->Ref();
db_mutex->Unlock();
// Deletion of the stale SuperVersion is deferred until after the mutex is
// released (deleting a null sv_to_delete is a no-op).
delete sv_to_delete;
}
assert(sv != nullptr);
return sv;
}
// Tries to put `sv` back into this thread's local slot.
//
// Returns true when the slot still contained SuperVersion::kSVInUse and now
// holds `sv` again. Returns false when the slot was changed in the meantime;
// in that case the slot is left alone and the caller keeps responsibility for
// the reference on `sv`.
bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);

  // The slot is expected to still carry the in-use marker.
  void* expected = SuperVersion::kSVInUse;
  const bool slot_restored =
      local_sv_->CompareAndSwap(static_cast<void*>(sv), expected);

  if (!slot_restored) {
    // A thread-local scrape ran between the Swap() that took the SuperVersion
    // out of the slot and this CompareAndSwap(), which means the SuperVersion
    // being returned is obsolete.
    // NOTE(review): this assert relies on CompareAndSwap() writing the
    // observed slot value back into `expected` on failure -- confirm.
    assert(expected == SuperVersion::kSVObsolete);
  }
  // On success, seeing kSVInUse in the slot guarantees that no scrape
  // happened and the SuperVersion is still current.
  return slot_restored;
}
SuperVersion* ColumnFamilyData::InstallSuperVersion( SuperVersion* ColumnFamilyData::InstallSuperVersion(
SuperVersion* new_superversion, port::Mutex* db_mutex) { SuperVersion* new_superversion, port::Mutex* db_mutex) {
new_superversion->db_mutex = db_mutex; new_superversion->db_mutex = db_mutex;
@ -306,6 +387,10 @@ SuperVersion* ColumnFamilyData::InstallSuperVersion(
super_version_ = new_superversion; super_version_ = new_superversion;
++super_version_number_; ++super_version_number_;
super_version_->version_number = super_version_number_; super_version_->version_number = super_version_number_;
// Reset SuperVersions cached in thread local storage
if (column_family_set_->db_options_->allow_thread_local) {
ResetThreadLocalSuperVersions();
}
if (old_superversion != nullptr && old_superversion->Unref()) { if (old_superversion != nullptr && old_superversion->Unref()) {
old_superversion->Cleanup(); old_superversion->Cleanup();
return old_superversion; // will let caller delete outside of mutex return old_superversion; // will let caller delete outside of mutex

@ -199,7 +199,16 @@ class ColumnFamilyData {
SuperVersion* GetSuperVersion() { return super_version_; } SuperVersion* GetSuperVersion() { return super_version_; }
// thread-safe // thread-safe
ThreadLocalPtr* GetThreadLocalSuperVersion() const { return local_sv_.get(); } // Return a already referenced SuperVersion to be used safely.
SuperVersion* GetReferencedSuperVersion(port::Mutex* db_mutex);
// thread-safe
// Get SuperVersion stored in thread local storage. If it does not exist,
// get a reference from a current SuperVersion.
SuperVersion* GetThreadLocalSuperVersion(port::Mutex* db_mutex);
// Try to return SuperVersion back to thread local storage. Return true on
// success and false on failure. It fails when the thread local storage
// contains anything other than SuperVersion::kSVInUse flag.
bool ReturnThreadLocalSuperVersion(SuperVersion* sv);
// thread-safe // thread-safe
uint64_t GetSuperVersionNumber() const { uint64_t GetSuperVersionNumber() const {
return super_version_number_.load(); return super_version_number_.load();

@ -3179,8 +3179,7 @@ struct IterState {
static void CleanupIteratorState(void* arg1, void* arg2) { static void CleanupIteratorState(void* arg1, void* arg2) {
IterState* state = reinterpret_cast<IterState*>(arg1); IterState* state = reinterpret_cast<IterState*>(arg1);
bool need_cleanup = state->super_version->Unref(); if (state->super_version->Unref()) {
if (need_cleanup) {
DBImpl::DeletionState deletion_state; DBImpl::DeletionState deletion_state;
state->mu->Lock(); state->mu->Lock();
@ -3318,10 +3317,6 @@ void DBImpl::InstallSuperVersion(ColumnFamilyData* cfd,
cfd->InstallSuperVersion(new_superversion, &mutex_); cfd->InstallSuperVersion(new_superversion, &mutex_);
deletion_state.new_superversion = nullptr; deletion_state.new_superversion = nullptr;
deletion_state.superversions_to_free.push_back(old_superversion); deletion_state.superversions_to_free.push_back(old_superversion);
// Reset SuperVersions cached in thread local storage
if (options_.allow_thread_local) {
cfd->ResetThreadLocalSuperVersions();
}
} }
Status DBImpl::GetImpl(const ReadOptions& options, Status DBImpl::GetImpl(const ReadOptions& options,
@ -3342,47 +3337,9 @@ Status DBImpl::GetImpl(const ReadOptions& options,
// Acquire SuperVersion // Acquire SuperVersion
SuperVersion* sv = nullptr; SuperVersion* sv = nullptr;
ThreadLocalPtr* thread_local_sv = nullptr; // TODO(ljin): consider using GetReferencedSuperVersion() directly
if (LIKELY(options_.allow_thread_local)) { if (LIKELY(options_.allow_thread_local)) {
// The SuperVersion is cached in thread local storage to avoid acquiring sv = cfd->GetThreadLocalSuperVersion(&mutex_);
// mutex when SuperVersion does not change since the last use. When a new
// SuperVersion is installed, the compaction or flush thread cleans up
// cached SuperVersion in all existing thread local storage. To avoid
// acquiring mutex for this operation, we use atomic Swap() on the thread
// local pointer to guarantee exclusive access. If the thread local pointer
// is being used while a new SuperVersion is installed, the cached
// SuperVersion can become stale. In that case, the background thread would
// have swapped in kSVObsolete. We re-check the value at the end of
// Get, with an atomic compare and swap. The superversion will be released
// if detected to be stale.
thread_local_sv = cfd->GetThreadLocalSuperVersion();
void* ptr = thread_local_sv->Swap(SuperVersion::kSVInUse);
// Invariant:
// (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
// (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
// should only keep kSVInUse during a GetImpl.
assert(ptr != SuperVersion::kSVInUse);
sv = static_cast<SuperVersion*>(ptr);
if (sv == SuperVersion::kSVObsolete ||
sv->version_number != cfd->GetSuperVersionNumber()) {
RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_ACQUIRES);
SuperVersion* sv_to_delete = nullptr;
if (sv && sv->Unref()) {
RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_CLEANUPS);
mutex_.Lock();
// TODO underlying resources held by superversion (sst files) might
// not be released until the next background job.
sv->Cleanup();
sv_to_delete = sv;
} else {
mutex_.Lock();
}
sv = cfd->GetSuperVersion()->Ref();
mutex_.Unlock();
delete sv_to_delete;
}
} else { } else {
mutex_.Lock(); mutex_.Lock();
sv = cfd->GetSuperVersion()->Ref(); sv = cfd->GetSuperVersion()->Ref();
@ -3429,19 +3386,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
bool unref_sv = true; bool unref_sv = true;
if (LIKELY(options_.allow_thread_local)) { if (LIKELY(options_.allow_thread_local)) {
// Put the SuperVersion back unref_sv = !cfd->ReturnThreadLocalSuperVersion(sv);
void* expected = SuperVersion::kSVInUse;
if (thread_local_sv->CompareAndSwap(static_cast<void*>(sv), expected)) {
// When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
// storage has not been altered and no Scrape has happend. The
// SuperVersion is still current.
unref_sv = false;
} else {
// ThreadLocal scrape happened in the process of this GetImpl call (after
// thread local Swap() at the beginning and before CompareAndSwap()).
// This means the SuperVersion it holds is obsolete.
assert(expected == SuperVersion::kSVObsolete);
}
} }
if (unref_sv) { if (unref_sv) {
@ -3678,22 +3623,18 @@ bool DBImpl::KeyMayExist(const ReadOptions& options,
Iterator* DBImpl::NewIterator(const ReadOptions& options, Iterator* DBImpl::NewIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family) { ColumnFamilyHandle* column_family) {
SequenceNumber latest_snapshot = 0;
SuperVersion* super_version = nullptr;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd(); auto cfd = cfh->cfd();
if (!options.tailing) {
mutex_.Lock();
super_version = cfd->GetSuperVersion()->Ref();
latest_snapshot = versions_->LastSequence();
mutex_.Unlock();
}
Iterator* iter; Iterator* iter;
if (options.tailing) { if (options.tailing) {
iter = new TailingIterator(this, options, cfd); iter = new TailingIterator(this, options, cfd);
} else { } else {
iter = NewInternalIterator(options, cfd, super_version); SequenceNumber latest_snapshot = versions_->LastSequence();
SuperVersion* sv = nullptr;
sv = cfd->GetReferencedSuperVersion(&mutex_);
iter = NewInternalIterator(options, cfd, sv);
auto snapshot = auto snapshot =
options.snapshot != nullptr options.snapshot != nullptr

Loading…
Cancel
Save