From ad0c3747cb7b3c853e8533ca7a43de17e670558c Mon Sep 17 00:00:00 2001 From: Lei Jin Date: Thu, 27 Feb 2014 11:38:55 -0800 Subject: [PATCH] cache SuperVersion in thread local storage to avoid mutex lock Summary: as title Test Plan: asan_check will post results later Reviewers: haobo, igor, dhruba, sdong Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D16257 --- db/db_impl.cc | 146 +++++++++++++++++++++++++++-------- db/db_impl.h | 31 +++++++- include/rocksdb/options.h | 4 + include/rocksdb/statistics.h | 5 +- port/likely.h | 21 +++++ util/options.cc | 3 +- util/statistics.h | 2 +- util/thread_local.cc | 6 +- 8 files changed, 173 insertions(+), 45 deletions(-) create mode 100644 port/likely.h diff --git a/db/db_impl.cc b/db/db_impl.cc index b1ce96e1b..d7ae6e970 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -38,6 +38,7 @@ #include "db/version_set.h" #include "db/write_batch_internal.h" #include "port/port.h" +#include "port/likely.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/db.h" #include "rocksdb/env.h" @@ -270,6 +271,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) logfile_number_(0), super_version_(nullptr), super_version_number_(0), + local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)), tmp_batch_(), bg_compaction_scheduled_(0), bg_manual_only_(0), @@ -288,7 +290,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) delayed_writes_(0), storage_options_(options), bg_work_gate_closed_(false), - refitting_level_(false) { + refitting_level_(false), + opened_successfully_(false) { mem_->Ref(); env_->GetAbsolutePath(dbname, &db_absolute_path_); @@ -319,12 +322,11 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) } DBImpl::~DBImpl() { - autovector to_delete; - // Wait for background work to finish if (flush_on_destroy_ && mem_->GetFirstSequenceNumber() != 0) { FlushMemTable(FlushOptions()); } + mutex_.Lock(); shutting_down_.Release_Store(this); // Any 
non-nullptr value is ok while (bg_compaction_scheduled_ || @@ -332,6 +334,34 @@ DBImpl::~DBImpl() { bg_logstats_scheduled_) { bg_cv_.Wait(); } + mutex_.Unlock(); + + // Release SuperVersion reference kept in ThreadLocalPtr. + // This must be done outside of mutex_ since unref handler can lock mutex. + // It also needs to be done after FlushMemTable, which can trigger local_sv_ + // access. + delete local_sv_; + + mutex_.Lock(); + if (options_.allow_thread_local) { + // Clean up obsolete files due to SuperVersion release. + // (1) Need to delete obsolete files before closing because RepairDB() + // scans all existing files in the file system and builds manifest file. + // Keeping obsolete files confuses the repair process. + // (2) Need to check if we Open()/Recover() the DB successfully before + // deleting because if VersionSet recover fails (may be due to corrupted + // manifest file), it is not able to identify live files correctly. As a + // result, all "live" files can get deleted by accident. However, corrupted + // manifest is recoverable by RepairDB(). 
+ if (opened_successfully_) { + DeletionState deletion_state; + FindObsoleteFiles(deletion_state, true); + // manifest number starting from 2 + deletion_state.manifest_file_number = 1; + PurgeObsoleteFiles(deletion_state); + } + } + if (super_version_ != nullptr) { bool is_last_reference __attribute__((unused)); is_last_reference = super_version_->Unref(); @@ -349,6 +379,7 @@ DBImpl::~DBImpl() { delete mem_->Unref(); } + autovector to_delete; imm_.current()->Unref(&to_delete); for (MemTable* m: to_delete) { delete m; @@ -1286,6 +1317,10 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, if (s.ok()) { InstallSuperVersion(deletion_state); + // Reset SuperVersions cached in thread local storage + if (options_.allow_thread_local) { + ResetThreadLocalSuperVersions(&deletion_state); + } if (madeProgress) { *madeProgress = 1; } @@ -2811,26 +2846,21 @@ Status DBImpl::Get(const ReadOptions& options, // DeletionState gets created and destructed outside of the lock -- we // use this convinently to: // * malloc one SuperVersion() outside of the lock -- new_superversion -// * delete one SuperVersion() outside of the lock -- superversion_to_free +// * delete SuperVersion()s outside of the lock -- superversions_to_free // // However, if InstallSuperVersion() gets called twice with the same, // deletion_state, we can't reuse the SuperVersion() that got malloced because // first call already used it. In that rare case, we take a hit and create a -// new SuperVersion() inside of the mutex. We do similar thing -// for superversion_to_free +// new SuperVersion() inside of the mutex. void DBImpl::InstallSuperVersion(DeletionState& deletion_state) { + mutex_.AssertHeld(); // if new_superversion == nullptr, it means somebody already used it SuperVersion* new_superversion = (deletion_state.new_superversion != nullptr) ? 
deletion_state.new_superversion : new SuperVersion(); SuperVersion* old_superversion = InstallSuperVersion(new_superversion); deletion_state.new_superversion = nullptr; - if (deletion_state.superversion_to_free != nullptr) { - // somebody already put it there - delete old_superversion; - } else { - deletion_state.superversion_to_free = old_superversion; - } + deletion_state.superversions_to_free.push_back(old_superversion); } DBImpl::SuperVersion* DBImpl::InstallSuperVersion( @@ -2839,7 +2869,10 @@ DBImpl::SuperVersion* DBImpl::InstallSuperVersion( new_superversion->Init(mem_, imm_.current(), versions_->current()); SuperVersion* old_superversion = super_version_; super_version_ = new_superversion; + super_version_->db = this; ++super_version_number_; + super_version_->version_number = super_version_number_; + if (old_superversion != nullptr && old_superversion->Unref()) { old_superversion->Cleanup(); return old_superversion; // will let caller delete outside of mutex @@ -2847,6 +2880,20 @@ DBImpl::SuperVersion* DBImpl::InstallSuperVersion( return nullptr; } +void DBImpl::ResetThreadLocalSuperVersions(DeletionState* deletion_state) { + mutex_.AssertHeld(); + autovector sv_ptrs; + local_sv_->Scrape(&sv_ptrs); + for (auto ptr : sv_ptrs) { + assert(ptr); + auto sv = static_cast(ptr); + if (static_cast(ptr)->Unref()) { + sv->Cleanup(); + deletion_state->superversions_to_free.push_back(sv); + } + } +} + Status DBImpl::GetImpl(const ReadOptions& options, const Slice& key, std::string* value, @@ -2864,10 +2911,41 @@ Status DBImpl::GetImpl(const ReadOptions& options, snapshot = versions_->LastSequence(); } - // This can be replaced by using atomics and spinlock instead of big mutex - mutex_.Lock(); - SuperVersion* get_version = super_version_->Ref(); - mutex_.Unlock(); + // Acquire SuperVersion + SuperVersion* sv = nullptr; + if (LIKELY(options_.allow_thread_local)) { + // The SuperVersion is cached in thread local storage to avoid acquiring + // mutex when SuperVersion 
does not change since the last use. When a new + // SuperVersion is installed, the compaction or flush thread cleans up + // cached SuperVersion in all existing thread local storage. To avoid + // acquiring mutex for this operation, we use atomic Swap() on the thread + // local pointer to guarantee exclusive access. If the thread local pointer + // is being used while a new SuperVersion is installed, the cached + // SuperVersion can become stale. It will eventually get refreshed either + // on the next GetImpl() call or next SuperVersion installation. + sv = static_cast(local_sv_->Swap(nullptr)); + if (!sv || sv->version_number != + super_version_number_.load(std::memory_order_relaxed)) { + RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_UPDATES); + SuperVersion* sv_to_delete = nullptr; + + if (sv && sv->Unref()) { + mutex_.Lock(); + sv->Cleanup(); + sv_to_delete = sv; + } else { + mutex_.Lock(); + } + sv = super_version_->Ref(); + mutex_.Unlock(); + + delete sv_to_delete; + } + } else { + mutex_.Lock(); + sv = super_version_->Ref(); + mutex_.Unlock(); + } bool have_stat_update = false; Version::GetStats stats; @@ -2880,18 +2958,18 @@ Status DBImpl::GetImpl(const ReadOptions& options, // merge_operands will contain the sequence of merges in the latter case. 
LookupKey lkey(key, snapshot); BumpPerfTime(&perf_context.get_snapshot_time, &snapshot_timer); - if (get_version->mem->Get(lkey, value, &s, merge_context, options_)) { + if (sv->mem->Get(lkey, value, &s, merge_context, options_)) { // Done RecordTick(options_.statistics.get(), MEMTABLE_HIT); - } else if (get_version->imm->Get(lkey, value, &s, merge_context, options_)) { + } else if (sv->imm->Get(lkey, value, &s, merge_context, options_)) { // Done RecordTick(options_.statistics.get(), MEMTABLE_HIT); } else { StopWatchNano from_files_timer(env_, false); StartPerfTimer(&from_files_timer); - get_version->current->Get(options, lkey, value, &s, &merge_context, &stats, - options_, value_found); + sv->current->Get(options, lkey, value, &s, &merge_context, &stats, + options_, value_found); have_stat_update = true; BumpPerfTime(&perf_context.get_from_output_files_time, &from_files_timer); RecordTick(options_.statistics.get(), MEMTABLE_MISS); @@ -2900,31 +2978,32 @@ Status DBImpl::GetImpl(const ReadOptions& options, StopWatchNano post_process_timer(env_, false); StartPerfTimer(&post_process_timer); - bool delete_get_version = false; if (!options_.disable_seek_compaction && have_stat_update) { mutex_.Lock(); - if (get_version->current->UpdateStats(stats)) { + if (sv->current->UpdateStats(stats)) { MaybeScheduleFlushOrCompaction(); } - if (get_version->Unref()) { - get_version->Cleanup(); - delete_get_version = true; - } mutex_.Unlock(); + } + + // Release SuperVersion + if (LIKELY(options_.allow_thread_local)) { + // Put the SuperVersion back + local_sv_->Reset(static_cast(sv)); } else { - if (get_version->Unref()) { + bool delete_sv = false; + if (sv->Unref()) { mutex_.Lock(); - get_version->Cleanup(); + sv->Cleanup(); mutex_.Unlock(); - delete_get_version = true; + delete_sv = true; + } + if (delete_sv) { + delete sv; } - } - if (delete_get_version) { - delete get_version; } // Note, tickers are atomic now - no lock protection needed any more. 
- RecordTick(options_.statistics.get(), NUMBER_KEYS_READ); RecordTick(options_.statistics.get(), BYTES_READ, value->size()); BumpPerfTime(&perf_context.get_post_process_time, &post_process_timer); @@ -3772,6 +3851,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { impl->mutex_.Unlock(); if (s.ok()) { + impl->opened_successfully_ = true; *dbptr = impl; } else { delete impl; diff --git a/db/db_impl.h b/db/db_impl.h index 374cb8d02..4eda76243 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -26,6 +26,7 @@ #include "rocksdb/transaction_log.h" #include "util/autovector.h" #include "util/stats_logger.h" +#include "util/thread_local.h" #include "db/internal_stats.h" namespace rocksdb { @@ -152,6 +153,9 @@ class DBImpl : public DB { // all memtables that we need to free through this vector. We then // delete all those memtables outside of mutex, during destruction autovector to_delete; + // Version number of the current SuperVersion + uint64_t version_number; + DBImpl* db; // should be called outside the mutex SuperVersion() = default; @@ -170,6 +174,16 @@ class DBImpl : public DB { Version* new_current); }; + static void SuperVersionUnrefHandle(void* ptr) { + DBImpl::SuperVersion* sv = static_cast(ptr); + if (sv->Unref()) { + sv->db->mutex_.Lock(); + sv->Cleanup(); + sv->db->mutex_.Unlock(); + delete sv; + } + } + // needed for CleanupIteratorState struct DeletionState { inline bool HaveSomethingToDelete() const { @@ -195,7 +209,7 @@ class DBImpl : public DB { // a list of memtables to be free autovector memtables_to_free; - SuperVersion* superversion_to_free; // if nullptr nothing to free + autovector superversions_to_free; SuperVersion* new_superversion; // if nullptr no new superversion @@ -207,7 +221,6 @@ class DBImpl : public DB { manifest_file_number = 0; log_number = 0; prev_log_number = 0; - superversion_to_free = nullptr; new_superversion = create_superversion ? 
new SuperVersion() : nullptr; } @@ -217,8 +230,10 @@ class DBImpl : public DB { for (auto m : memtables_to_free) { delete m; } - // free superversion. if nullptr, this will be noop - delete superversion_to_free; + // free superversions + for (auto s : superversions_to_free) { + delete s; + } // if new_superversion was not used, it will be non-nullptr and needs // to be freed here delete new_superversion; @@ -400,6 +415,9 @@ class DBImpl : public DB { // InstallSuperVersion(), i.e. incremented every time super_version_ // changes. std::atomic super_version_number_; + // Thread's local copy of SuperVersion pointer + // This needs to be destructed before mutex_ (its unref handler locks mutex_) + ThreadLocalPtr* local_sv_; std::string host_name_; @@ -489,6 +507,9 @@ class DBImpl : public DB { // Guard against multiple concurrent refitting bool refitting_level_; + // Indicates whether the DB was opened successfully + bool opened_successfully_; + // No copying allowed DBImpl(const DBImpl&); void operator=(const DBImpl&); @@ -515,6 +536,8 @@ class DBImpl : public DB { // deletion_state which can have new_superversion already allocated. void InstallSuperVersion(DeletionState& deletion_state); + void ResetThreadLocalSuperVersions(DeletionState* deletion_state); + virtual Status GetPropertiesOfAllTables(TablePropertiesCollection* props) override; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 5b11a2c79..ae88268c7 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -714,6 +714,10 @@ struct Options { // // Default: 0 (disabled) size_t max_successive_merges; + + // Allow RocksDB to use thread local storage to optimize performance. 
+ // Default: true + bool allow_thread_local; }; // diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 24384e9ce..82cc7133f 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -122,6 +122,7 @@ enum Tickers { // Number of table's properties loaded directly from file, without creating // table reader object. NUMBER_DIRECT_LOAD_TABLE_PROPERTIES, + NUMBER_SUPERVERSION_UPDATES, TICKER_ENUM_MAX }; @@ -176,7 +177,9 @@ const std::vector> TickersNameMap = { {COMPACT_READ_BYTES, "rocksdb.compact.read.bytes"}, {COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes"}, {NUMBER_DIRECT_LOAD_TABLE_PROPERTIES, - "rocksdb.number.direct.load.table.properties"}, }; + "rocksdb.number.direct.load.table.properties"}, + {NUMBER_SUPERVERSION_UPDATES, "rocksdb.number.superversion_updates"}, +}; /** * Keep adding histogram's here. diff --git a/port/likely.h b/port/likely.h new file mode 100644 index 000000000..ede0df5a1 --- /dev/null +++ b/port/likely.h @@ -0,0 +1,21 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +#ifndef PORT_LIKELY_H_ +#define PORT_LIKELY_H_ + +#if defined(__GNUC__) && __GNUC__ >= 4 +#define LIKELY(x) (__builtin_expect((x), 1)) +#define UNLIKELY(x) (__builtin_expect((x), 0)) +#else +#define LIKELY(x) (x) +#define UNLIKELY(x) (x) +#endif + +#endif // PORT_LIKELY_H_ diff --git a/util/options.cc b/util/options.cc index e08cdf272..bfafe81de 100644 --- a/util/options.cc +++ b/util/options.cc @@ -110,7 +110,8 @@ Options::Options() inplace_callback(nullptr), memtable_prefix_bloom_bits(0), memtable_prefix_bloom_probes(6), - max_successive_merges(0) { + max_successive_merges(0), + allow_thread_local(true) { assert(memtable_factory.get() != nullptr); } diff --git a/util/statistics.h b/util/statistics.h index d8cb36e0a..d57a1dd4b 100644 --- a/util/statistics.h +++ b/util/statistics.h @@ -7,11 +7,11 @@ #include "rocksdb/statistics.h" #include "util/histogram.h" #include "util/mutexlock.h" +#include "port/likely.h" #include #include -#define UNLIKELY(val) (__builtin_expect((val), 0)) namespace rocksdb { diff --git a/util/thread_local.cc b/util/thread_local.cc index 90571b97e..2e5d3618b 100644 --- a/util/thread_local.cc +++ b/util/thread_local.cc @@ -9,12 +9,8 @@ #include "util/thread_local.h" #include "util/mutexlock.h" +#include "port/likely.h" -#if defined(__GNUC__) && __GNUC__ >= 4 -#define UNLIKELY(x) (__builtin_expect((x), 0)) -#else -#define UNLIKELY(x) (x) -#endif namespace rocksdb {