diff --git a/db/column_family.cc b/db/column_family.cc index d3ff9b3f5..ea3e617e2 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -66,7 +66,7 @@ uint64_t SlowdownAmount(int n, double bottom, double top) { } // namespace ColumnFamilyHandleImpl::ColumnFamilyHandleImpl( - ColumnFamilyData* column_family_data, DBImpl* db, port::Mutex* mutex) + ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex) : cfd_(column_family_data), db_(db), mutex_(mutex) { if (cfd_ != nullptr) { cfd_->Ref(); @@ -482,7 +482,7 @@ Compaction* ColumnFamilyData::CompactRange( } SuperVersion* ColumnFamilyData::GetReferencedSuperVersion( - port::Mutex* db_mutex) { + InstrumentedMutex* db_mutex) { SuperVersion* sv = nullptr; sv = GetThreadLocalSuperVersion(db_mutex); sv->Ref(); @@ -493,7 +493,7 @@ SuperVersion* ColumnFamilyData::GetReferencedSuperVersion( } SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion( - port::Mutex* db_mutex) { + InstrumentedMutex* db_mutex) { SuperVersion* sv = nullptr; // The SuperVersion is cached in thread local storage to avoid acquiring // mutex when SuperVersion does not change since the last use. When a new @@ -599,13 +599,13 @@ void ColumnFamilyData::NotifyOnFlushCompleted( } SuperVersion* ColumnFamilyData::InstallSuperVersion( - SuperVersion* new_superversion, port::Mutex* db_mutex) { + SuperVersion* new_superversion, InstrumentedMutex* db_mutex) { db_mutex->AssertHeld(); return InstallSuperVersion(new_superversion, db_mutex, mutable_cf_options_); } SuperVersion* ColumnFamilyData::InstallSuperVersion( - SuperVersion* new_superversion, port::Mutex* db_mutex, + SuperVersion* new_superversion, InstrumentedMutex* db_mutex, const MutableCFOptions& mutable_cf_options) { new_superversion->db_mutex = db_mutex; new_superversion->mutable_cf_options = mutable_cf_options; diff --git a/db/column_family.h b/db/column_family.h index 8101e7032..84b01dc71 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -21,9 +21,10 @@ #include "db/write_batch_internal.h" #include "db/write_controller.h" #include "db/table_cache.h" -#include "util/thread_local.h" #include "db/flush_scheduler.h" +#include "util/instrumented_mutex.h" #include "util/mutable_cf_options.h" +#include "util/thread_local.h" namespace rocksdb { @@ -38,6 +39,8 @@ class InternalStats; class ColumnFamilyData; class DBImpl; class LogBuffer; +class InstrumentedMutex; +class InstrumentedMutexLock; // ColumnFamilyHandleImpl is the class that clients use to access different // column families. It has non-trivial destructor, which gets called when client @@ -45,7 +48,8 @@ class LogBuffer; class ColumnFamilyHandleImpl : public ColumnFamilyHandle { public: // create while holding the mutex - ColumnFamilyHandleImpl(ColumnFamilyData* cfd, DBImpl* db, port::Mutex* mutex); + ColumnFamilyHandleImpl( + ColumnFamilyData* cfd, DBImpl* db, InstrumentedMutex* mutex); // destroy without mutex virtual ~ColumnFamilyHandleImpl(); virtual ColumnFamilyData* cfd() const { return cfd_; } @@ -57,7 +61,7 @@ class ColumnFamilyHandleImpl : public ColumnFamilyHandle { private: ColumnFamilyData* cfd_; DBImpl* db_; - port::Mutex* mutex_; + InstrumentedMutex* mutex_; }; // Does not ref-count ColumnFamilyData @@ -91,7 +95,7 @@ struct SuperVersion { autovector to_delete; // Version number of the current SuperVersion uint64_t version_number; - port::Mutex* db_mutex; + InstrumentedMutex* db_mutex; // should be called outside the mutex SuperVersion() = default; @@ -235,11 +239,11 @@ class ColumnFamilyData { SuperVersion* GetSuperVersion() { return super_version_; } // thread-safe // Return a already referenced SuperVersion to be used safely. - SuperVersion* GetReferencedSuperVersion(port::Mutex* db_mutex); + SuperVersion* GetReferencedSuperVersion(InstrumentedMutex* db_mutex); // thread-safe // Get SuperVersion stored in thread local storage. If it does not exist, // get a reference from a current SuperVersion. - SuperVersion* GetThreadLocalSuperVersion(port::Mutex* db_mutex); + SuperVersion* GetThreadLocalSuperVersion(InstrumentedMutex* db_mutex); // Try to return SuperVersion back to thread local storage. Retrun true on // success and false on failure. It fails when the thread local storage // contains anything other than SuperVersion::kSVInUse flag. @@ -254,10 +258,10 @@ class ColumnFamilyData { // the clients to allocate SuperVersion outside of mutex. // IMPORTANT: Only call this from DBImpl::InstallSuperVersion() SuperVersion* InstallSuperVersion(SuperVersion* new_superversion, - port::Mutex* db_mutex, + InstrumentedMutex* db_mutex, const MutableCFOptions& mutable_cf_options); SuperVersion* InstallSuperVersion(SuperVersion* new_superversion, - port::Mutex* db_mutex); + InstrumentedMutex* db_mutex); void ResetThreadLocalSuperVersions(); diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 09b21a237..775dcebec 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -470,7 +470,7 @@ Status CompactionJob::Run() { return status; } -void CompactionJob::Install(Status* status, port::Mutex* db_mutex) { +void CompactionJob::Install(Status* status, InstrumentedMutex* db_mutex) { db_mutex->AssertHeld(); ColumnFamilyData* cfd = compact_->compaction->column_family_data(); cfd->internal_stats()->AddCompactionStats( @@ -955,7 +955,7 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) { return s; } -Status CompactionJob::InstallCompactionResults(port::Mutex* db_mutex) { +Status CompactionJob::InstallCompactionResults(InstrumentedMutex* db_mutex) { db_mutex->AssertHeld(); // paranoia: verify that the files that we started with diff --git a/db/compaction_job.h b/db/compaction_job.h index 705ba7c64..cc31ece87 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -75,7 +75,7 @@ class CompactionJob { Status Run(); // REQUIRED: mutex held // status is the return of Run() - void Install(Status* status, port::Mutex* db_mutex); + void Install(Status* status, InstrumentedMutex* db_mutex); private: void AllocateCompactionOutputFileNumbers(); @@ -86,7 +86,7 @@ class CompactionJob { // Call compaction_filter_v2->Filter() on kv-pairs in compact void CallCompactionFilterV2(CompactionFilterV2* compaction_filter_v2); Status FinishCompactionOutputFile(Iterator* input); - Status InstallCompactionResults(port::Mutex* db_mutex); + Status InstallCompactionResults(InstrumentedMutex* db_mutex); SequenceNumber findEarliestVisibleSnapshot( SequenceNumber in, const std::vector& snapshots, SequenceNumber* prev_snapshot); diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index 54217cc37..2a089dc57 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -130,7 +130,7 @@ class CompactionJobTest { ColumnFamilyOptions cf_options_; WriteBuffer write_buffer_; std::unique_ptr versions_; - port::Mutex mutex_; + InstrumentedMutex mutex_; std::atomic shutting_down_; std::shared_ptr mock_table_factory_; }; diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index ce009a976..4011b4652 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -31,7 +31,7 @@ namespace rocksdb { Status DBImpl::DisableFileDeletions() { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); ++disable_delete_obsolete_files_; if (disable_delete_obsolete_files_ == 1) { Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, @@ -48,7 +48,7 @@ Status DBImpl::EnableFileDeletions(bool force) { JobContext job_context; bool should_purge_files = false; { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); if (force) { // if force, we need to enable file deletions right away disable_delete_obsolete_files_ = 0; diff --git a/db/db_impl.cc b/db/db_impl.cc index 8e8f3b733..dd627313b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -197,7 +197,9 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) db_options_(SanitizeOptions(dbname, options)), stats_(db_options_.statistics.get()), db_lock_(nullptr), - mutex_(options.use_adaptive_mutex), + mutex_(stats_, env_, + DB_MUTEX_WAIT_MICROS, + options.use_adaptive_mutex), shutting_down_(false), bg_cv_(&mutex_), logfile_number_(0), @@ -411,7 +413,7 @@ void DBImpl::MaybeDumpStats() { GetPropertyType("rocksdb.dbstats", &tmp1, &tmp2); std::string stats; { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); for (auto cfd : *versions_->GetColumnFamilySet()) { cfd->internal_stats()->GetStringProperty(cf_property_type, "rocksdb.cfstats", &stats); @@ -1225,7 +1227,7 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family, int max_level_with_files = 0; { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); Version* base = cfd->current(); for (int level = 1; level < cfd->NumberLevels(); level++) { if (base->storage_info()->OverlapInLevel(level, begin, end)) { @@ -1258,7 +1260,7 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family, LogFlush(db_options_.info_log); { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); // an automatic compaction that has been scheduled might have been // preempted by the manual compactions. Need to schedule it back. MaybeScheduleFlushOrCompaction(); @@ -1276,7 +1278,7 @@ Status DBImpl::CompactFiles( // not supported in lite version return Status::NotSupported("Not supported in ROCKSDB LITE"); #else - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); if (column_family == nullptr) { return Status::InvalidArgument("ColumnFamilyHandle must be non-null."); } @@ -1471,7 +1473,7 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family, MutableCFOptions new_options; Status s; { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); s = cfd->SetOptions(options_map); if (s.ok()) { new_options = *cfd->GetLatestMutableCFOptions(); @@ -1607,14 +1609,14 @@ int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) { int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* column_family) { auto cfh = reinterpret_cast(column_family); - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); return cfh->cfd()->GetSuperVersion()-> mutable_cf_options.max_mem_compaction_level; } int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) { auto cfh = reinterpret_cast(column_family); - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); return cfh->cfd()->GetSuperVersion()-> mutable_cf_options.level0_stop_writes_trigger; } @@ -1662,7 +1664,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, manual.end = &end_storage; } - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); // When a manual compaction arrives, temporarily disable scheduling of // non-manual compactions and wait until the number of scheduled compaction @@ -1717,7 +1719,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, Status s; { WriteContext context; - MutexLock guard_lock(&mutex_); + InstrumentedMutexLock guard_lock(&mutex_); if (cfd->imm()->size() == 0 && cfd->mem()->IsEmpty()) { // Nothing to flush @@ -1750,7 +1752,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) { Status s; // Wait until the compaction completes - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); while (cfd->imm()->size() > 0 && bg_error_.ok()) { bg_cv_.Wait(); } @@ -1917,7 +1919,7 @@ void DBImpl::BackgroundCallFlush() { LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); auto pending_outputs_inserted_elem = CaptureCurrentFileNumberInPendingOutputs(); @@ -1985,7 +1987,7 @@ void DBImpl::BackgroundCallCompaction() { MaybeDumpStats(); LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); auto pending_outputs_inserted_elem = CaptureCurrentFileNumberInPendingOutputs(); @@ -2352,11 +2354,11 @@ uint64_t DBImpl::CallFlushDuringCompaction( namespace { struct IterState { - IterState(DBImpl* _db, port::Mutex* _mu, SuperVersion* _super_version) + IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version) : db(_db), mu(_mu), super_version(_super_version) {} DBImpl* db; - port::Mutex* mu; + InstrumentedMutex* mu; SuperVersion* super_version; }; @@ -2643,7 +2645,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, Status s; *handle = nullptr; { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); if (versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name) != nullptr) { @@ -2691,7 +2693,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, "Creating column family [%s] FAILED -- %s", column_family_name.c_str(), s.ToString().c_str()); } - } // MutexLock l(&mutex_) + } // InstrumentedMutexLock l(&mutex_) // this is outside the mutex if (s.ok()) { @@ -2716,7 +2718,7 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { Status s; { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); if (cfd->IsDropped()) { s = Status::InvalidArgument("Column family already dropped!\n"); } @@ -2919,14 +2921,14 @@ const Snapshot* DBImpl::GetSnapshot() { int64_t unix_time = 0; env_->GetCurrentTime(&unix_time); // Ignore error - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); // returns null if the underlying memtable does not support snapshot. if (!is_snapshot_supported_) return nullptr; return snapshots_.New(versions_->LastSequence(), unix_time); } void DBImpl::ReleaseSnapshot(const Snapshot* s) { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); snapshots_.Delete(reinterpret_cast(s)); } @@ -3377,7 +3379,7 @@ bool DBImpl::GetProperty(ColumnFamilyHandle* column_family, } else { auto cfh = reinterpret_cast(column_family); auto cfd = cfh->cfd(); - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); return cfd->internal_stats()->GetStringProperty(property_type, property, value); } @@ -3403,7 +3405,7 @@ bool DBImpl::GetIntPropertyInternal(ColumnFamilyHandle* column_family, auto cfd = cfh->cfd(); if (!need_out_of_mutex) { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); return cfd->internal_stats()->GetIntProperty(property_type, value, this); } else { SuperVersion* sv = GetAndRefSuperVersion(cfd); @@ -3430,7 +3432,7 @@ void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd, // Release SuperVersion if (sv->Unref()) { { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); sv->Cleanup(); } delete sv; @@ -3447,7 +3449,7 @@ void DBImpl::GetApproximateSizes(ColumnFamilyHandle* column_family, auto cfh = reinterpret_cast(column_family); auto cfd = cfh->cfd(); { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); v = cfd->current(); v->Ref(); } @@ -3462,7 +3464,7 @@ void DBImpl::GetApproximateSizes(ColumnFamilyHandle* column_family, } { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); v->Unref(); } } @@ -3530,7 +3532,7 @@ Status DBImpl::DeleteFile(std::string name) { VersionEdit edit; JobContext job_context(true); { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); status = versions_->GetMetadataForFile(number, &level, &metadata, &cfd); if (!status.ok()) { Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, @@ -3589,7 +3591,7 @@ Status DBImpl::DeleteFile(std::string name) { } void DBImpl::GetLiveFilesMetaData(std::vector* metadata) { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); versions_->GetLiveFilesMetaData(metadata); } diff --git a/db/db_impl.h b/db/db_impl.h index 3b3376665..86402e817 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -36,6 +36,7 @@ #include "util/thread_local.h" #include "util/scoped_arena_iterator.h" #include "util/hash.h" +#include "util/instrumented_mutex.h" #include "db/internal_stats.h" #include "db/write_controller.h" #include "db/flush_scheduler.h" @@ -412,7 +413,7 @@ class DBImpl : public DB { FileLock* db_lock_; // State below is protected by mutex_ - port::Mutex mutex_; + InstrumentedMutex mutex_; std::atomic shutting_down_; // This condition variable is signaled on these conditions: // * whenever bg_compaction_scheduled_ goes down to 0 @@ -422,7 +423,7 @@ class DBImpl : public DB { // * whenever bg_flush_scheduled_ value decreases (i.e. whenever a flush is // done, even if it didn't make any progress) // * whenever there is an error in background flush or compaction - port::CondVar bg_cv_; + InstrumentedCondVar bg_cv_; uint64_t logfile_number_; unique_ptr log_; bool log_dir_synced_; diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index db4c91ae5..efa209a2b 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -15,7 +15,7 @@ namespace rocksdb { uint64_t DBImpl::TEST_GetLevel0TotalSize() { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); return default_cf_handle_->cfd()->current()->storage_info()->NumLevelBytes(0); } @@ -45,7 +45,7 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes( auto cfh = reinterpret_cast(column_family); cfd = cfh->cfd(); } - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); return cfd->current()->storage_info()->MaxNextLevelOverlappingBytes(); } @@ -54,7 +54,7 @@ void DBImpl::TEST_GetFilesMetaData( std::vector>* metadata) { auto cfh = reinterpret_cast(column_family); auto cfd = cfh->cfd(); - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); metadata->resize(NumberLevels()); for (int level = 0; level < NumberLevels(); level++) { const std::vector& files = @@ -113,7 +113,7 @@ Status DBImpl::TEST_WaitForCompact() { // wait for compact. It actually waits for scheduled compaction // OR flush to finish. - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); while ((bg_compaction_scheduled_ || bg_flush_scheduled_) && bg_error_.ok()) { bg_cv_.Wait(); } diff --git a/db/db_test.cc b/db/db_test.cc index 4915d29c1..66e47c680 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -10298,6 +10298,21 @@ TEST(DBTest, EncodeDecompressedBlockSizeTest) { } } +TEST(DBTest, MutexWaitStats) { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.statistics = rocksdb::CreateDBStatistics(); + CreateAndReopenWithCF({"pikachu"}, options); + const int64_t kMutexWaitDelay = 100; + ThreadStatusUtil::TEST_SetStateDelay( + ThreadStatus::STATE_MUTEX_WAIT, kMutexWaitDelay); + ASSERT_OK(Put("hello", "rocksdb")); + ASSERT_GE(TestGetTickerCount( + options, DB_MUTEX_WAIT_MICROS), kMutexWaitDelay); + ThreadStatusUtil::TEST_SetStateDelay( + ThreadStatus::STATE_MUTEX_WAIT, 0); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/flush_job.cc b/db/flush_job.cc index 8cf4daa49..ca1d113db 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -55,7 +55,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, const DBOptions& db_options, const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options, VersionSet* versions, - port::Mutex* db_mutex, std::atomic* shutting_down, + InstrumentedMutex* db_mutex, + std::atomic* shutting_down, SequenceNumber newest_snapshot, JobContext* job_context, LogBuffer* log_buffer, Directory* db_directory, Directory* output_file_directory, diff --git a/db/flush_job.h b/db/flush_job.h index 0b8491484..40cdc5045 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -28,6 +28,7 @@ #include "rocksdb/memtablerep.h" #include "rocksdb/transaction_log.h" #include "util/autovector.h" +#include "util/instrumented_mutex.h" #include "util/stop_watch.h" #include "util/thread_local.h" #include "util/scoped_arena_iterator.h" @@ -54,7 +55,7 @@ class FlushJob { const DBOptions& db_options, const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options, VersionSet* versions, - port::Mutex* db_mutex, std::atomic* shutting_down, + InstrumentedMutex* db_mutex, std::atomic* shutting_down, SequenceNumber newest_snapshot, JobContext* job_context, LogBuffer* log_buffer, Directory* db_directory, Directory* output_file_directory, CompressionType output_compression, @@ -72,7 +73,7 @@ class FlushJob { const MutableCFOptions& mutable_cf_options_; const EnvOptions& env_options_; VersionSet* versions_; - port::Mutex* db_mutex_; + InstrumentedMutex* db_mutex_; std::atomic* shutting_down_; SequenceNumber newest_snapshot_; JobContext* job_context_; diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 2f4f08b2e..d3e824087 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -75,7 +75,7 @@ class FlushJobTest { WriteBuffer write_buffer_; ColumnFamilyOptions cf_options_; std::unique_ptr versions_; - port::Mutex mutex_; + InstrumentedMutex mutex_; std::atomic shutting_down_; std::shared_ptr mock_table_factory_; }; diff --git a/db/log_and_apply_bench.cc b/db/log_and_apply_bench.cc index b55ec0539..e5e271a1f 100644 --- a/db/log_and_apply_bench.cc +++ b/db/log_and_apply_bench.cc @@ -31,8 +31,8 @@ void BM_LogAndApply(int iters, int num_base_files) { WriteController wc; ColumnFamilyData* default_cfd; uint64_t fnum = 1; - port::Mutex mu; - MutexLock l(&mu); + InstrumentedMutex mu; + InstrumentedMutexLock l(&mu); BENCHMARK_SUSPEND { std::string dbname = test::TmpDir() + "/rocksdb_test_benchmark"; diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 8d568e895..44c069dd5 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -164,7 +164,7 @@ void MemTableList::RollbackMemtableFlush(const autovector& mems, // Record a successful flush in the manifest file Status MemTableList::InstallMemtableFlushResults( ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, - const autovector& mems, VersionSet* vset, port::Mutex* mu, + const autovector& mems, VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, autovector* to_delete, Directory* db_directory, LogBuffer* log_buffer) { mu->AssertHeld(); diff --git a/db/memtable_list.h b/db/memtable_list.h index 6cf1737c1..30382eac6 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -22,13 +22,14 @@ #include "rocksdb/iterator.h" #include "rocksdb/options.h" #include "util/autovector.h" +#include "util/instrumented_mutex.h" #include "util/log_buffer.h" namespace rocksdb { class ColumnFamilyData; class InternalKeyComparator; -class Mutex; +class InstrumentedMutex; class MergeIteratorBuilder; // keeps a list of immutable memtables in a vector. the list is immutable @@ -113,7 +114,7 @@ class MemTableList { // Commit a successful flush in the manifest file Status InstallMemtableFlushResults( ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, - const autovector& m, VersionSet* vset, port::Mutex* mu, + const autovector& m, VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, autovector* to_delete, Directory* db_directory, LogBuffer* log_buffer); diff --git a/db/version_set.cc b/db/version_set.cc index 3b6d57f55..211ee3fda 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1487,11 +1487,11 @@ std::string Version::DebugString(bool hex) const { struct VersionSet::ManifestWriter { Status status; bool done; - port::CondVar cv; + InstrumentedCondVar cv; ColumnFamilyData* cfd; VersionEdit* edit; - explicit ManifestWriter(port::Mutex* mu, ColumnFamilyData* _cfd, + explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd, VersionEdit* e) : done(false), cv(mu), cfd(_cfd), edit(e) {} }; @@ -1556,7 +1556,7 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, const MutableCFOptions& mutable_cf_options, - VersionEdit* edit, port::Mutex* mu, + VersionEdit* edit, InstrumentedMutex* mu, Directory* db_directory, bool new_descriptor_log, const ColumnFamilyOptions* new_cf_options) { mu->AssertHeld(); @@ -1824,7 +1824,7 @@ void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) { void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* builder, Version* v, - VersionEdit* edit, port::Mutex* mu) { + VersionEdit* edit, InstrumentedMutex* mu) { mu->AssertHeld(); assert(!edit->IsColumnFamilyManipulation()); @@ -2275,8 +2275,8 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, MutableCFOptions mutable_cf_options(*options, ImmutableCFOptions(*options)); VersionEdit ve; - port::Mutex dummy_mutex; - MutexLock l(&dummy_mutex); + InstrumentedMutex dummy_mutex; + InstrumentedMutexLock l(&dummy_mutex); return versions.LogAndApply( versions.GetColumnFamilySet()->GetDefault(), mutable_cf_options, &ve, &dummy_mutex, nullptr, true); diff --git a/db/version_set.h b/db/version_set.h index 83801e1da..ca79aff4e 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -36,6 +36,7 @@ #include "db/log_reader.h" #include "db/file_indexer.h" #include "db/write_controller.h" +#include "util/instrumented_mutex.h" namespace rocksdb { @@ -485,7 +486,7 @@ class VersionSet { Status LogAndApply( ColumnFamilyData* column_family_data, const MutableCFOptions& mutable_cf_options, VersionEdit* edit, - port::Mutex* mu, Directory* db_directory = nullptr, + InstrumentedMutex* mu, Directory* db_directory = nullptr, bool new_descriptor_log = false, const ColumnFamilyOptions* column_family_options = nullptr); @@ -656,7 +657,7 @@ class VersionSet { void LogAndApplyCFHelper(VersionEdit* edit); void LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b, Version* v, - VersionEdit* edit, port::Mutex* mu); + VersionEdit* edit, InstrumentedMutex* mu); }; } // namespace rocksdb diff --git a/db/write_thread.h b/db/write_thread.h index 8c5baa664..db3520244 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -12,6 +12,7 @@ #include "db/write_batch_internal.h" #include "util/autovector.h" #include "port/port.h" +#include "util/instrumented_mutex.h" namespace rocksdb { @@ -27,9 +28,9 @@ class WriteThread { bool in_batch_group; bool done; uint64_t timeout_hint_us; - port::CondVar cv; + InstrumentedCondVar cv; - explicit Writer(port::Mutex* mu) + explicit Writer(InstrumentedMutex* mu) : batch(nullptr), sync(false), disableWAL(false), diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 7d0dad5d6..4b28fd0d9 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -81,6 +81,8 @@ enum Tickers : uint32_t { STALL_L0_NUM_FILES_MICROS, // Writer has to wait for compaction or flush to finish. STALL_MICROS, + // The wait time for db mutex. + DB_MUTEX_WAIT_MICROS, RATE_LIMIT_DELAY_MILLIS, NO_ITERATORS, // number of iterators currently open @@ -163,6 +165,7 @@ const std::vector> TickersNameMap = { {STALL_MEMTABLE_COMPACTION_MICROS, "rocksdb.memtable.compaction.micros"}, {STALL_L0_NUM_FILES_MICROS, "rocksdb.l0.num.files.stall.micros"}, {STALL_MICROS, "rocksdb.stall.micros"}, + {DB_MUTEX_WAIT_MICROS, "rocksdb.db.mutex.wait.micros"}, {RATE_LIMIT_DELAY_MILLIS, "rocksdb.rate.limit.delay.millis"}, {NO_ITERATORS, "rocksdb.num.iterators"}, {NUMBER_MULTIGET_CALLS, "rocksdb.number.multiget.get"}, diff --git a/include/rocksdb/thread_status.h b/include/rocksdb/thread_status.h index 539321291..36efd6f75 100644 --- a/include/rocksdb/thread_status.h +++ b/include/rocksdb/thread_status.h @@ -53,6 +53,7 @@ struct ThreadStatus { // such as reading / writing a file or waiting for a mutex. enum StateType : int { STATE_UNKNOWN = 0, + STATE_MUTEX_WAIT = 1, NUM_STATE_TYPES }; diff --git a/util/instrumented_mutex.cc b/util/instrumented_mutex.cc new file mode 100644 index 000000000..05d19b2ae --- /dev/null +++ b/util/instrumented_mutex.cc @@ -0,0 +1,72 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "util/instrumented_mutex.h" +#include "util/thread_status_util.h" + +namespace rocksdb { +void InstrumentedMutex::Lock() { + uint64_t wait_time_micros = 0; + if (env_ != nullptr && stats_ != nullptr) { + { + StopWatch sw(env_, nullptr, 0, &wait_time_micros); + LockInternal(); + } + RecordTick(stats_, stats_code_, wait_time_micros); + } else { + LockInternal(); + } +} + +void InstrumentedMutex::LockInternal() { +#ifndef NDEBUG + ThreadStatusUtil::TEST_StateDelay(ThreadStatus::STATE_MUTEX_WAIT); +#endif + mutex_.Lock(); +} + +void InstrumentedCondVar::Wait() { + uint64_t wait_time_micros = 0; + if (env_ != nullptr && stats_ != nullptr) { + { + StopWatch sw(env_, nullptr, 0, &wait_time_micros); + WaitInternal(); + } + RecordTick(stats_, stats_code_, wait_time_micros); + } else { + WaitInternal(); + } +} + +void InstrumentedCondVar::WaitInternal() { +#ifndef NDEBUG + ThreadStatusUtil::TEST_StateDelay(ThreadStatus::STATE_MUTEX_WAIT); +#endif + cond_.Wait(); +} + +bool InstrumentedCondVar::TimedWait(uint64_t abs_time_us) { + uint64_t wait_time_micros = 0; + bool result = false; + if (env_ != nullptr && stats_ != nullptr) { + { + StopWatch sw(env_, nullptr, 0, &wait_time_micros); + result = TimedWaitInternal(abs_time_us); + } + RecordTick(stats_, stats_code_, wait_time_micros); + } else { + result = TimedWaitInternal(abs_time_us); + } + return result; +} + +bool InstrumentedCondVar::TimedWaitInternal(uint64_t abs_time_us) { +#ifndef NDEBUG + ThreadStatusUtil::TEST_StateDelay(ThreadStatus::STATE_MUTEX_WAIT); +#endif + return cond_.TimedWait(abs_time_us); +} + +} // namespace rocksdb diff --git a/util/instrumented_mutex.h b/util/instrumented_mutex.h new file mode 100644 index 000000000..3f233494a --- /dev/null +++ b/util/instrumented_mutex.h @@ -0,0 +1,98 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once + +#include "port/port.h" +#include "rocksdb/env.h" +#include "rocksdb/statistics.h" +#include "rocksdb/thread_status.h" +#include "util/statistics.h" +#include "util/stop_watch.h" + +namespace rocksdb { +class InstrumentedCondVar; + +// A wrapper class for port::Mutex that provides additional layer +// for collecting stats and instrumentation. +class InstrumentedMutex { + public: + explicit InstrumentedMutex(bool adaptive = false) + : mutex_(adaptive), stats_(nullptr), env_(nullptr), + stats_code_(0) {} + + InstrumentedMutex( + Statistics* stats, Env* env, + int stats_code, bool adaptive = false) + : mutex_(adaptive), stats_(stats), env_(env), + stats_code_(stats_code) {} + + void Lock(); + + void Unlock() { + mutex_.Unlock(); + } + + void AssertHeld() { + mutex_.AssertHeld(); + } + + private: + void LockInternal(); + friend class InstrumentedCondVar; + port::Mutex mutex_; + Statistics* stats_; + Env* env_; + int stats_code_; +}; + +// A wrapper class for port::Mutex that provides additional layer +// for collecting stats and instrumentation. +class InstrumentedMutexLock { + public: + explicit InstrumentedMutexLock(InstrumentedMutex* mutex) : mutex_(mutex) { + mutex_->Lock(); + } + + ~InstrumentedMutexLock() { + mutex_->Unlock(); + } + + private: + InstrumentedMutex* const mutex_; + InstrumentedMutexLock(const InstrumentedMutexLock&) = delete; + void operator=(const InstrumentedMutexLock&) = delete; +}; + +class InstrumentedCondVar { + public: + explicit InstrumentedCondVar(InstrumentedMutex* instrumented_mutex) + : cond_(&(instrumented_mutex->mutex_)), + stats_(instrumented_mutex->stats_), + env_(instrumented_mutex->env_), + stats_code_(instrumented_mutex->stats_code_) {} + + void Wait(); + + bool TimedWait(uint64_t abs_time_us); + + void Signal() { + cond_.Signal(); + } + + void SignalAll() { + cond_.SignalAll(); + } + + private: + void WaitInternal(); + bool TimedWaitInternal(uint64_t abs_time_us); + port::CondVar cond_; + Statistics* stats_; + Env* env_; + int stats_code_; +}; + +} // namespace rocksdb diff --git a/util/thread_status_util.h b/util/thread_status_util.h index a8549e8ae..8428d492c 100644 --- a/util/thread_status_util.h +++ b/util/thread_status_util.h @@ -11,6 +11,7 @@ #include "util/thread_status_updater.h" namespace rocksdb { +class ColumnFamilyData; // The static utility class for updating thread-local status.