diff --git a/db/db_impl.cc b/db/db_impl.cc index 45f2c4ea4..179455f35 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -245,8 +245,10 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) unscheduled_flushes_(0), unscheduled_compactions_(0), bg_compaction_scheduled_(0), + num_running_compactions_(0), bg_manual_only_(0), bg_flush_scheduled_(0), + num_running_flushes_(0), manual_compaction_(nullptr), disable_delete_obsolete_files_(0), delete_obsolete_files_next_run_( @@ -2404,6 +2406,7 @@ void DBImpl::BackgroundCallFlush() { LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); { InstrumentedMutexLock l(&mutex_); + num_running_flushes_++; auto pending_outputs_inserted_elem = CaptureCurrentFileNumberInPendingOutputs(); @@ -2449,6 +2452,8 @@ void DBImpl::BackgroundCallFlush() { mutex_.Lock(); } + assert(num_running_flushes_ > 0); + num_running_flushes_--; bg_flush_scheduled_--; // See if there's more work to be done MaybeScheduleFlushOrCompaction(); @@ -2469,6 +2474,7 @@ void DBImpl::BackgroundCallCompaction() { LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); { InstrumentedMutexLock l(&mutex_); + num_running_compactions_++; auto pending_outputs_inserted_elem = CaptureCurrentFileNumberInPendingOutputs(); @@ -2517,6 +2523,8 @@ void DBImpl::BackgroundCallCompaction() { mutex_.Lock(); } + assert(num_running_compactions_ > 0); + num_running_compactions_--; bg_compaction_scheduled_--; versions_->GetColumnFamilySet()->FreeDeadColumnFamilies(); diff --git a/db/db_impl.h b/db/db_impl.h index 25f159844..a896fad26 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -363,6 +363,20 @@ class DBImpl : public DB { // Same as above, should called without mutex held and not on write thread. ColumnFamilyHandle* GetColumnFamilyHandleUnlocked(uint32_t column_family_id); + // Returns the number of currently running flushes. + // REQUIREMENT: mutex_ must be held when calling this function. + int num_running_flushes() { + mutex_.AssertHeld(); + return num_running_flushes_; + } + + // Returns the number of currently running compactions. + // REQUIREMENT: mutex_ must be held when calling this function. + int num_running_compactions() { + mutex_.AssertHeld(); + return num_running_compactions_; + } + protected: Env* const env_; const std::string dbname_; @@ -687,6 +701,9 @@ class DBImpl : public DB { // count how many background compactions are running or have been scheduled int bg_compaction_scheduled_; + // stores the number of compactions are currently running + int num_running_compactions_; + // If non-zero, MaybeScheduleFlushOrCompaction() will only schedule manual // compactions (if manual_compaction_ is not null). This mechanism enables // manual compactions to wait until all other compactions are finished. @@ -695,6 +712,9 @@ class DBImpl : public DB { // number of background memtable flush jobs, submitted to the HIGH pool int bg_flush_scheduled_; + // stores the number of flushes are currently running + int num_running_flushes_; + // Information for a manual compaction struct ManualCompaction { ColumnFamilyData* cfd; diff --git a/db/db_test.cc b/db/db_test.cc index 690fdf812..abc33322d 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -6941,6 +6941,10 @@ TEST_F(DBTest, ThreadStatusFlush) { ASSERT_EQ("v1", Get(1, "foo")); VerifyOperationCount(env_, ThreadStatus::OP_FLUSH, 0); + uint64_t num_running_flushes = 0; + db_->GetIntProperty(DB::Properties::kNumRunningFlushes, &num_running_flushes); + ASSERT_EQ(num_running_flushes, 0); + Put(1, "k1", std::string(100000, 'x')); // Fill memtable Put(1, "k2", std::string(100000, 'y')); // Trigger flush @@ -6948,10 +6952,11 @@ TEST_F(DBTest, ThreadStatusFlush) { // running when we perform VerifyOperationCount(). TEST_SYNC_POINT("DBTest::ThreadStatusFlush:1"); VerifyOperationCount(env_, ThreadStatus::OP_FLUSH, 1); + db_->GetIntProperty(DB::Properties::kNumRunningFlushes, &num_running_flushes); + ASSERT_EQ(num_running_flushes, 1); // This second sync point is to ensure the flush job will not // be completed until we already perform VerifyOperationCount(). TEST_SYNC_POINT("DBTest::ThreadStatusFlush:2"); - rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } @@ -6996,6 +7001,10 @@ TEST_P(DBTestWithParam, ThreadStatusSingleCompaction) { } // This makes sure a compaction won't be scheduled until // we have done with the above Put Phase. + uint64_t num_running_compactions = 0; + db_->GetIntProperty(DB::Properties::kNumRunningCompactions, + &num_running_compactions); + ASSERT_EQ(num_running_compactions, 0); TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:0"); ASSERT_GE(NumTableFilesAtLevel(0), options.level0_file_num_compaction_trigger); @@ -7010,6 +7019,9 @@ TEST_P(DBTestWithParam, ThreadStatusSingleCompaction) { // If thread tracking is not enabled, compaction count should be 0. VerifyOperationCount(env_, ThreadStatus::OP_COMPACTION, 0); } + db_->GetIntProperty(DB::Properties::kNumRunningCompactions, + &num_running_compactions); + ASSERT_EQ(num_running_compactions, 1); // TODO(yhchiang): adding assert to verify each compaction stage. TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:2"); diff --git a/db/internal_stats.cc b/db/internal_stats.cc index 4e37c1d08..6bf19701e 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -130,6 +130,8 @@ static const std::string aggregated_table_properties = "aggregated-table-properties"; static const std::string aggregated_table_properties_at_level = aggregated_table_properties + "-at-level"; +static const std::string num_running_compactions = "num-running-compactions"; +static const std::string num_running_flushes = "num-running-flushes"; const std::string DB::Properties::kNumFilesAtLevelPrefix = rocksdb_prefix + num_files_at_level_prefix; @@ -143,6 +145,10 @@ const std::string DB::Properties::kMemTableFlushPending = rocksdb_prefix + mem_table_flush_pending; const std::string DB::Properties::kCompactionPending = rocksdb_prefix + compaction_pending; +const std::string DB::Properties::kNumRunningCompactions = + rocksdb_prefix + num_running_compactions; +const std::string DB::Properties::kNumRunningFlushes = + rocksdb_prefix + num_running_flushes; const std::string DB::Properties::kBackgroundErrors = rocksdb_prefix + background_errors; const std::string DB::Properties::kCurSizeActiveMemTable = @@ -260,6 +266,10 @@ DBPropertyType GetPropertyType(const Slice& property, bool* is_int_property, return kTotalSstFilesSize; } else if (in == estimate_pending_comp_bytes) { return kEstimatePendingCompactionBytes; + } else if (in == num_running_flushes) { + return kNumRunningFlushes; + } else if (in == num_running_compactions) { + return kNumRunningCompactions; } return kUnknown; } @@ -388,11 +398,17 @@ bool InternalStats::GetIntProperty(DBPropertyType property_type, // Return number of mem tables that are ready to flush (made immutable) *value = (cfd_->imm()->IsFlushPending() ? 1 : 0); return true; + case kNumRunningFlushes: + *value = db->num_running_flushes(); + return true; case kCompactionPending: // 1 if the system already determines at least one compaction is needed. // 0 otherwise, *value = (cfd_->compaction_picker()->NeedsCompaction(vstorage) ? 1 : 0); return true; + case kNumRunningCompactions: + *value = db->num_running_compactions_; + return true; case kBackgroundErrors: // Accumulated number of errors in background flushes or compactions. *value = GetBackgroundErrorCount(); diff --git a/db/internal_stats.h b/db/internal_stats.h index 217394235..0e3413bc7 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -38,7 +38,10 @@ enum DBPropertyType : uint32_t { // in memory that have already been flushed kMemtableFlushPending, // Return 1 if mem table flushing is pending, // otherwise 0. + kNumRunningFlushes, // Return the number of currently running flushes. kCompactionPending, // Return 1 if a compaction is pending. Otherwise 0. + kNumRunningCompactions, // Return the number of currently running + // compactions. kBackgroundErrors, // Return accumulated background errors encountered. kCurSizeActiveMemTable, // Return current size of the active memtable kCurSizeAllMemTables, // Return current size of unflushed diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 5a49638bd..0ded47eb4 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -324,34 +324,37 @@ class DB { // "rocksdb.compaction-pending" - 1 if at least one compaction is pending // "rocksdb.background-errors" - accumulated number of background errors // "rocksdb.cur-size-active-mem-table" -// "rocksdb.size-all-mem-tables" -// "rocksdb.num-entries-active-mem-table" -// "rocksdb.num-entries-imm-mem-tables" -// "rocksdb.num-deletes-active-mem-table" -// "rocksdb.num-deletes-imm-mem-tables" -// "rocksdb.estimate-num-keys" - estimated keys in the column family -// "rocksdb.estimate-table-readers-mem" - estimated memory used for reding -// SST tables, that is not counted as a part of block cache. -// "rocksdb.is-file-deletions-enabled" -// "rocksdb.num-snapshots" -// "rocksdb.oldest-snapshot-time" -// "rocksdb.num-live-versions" - `version` is an internal data structure. -// See version_set.h for details. More live versions often mean more SST -// files are held from being deleted, by iterators or unfinished -// compactions. -// "rocksdb.estimate-live-data-size" -// "rocksdb.total-sst-files-size" - total size of all used sst files, this may -// slow down online queries if there are too many files. -// "rocksdb.base-level" -// "rocksdb.estimate-pending-compaction-bytes" - estimated total number of -// bytes compaction needs to rewrite the data to get all levels down -// to under target size. Not valid for other compactions than level-based. -// "rocksdb.aggregated-table-properties" - returns a string representation of -// the aggregated table properties of the target column family. -// "rocksdb.aggregated-table-properties-at-level", same as the previous -// one but only returns the aggregated table properties of the specified -// level "N" at the target column family. -// replaced by the target level. + // "rocksdb.size-all-mem-tables" + // "rocksdb.num-entries-active-mem-table" + // "rocksdb.num-entries-imm-mem-tables" + // "rocksdb.num-deletes-active-mem-table" + // "rocksdb.num-deletes-imm-mem-tables" + // "rocksdb.estimate-num-keys" - estimated keys in the column family + // "rocksdb.estimate-table-readers-mem" - estimated memory used for reding + // SST tables, that is not counted as a part of block cache. + // "rocksdb.is-file-deletions-enabled" + // "rocksdb.num-snapshots" + // "rocksdb.oldest-snapshot-time" + // "rocksdb.num-live-versions" - `version` is an internal data structure. + // See version_set.h for details. More live versions often mean more SST + // files are held from being deleted, by iterators or unfinished + // compactions. + // "rocksdb.estimate-live-data-size" + // "rocksdb.total-sst-files-size" - total size of all used sst files, this + // may slow down online queries if there are too many files. + // "rocksdb.base-level" + // "rocksdb.estimate-pending-compaction-bytes" - estimated total number of + // bytes compaction needs to rewrite the data to get all levels down + // to under target size. Not valid for other compactions than + // level-based. + // "rocksdb.aggregated-table-properties" - returns a string representation + // of the aggregated table properties of the target column family. + // "rocksdb.aggregated-table-properties-at-level", same as the previous + // one but only returns the aggregated table properties of the specified + // level "N" at the target column family. + // "rocksdb.num-running-compactions" - the number of currently running + // compacitons. + // "rocksdb.num-running-flushes" - the number of currently running flushes. #ifndef ROCKSDB_LITE struct Properties { static const std::string kNumFilesAtLevelPrefix; @@ -361,7 +364,9 @@ class DB { static const std::string kDBStats; static const std::string kNumImmutableMemTable; static const std::string kMemTableFlushPending; + static const std::string kNumRunningFlushes; static const std::string kCompactionPending; + static const std::string kNumRunningCompactions; static const std::string kBackgroundErrors; static const std::string kCurSizeActiveMemTable; static const std::string kCurSizeAllMemTables; @@ -414,6 +419,8 @@ class DB { // "rocksdb.total-sst-files-size" // "rocksdb.base-level" // "rocksdb.estimate-pending-compaction-bytes" + // "rocksdb.num-running-compactions" + // "rocksdb.num-running-flushes" virtual bool GetIntProperty(ColumnFamilyHandle* column_family, const Slice& property, uint64_t* value) = 0; virtual bool GetIntProperty(const Slice& property, uint64_t* value) {