Allow GetProperty to report the number of currently running flushes / compactions.

Summary:
Add rocksdb.num-running-compactions and rocksdb.num-running-flushes
to GetIntProperty() that reports the number of currently running
compactions / flushes.

Test Plan: augmented existing tests in db_test

Reviewers: igor, IslamAbdelRahman, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D48693
main
Yueh-Hsuan Chiang 9 years ago
parent 277dea78f0
commit ad471453e8
  1. 8
      db/db_impl.cc
  2. 20
      db/db_impl.h
  3. 14
      db/db_test.cc
  4. 16
      db/internal_stats.cc
  5. 3
      db/internal_stats.h
  6. 19
      include/rocksdb/db.h

@ -245,8 +245,10 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
unscheduled_flushes_(0), unscheduled_flushes_(0),
unscheduled_compactions_(0), unscheduled_compactions_(0),
bg_compaction_scheduled_(0), bg_compaction_scheduled_(0),
num_running_compactions_(0),
bg_manual_only_(0), bg_manual_only_(0),
bg_flush_scheduled_(0), bg_flush_scheduled_(0),
num_running_flushes_(0),
manual_compaction_(nullptr), manual_compaction_(nullptr),
disable_delete_obsolete_files_(0), disable_delete_obsolete_files_(0),
delete_obsolete_files_next_run_( delete_obsolete_files_next_run_(
@ -2404,6 +2406,7 @@ void DBImpl::BackgroundCallFlush() {
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
{ {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
num_running_flushes_++;
auto pending_outputs_inserted_elem = auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs(); CaptureCurrentFileNumberInPendingOutputs();
@ -2449,6 +2452,8 @@ void DBImpl::BackgroundCallFlush() {
mutex_.Lock(); mutex_.Lock();
} }
assert(num_running_flushes_ > 0);
num_running_flushes_--;
bg_flush_scheduled_--; bg_flush_scheduled_--;
// See if there's more work to be done // See if there's more work to be done
MaybeScheduleFlushOrCompaction(); MaybeScheduleFlushOrCompaction();
@ -2469,6 +2474,7 @@ void DBImpl::BackgroundCallCompaction() {
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
{ {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
num_running_compactions_++;
auto pending_outputs_inserted_elem = auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs(); CaptureCurrentFileNumberInPendingOutputs();
@ -2517,6 +2523,8 @@ void DBImpl::BackgroundCallCompaction() {
mutex_.Lock(); mutex_.Lock();
} }
assert(num_running_compactions_ > 0);
num_running_compactions_--;
bg_compaction_scheduled_--; bg_compaction_scheduled_--;
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies(); versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();

@ -363,6 +363,20 @@ class DBImpl : public DB {
// Same as above, should called without mutex held and not on write thread. // Same as above, should called without mutex held and not on write thread.
ColumnFamilyHandle* GetColumnFamilyHandleUnlocked(uint32_t column_family_id); ColumnFamilyHandle* GetColumnFamilyHandleUnlocked(uint32_t column_family_id);
// Returns the number of currently running flushes.
// REQUIREMENT: mutex_ must be held when calling this function.
int num_running_flushes() {
mutex_.AssertHeld();
return num_running_flushes_;
}
// Returns the number of currently running compactions.
// REQUIREMENT: mutex_ must be held when calling this function.
int num_running_compactions() {
mutex_.AssertHeld();
return num_running_compactions_;
}
protected: protected:
Env* const env_; Env* const env_;
const std::string dbname_; const std::string dbname_;
@ -687,6 +701,9 @@ class DBImpl : public DB {
// count how many background compactions are running or have been scheduled // count how many background compactions are running or have been scheduled
int bg_compaction_scheduled_; int bg_compaction_scheduled_;
// stores the number of compactions are currently running
int num_running_compactions_;
// If non-zero, MaybeScheduleFlushOrCompaction() will only schedule manual // If non-zero, MaybeScheduleFlushOrCompaction() will only schedule manual
// compactions (if manual_compaction_ is not null). This mechanism enables // compactions (if manual_compaction_ is not null). This mechanism enables
// manual compactions to wait until all other compactions are finished. // manual compactions to wait until all other compactions are finished.
@ -695,6 +712,9 @@ class DBImpl : public DB {
// number of background memtable flush jobs, submitted to the HIGH pool // number of background memtable flush jobs, submitted to the HIGH pool
int bg_flush_scheduled_; int bg_flush_scheduled_;
// stores the number of flushes are currently running
int num_running_flushes_;
// Information for a manual compaction // Information for a manual compaction
struct ManualCompaction { struct ManualCompaction {
ColumnFamilyData* cfd; ColumnFamilyData* cfd;

@ -6941,6 +6941,10 @@ TEST_F(DBTest, ThreadStatusFlush) {
ASSERT_EQ("v1", Get(1, "foo")); ASSERT_EQ("v1", Get(1, "foo"));
VerifyOperationCount(env_, ThreadStatus::OP_FLUSH, 0); VerifyOperationCount(env_, ThreadStatus::OP_FLUSH, 0);
uint64_t num_running_flushes = 0;
db_->GetIntProperty(DB::Properties::kNumRunningFlushes, &num_running_flushes);
ASSERT_EQ(num_running_flushes, 0);
Put(1, "k1", std::string(100000, 'x')); // Fill memtable Put(1, "k1", std::string(100000, 'x')); // Fill memtable
Put(1, "k2", std::string(100000, 'y')); // Trigger flush Put(1, "k2", std::string(100000, 'y')); // Trigger flush
@ -6948,10 +6952,11 @@ TEST_F(DBTest, ThreadStatusFlush) {
// running when we perform VerifyOperationCount(). // running when we perform VerifyOperationCount().
TEST_SYNC_POINT("DBTest::ThreadStatusFlush:1"); TEST_SYNC_POINT("DBTest::ThreadStatusFlush:1");
VerifyOperationCount(env_, ThreadStatus::OP_FLUSH, 1); VerifyOperationCount(env_, ThreadStatus::OP_FLUSH, 1);
db_->GetIntProperty(DB::Properties::kNumRunningFlushes, &num_running_flushes);
ASSERT_EQ(num_running_flushes, 1);
// This second sync point is to ensure the flush job will not // This second sync point is to ensure the flush job will not
// be completed until we already perform VerifyOperationCount(). // be completed until we already perform VerifyOperationCount().
TEST_SYNC_POINT("DBTest::ThreadStatusFlush:2"); TEST_SYNC_POINT("DBTest::ThreadStatusFlush:2");
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }
@ -6996,6 +7001,10 @@ TEST_P(DBTestWithParam, ThreadStatusSingleCompaction) {
} }
// This makes sure a compaction won't be scheduled until // This makes sure a compaction won't be scheduled until
// we have done with the above Put Phase. // we have done with the above Put Phase.
uint64_t num_running_compactions = 0;
db_->GetIntProperty(DB::Properties::kNumRunningCompactions,
&num_running_compactions);
ASSERT_EQ(num_running_compactions, 0);
TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:0"); TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:0");
ASSERT_GE(NumTableFilesAtLevel(0), ASSERT_GE(NumTableFilesAtLevel(0),
options.level0_file_num_compaction_trigger); options.level0_file_num_compaction_trigger);
@ -7010,6 +7019,9 @@ TEST_P(DBTestWithParam, ThreadStatusSingleCompaction) {
// If thread tracking is not enabled, compaction count should be 0. // If thread tracking is not enabled, compaction count should be 0.
VerifyOperationCount(env_, ThreadStatus::OP_COMPACTION, 0); VerifyOperationCount(env_, ThreadStatus::OP_COMPACTION, 0);
} }
db_->GetIntProperty(DB::Properties::kNumRunningCompactions,
&num_running_compactions);
ASSERT_EQ(num_running_compactions, 1);
// TODO(yhchiang): adding assert to verify each compaction stage. // TODO(yhchiang): adding assert to verify each compaction stage.
TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:2"); TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:2");

@ -130,6 +130,8 @@ static const std::string aggregated_table_properties =
"aggregated-table-properties"; "aggregated-table-properties";
static const std::string aggregated_table_properties_at_level = static const std::string aggregated_table_properties_at_level =
aggregated_table_properties + "-at-level"; aggregated_table_properties + "-at-level";
static const std::string num_running_compactions = "num-running-compactions";
static const std::string num_running_flushes = "num-running-flushes";
const std::string DB::Properties::kNumFilesAtLevelPrefix = const std::string DB::Properties::kNumFilesAtLevelPrefix =
rocksdb_prefix + num_files_at_level_prefix; rocksdb_prefix + num_files_at_level_prefix;
@ -143,6 +145,10 @@ const std::string DB::Properties::kMemTableFlushPending =
rocksdb_prefix + mem_table_flush_pending; rocksdb_prefix + mem_table_flush_pending;
const std::string DB::Properties::kCompactionPending = const std::string DB::Properties::kCompactionPending =
rocksdb_prefix + compaction_pending; rocksdb_prefix + compaction_pending;
const std::string DB::Properties::kNumRunningCompactions =
rocksdb_prefix + num_running_compactions;
const std::string DB::Properties::kNumRunningFlushes =
rocksdb_prefix + num_running_flushes;
const std::string DB::Properties::kBackgroundErrors = const std::string DB::Properties::kBackgroundErrors =
rocksdb_prefix + background_errors; rocksdb_prefix + background_errors;
const std::string DB::Properties::kCurSizeActiveMemTable = const std::string DB::Properties::kCurSizeActiveMemTable =
@ -260,6 +266,10 @@ DBPropertyType GetPropertyType(const Slice& property, bool* is_int_property,
return kTotalSstFilesSize; return kTotalSstFilesSize;
} else if (in == estimate_pending_comp_bytes) { } else if (in == estimate_pending_comp_bytes) {
return kEstimatePendingCompactionBytes; return kEstimatePendingCompactionBytes;
} else if (in == num_running_flushes) {
return kNumRunningFlushes;
} else if (in == num_running_compactions) {
return kNumRunningCompactions;
} }
return kUnknown; return kUnknown;
} }
@ -388,11 +398,17 @@ bool InternalStats::GetIntProperty(DBPropertyType property_type,
// Return number of mem tables that are ready to flush (made immutable) // Return number of mem tables that are ready to flush (made immutable)
*value = (cfd_->imm()->IsFlushPending() ? 1 : 0); *value = (cfd_->imm()->IsFlushPending() ? 1 : 0);
return true; return true;
case kNumRunningFlushes:
*value = db->num_running_flushes();
return true;
case kCompactionPending: case kCompactionPending:
// 1 if the system already determines at least one compaction is needed. // 1 if the system already determines at least one compaction is needed.
// 0 otherwise, // 0 otherwise,
*value = (cfd_->compaction_picker()->NeedsCompaction(vstorage) ? 1 : 0); *value = (cfd_->compaction_picker()->NeedsCompaction(vstorage) ? 1 : 0);
return true; return true;
case kNumRunningCompactions:
*value = db->num_running_compactions_;
return true;
case kBackgroundErrors: case kBackgroundErrors:
// Accumulated number of errors in background flushes or compactions. // Accumulated number of errors in background flushes or compactions.
*value = GetBackgroundErrorCount(); *value = GetBackgroundErrorCount();

@ -38,7 +38,10 @@ enum DBPropertyType : uint32_t {
// in memory that have already been flushed // in memory that have already been flushed
kMemtableFlushPending, // Return 1 if mem table flushing is pending, kMemtableFlushPending, // Return 1 if mem table flushing is pending,
// otherwise 0. // otherwise 0.
kNumRunningFlushes, // Return the number of currently running flushes.
kCompactionPending, // Return 1 if a compaction is pending. Otherwise 0. kCompactionPending, // Return 1 if a compaction is pending. Otherwise 0.
kNumRunningCompactions, // Return the number of currently running
// compactions.
kBackgroundErrors, // Return accumulated background errors encountered. kBackgroundErrors, // Return accumulated background errors encountered.
kCurSizeActiveMemTable, // Return current size of the active memtable kCurSizeActiveMemTable, // Return current size of the active memtable
kCurSizeAllMemTables, // Return current size of unflushed kCurSizeAllMemTables, // Return current size of unflushed

@ -340,18 +340,21 @@ class DB {
// files are held from being deleted, by iterators or unfinished // files are held from being deleted, by iterators or unfinished
// compactions. // compactions.
// "rocksdb.estimate-live-data-size" // "rocksdb.estimate-live-data-size"
// "rocksdb.total-sst-files-size" - total size of all used sst files, this may // "rocksdb.total-sst-files-size" - total size of all used sst files, this
// slow down online queries if there are too many files. // may slow down online queries if there are too many files.
// "rocksdb.base-level" // "rocksdb.base-level"
// "rocksdb.estimate-pending-compaction-bytes" - estimated total number of // "rocksdb.estimate-pending-compaction-bytes" - estimated total number of
// bytes compaction needs to rewrite the data to get all levels down // bytes compaction needs to rewrite the data to get all levels down
// to under target size. Not valid for other compactions than level-based. // to under target size. Not valid for other compactions than
// "rocksdb.aggregated-table-properties" - returns a string representation of // level-based.
// the aggregated table properties of the target column family. // "rocksdb.aggregated-table-properties" - returns a string representation
// of the aggregated table properties of the target column family.
// "rocksdb.aggregated-table-properties-at-level<N>", same as the previous // "rocksdb.aggregated-table-properties-at-level<N>", same as the previous
// one but only returns the aggregated table properties of the specified // one but only returns the aggregated table properties of the specified
// level "N" at the target column family. // level "N" at the target column family.
// replaced by the target level. // "rocksdb.num-running-compactions" - the number of currently running
// compacitons.
// "rocksdb.num-running-flushes" - the number of currently running flushes.
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
struct Properties { struct Properties {
static const std::string kNumFilesAtLevelPrefix; static const std::string kNumFilesAtLevelPrefix;
@ -361,7 +364,9 @@ class DB {
static const std::string kDBStats; static const std::string kDBStats;
static const std::string kNumImmutableMemTable; static const std::string kNumImmutableMemTable;
static const std::string kMemTableFlushPending; static const std::string kMemTableFlushPending;
static const std::string kNumRunningFlushes;
static const std::string kCompactionPending; static const std::string kCompactionPending;
static const std::string kNumRunningCompactions;
static const std::string kBackgroundErrors; static const std::string kBackgroundErrors;
static const std::string kCurSizeActiveMemTable; static const std::string kCurSizeActiveMemTable;
static const std::string kCurSizeAllMemTables; static const std::string kCurSizeAllMemTables;
@ -414,6 +419,8 @@ class DB {
// "rocksdb.total-sst-files-size" // "rocksdb.total-sst-files-size"
// "rocksdb.base-level" // "rocksdb.base-level"
// "rocksdb.estimate-pending-compaction-bytes" // "rocksdb.estimate-pending-compaction-bytes"
// "rocksdb.num-running-compactions"
// "rocksdb.num-running-flushes"
virtual bool GetIntProperty(ColumnFamilyHandle* column_family, virtual bool GetIntProperty(ColumnFamilyHandle* column_family,
const Slice& property, uint64_t* value) = 0; const Slice& property, uint64_t* value) = 0;
virtual bool GetIntProperty(const Slice& property, uint64_t* value) { virtual bool GetIntProperty(const Slice& property, uint64_t* value) {

Loading…
Cancel
Save