Improve visibility into the reasons for compaction.

Summary:
Add `compaction_reason` as part of event log for event `compaction started`.
Add counters for each `CompactionReason`.
Closes https://github.com/facebook/rocksdb/pull/3679

Differential Revision: D7550348

Pulled By: riversand963

fbshipit-source-id: a19cff3a678c785aa5ef41aac78b9a5968fcc34d
main
Yanqin Jin 7 years ago committed by Facebook Github Bot
parent 019d7894eb
commit d42bd041c5
  1. 47
      db/compaction_job.cc
  2. 98
      db/db_compaction_test.cc
  3. 2
      db/db_impl_open.cc
  4. 2
      db/external_sst_file_ingestion_job.cc
  5. 2
      db/flush_job.cc
  6. 4
      db/internal_stats.cc
  7. 66
      db/internal_stats.h
  8. 2
      db/obsolete_files_test.cc
  9. 12
      include/rocksdb/listener.h

@ -62,6 +62,46 @@
namespace rocksdb { namespace rocksdb {
const char* GetCompactionReasonString(CompactionReason compaction_reason) {
switch (compaction_reason) {
case CompactionReason::kUnknown:
return "Unknown";
case CompactionReason::kLevelL0FilesNum:
return "LevelL0FilesNum";
case CompactionReason::kLevelMaxLevelSize:
return "LevelMaxLevelSize";
case CompactionReason::kUniversalSizeAmplification:
return "UniversalSizeAmplification";
case CompactionReason::kUniversalSizeRatio:
return "UniversalSizeRatio";
case CompactionReason::kUniversalSortedRunNum:
return "UniversalSortedRunNum";
case CompactionReason::kFIFOMaxSize:
return "FIFOMaxSize";
case CompactionReason::kFIFOReduceNumFiles:
return "FIFOReduceNumFiles";
case CompactionReason::kFIFOTtl:
return "FIFOTtl";
case CompactionReason::kManualCompaction:
return "ManualCompaction";
case CompactionReason::kFilesMarkedForCompaction:
return "FilesMarkedForCompaction";
case CompactionReason::kBottommostFiles:
return "BottommostFiles";
case CompactionReason::kTtl:
return "Ttl";
case CompactionReason::kFlush:
return "Flush";
case CompactionReason::kExternalSstIngestion:
return "ExternalSstIngestion";
case CompactionReason::kNumOfReasons:
// fall through
default:
assert(false);
return "Invalid";
}
}
// Maintains state for each sub-compaction // Maintains state for each sub-compaction
struct CompactionJob::SubcompactionState { struct CompactionJob::SubcompactionState {
const Compaction* compaction; const Compaction* compaction;
@ -276,7 +316,7 @@ CompactionJob::CompactionJob(
: job_id_(job_id), : job_id_(job_id),
compact_(new CompactionState(compaction)), compact_(new CompactionState(compaction)),
compaction_job_stats_(compaction_job_stats), compaction_job_stats_(compaction_job_stats),
compaction_stats_(1), compaction_stats_(compaction->compaction_reason(), 1),
dbname_(dbname), dbname_(dbname),
db_options_(db_options), db_options_(db_options),
env_options_(env_options), env_options_(env_options),
@ -1498,8 +1538,9 @@ void CompactionJob::LogCompaction() {
cfd->GetName().c_str(), scratch); cfd->GetName().c_str(), scratch);
// build event logger report // build event logger report
auto stream = event_logger_->Log(); auto stream = event_logger_->Log();
stream << "job" << job_id_ << "event" stream << "job" << job_id_ << "event" << "compaction_started"
<< "compaction_started"; << "compaction_reason"
<< GetCompactionReasonString(compaction->compaction_reason());
for (size_t i = 0; i < compaction->num_input_levels(); ++i) { for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
stream << ("files_L" + ToString(compaction->level(i))); stream << ("files_L" + ToString(compaction->level(i)));
stream.StartArray(); stream.StartArray();

@ -74,6 +74,48 @@ class FlushedFileCollector : public EventListener {
std::mutex mutex_; std::mutex mutex_;
}; };
class CompactionStatsCollector : public EventListener {
public:
CompactionStatsCollector()
: compaction_completed_(static_cast<int>(CompactionReason::kNumOfReasons)) {
for (auto& v : compaction_completed_) {
v.store(0);
}
}
~CompactionStatsCollector() {}
virtual void OnCompactionCompleted(DB* /* db */,
const CompactionJobInfo& info) override {
int k = static_cast<int>(info.compaction_reason);
int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
assert(k >= 0 && k < num_of_reasons);
compaction_completed_[k]++;
}
virtual void OnExternalFileIngested(DB* /* db */,
const ExternalFileIngestionInfo& /* info */) override {
int k = static_cast<int>(CompactionReason::kExternalSstIngestion);
compaction_completed_[k]++;
}
virtual void OnFlushCompleted(DB* /* db */,
const FlushJobInfo& /* info */) override {
int k = static_cast<int>(CompactionReason::kFlush);
compaction_completed_[k]++;
}
int NumberOfCompactions(CompactionReason reason) const {
int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
int k = static_cast<int>(reason);
assert(k >= 0 && k < num_of_reasons);
return compaction_completed_.at(k).load();
}
private:
std::vector<std::atomic<int>> compaction_completed_;
};
static const int kCDTValueSize = 1000; static const int kCDTValueSize = 1000;
static const int kCDTKeysPerBuffer = 4; static const int kCDTKeysPerBuffer = 4;
static const int kCDTNumLevels = 8; static const int kCDTNumLevels = 8;
@ -154,6 +196,40 @@ void VerifyCompactionResult(
#endif #endif
} }
/*
* Verifies compaction stats of cfd are valid.
*
* For each level of cfd, its compaction stats are valid if
* 1) sum(stat.counts) == stat.count, and
* 2) stat.counts[i] == collector.NumberOfCompactions(i)
*/
void VerifyCompactionStats(ColumnFamilyData& cfd,
const CompactionStatsCollector& collector) {
#ifndef NDEBUG
InternalStats* internal_stats_ptr = cfd.internal_stats();
ASSERT_TRUE(internal_stats_ptr != nullptr);
const std::vector<InternalStats::CompactionStats>& comp_stats =
internal_stats_ptr->TEST_GetCompactionStats();
const int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
std::vector<int> counts(num_of_reasons, 0);
// Count the number of compactions caused by each CompactionReason across
// all levels.
for (const auto& stat : comp_stats) {
int sum = 0;
for (int i = 0; i < num_of_reasons; i++) {
counts[i] += stat.counts[i];
sum += stat.counts[i];
}
ASSERT_EQ(sum, stat.count);
}
// Verify InternalStats bookkeeping matches that of CompactionStatsCollector,
// assuming that all compactions complete.
for (int i = 0; i < num_of_reasons; i++) {
ASSERT_EQ(collector.NumberOfCompactions(static_cast<CompactionReason>(i)), counts[i]);
}
#endif /* NDEBUG */
}
const SstFileMetaData* PickFileRandomly( const SstFileMetaData* PickFileRandomly(
const ColumnFamilyMetaData& cf_meta, const ColumnFamilyMetaData& cf_meta,
Random* rand, Random* rand,
@ -3562,6 +3638,28 @@ TEST_F(DBCompactionTest, CompactRangeFlushOverlappingMemtable) {
} }
} }
TEST_F(DBCompactionTest, CompactionStatsTest) {
Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = 2;
CompactionStatsCollector* collector = new CompactionStatsCollector();
options.listeners.emplace_back(collector);
DestroyAndReopen(options);
for (int i = 0; i < 32; i++) {
for (int j = 0; j < 5000; j++) {
Put(std::to_string(j), std::string(1, 'A'));
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
}
dbfull()->TEST_WaitForCompact();
ColumnFamilyHandleImpl* cfh =
static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
ColumnFamilyData* cfd = cfh->cfd();
VerifyCompactionStats(*cfd, *collector);
}
INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam, INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam,
::testing::Values(std::make_tuple(1, true), ::testing::Values(std::make_tuple(1, true),
std::make_tuple(1, false), std::make_tuple(1, false),

@ -972,7 +972,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
meta.marked_for_compaction); meta.marked_for_compaction);
} }
InternalStats::CompactionStats stats(1); InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
stats.micros = env_->NowMicros() - start_micros; stats.micros = env_->NowMicros() - start_micros;
stats.bytes_written = meta.fd.GetFileSize(); stats.bytes_written = meta.fd.GetFileSize();
stats.num_output_files = 1; stats.num_output_files = 1;

@ -218,7 +218,7 @@ void ExternalSstFileIngestionJob::UpdateStats() {
uint64_t total_l0_files = 0; uint64_t total_l0_files = 0;
uint64_t total_time = env_->NowMicros() - job_start_time_; uint64_t total_time = env_->NowMicros() - job_start_time_;
for (IngestedFileInfo& f : files_to_ingest_) { for (IngestedFileInfo& f : files_to_ingest_) {
InternalStats::CompactionStats stats(1); InternalStats::CompactionStats stats(CompactionReason::kExternalSstIngestion, 1);
stats.micros = total_time; stats.micros = total_time;
stats.bytes_written = f.fd.GetFileSize(); stats.bytes_written = f.fd.GetFileSize();
stats.num_output_files = 1; stats.num_output_files = 1;

@ -392,7 +392,7 @@ Status FlushJob::WriteLevel0Table() {
} }
// Note that here we treat flush as level 0 compaction in internal stats // Note that here we treat flush as level 0 compaction in internal stats
InternalStats::CompactionStats stats(1); InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
stats.micros = db_options_.env->NowMicros() - start_micros; stats.micros = db_options_.env->NowMicros() - start_micros;
stats.bytes_written = meta_.fd.GetFileSize(); stats.bytes_written = meta_.fd.GetFileSize();
MeasureTime(stats_, FLUSH_TIME, stats.micros); MeasureTime(stats_, FLUSH_TIME, stats.micros);

@ -957,7 +957,7 @@ void InternalStats::DumpDBStats(std::string* value) {
*/ */
void InternalStats::DumpCFMapStats( void InternalStats::DumpCFMapStats(
std::map<std::string, std::string>* cf_stats) { std::map<std::string, std::string>* cf_stats) {
CompactionStats compaction_stats_sum(0); CompactionStats compaction_stats_sum;
std::map<int, std::map<LevelStatType, double>> levels_stats; std::map<int, std::map<LevelStatType, double>> levels_stats;
DumpCFMapStats(&levels_stats, &compaction_stats_sum); DumpCFMapStats(&levels_stats, &compaction_stats_sum);
for (auto const& level_ent : levels_stats) { for (auto const& level_ent : levels_stats) {
@ -1088,7 +1088,7 @@ void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) {
// Print stats for each level // Print stats for each level
std::map<int, std::map<LevelStatType, double>> levels_stats; std::map<int, std::map<LevelStatType, double>> levels_stats;
CompactionStats compaction_stats_sum(0); CompactionStats compaction_stats_sum;
DumpCFMapStats(&levels_stats, &compaction_stats_sum); DumpCFMapStats(&levels_stats, &compaction_stats_sum);
for (int l = 0; l < number_levels_; ++l) { for (int l = 0; l < number_levels_; ++l) {
if (levels_stats.find(l) != levels_stats.end()) { if (levels_stats.find(l) != levels_stats.end()) {

@ -163,7 +163,28 @@ class InternalStats {
// Number of compactions done // Number of compactions done
int count; int count;
explicit CompactionStats(int _count = 0) // Number of compactions done per CompactionReason
int counts[static_cast<int>(CompactionReason::kNumOfReasons)];
explicit CompactionStats()
: micros(0),
bytes_read_non_output_levels(0),
bytes_read_output_level(0),
bytes_written(0),
bytes_moved(0),
num_input_files_in_non_output_levels(0),
num_input_files_in_output_level(0),
num_output_files(0),
num_input_records(0),
num_dropped_records(0),
count(0) {
int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
for (int i = 0; i < num_of_reasons; i++) {
counts[i] = 0;
}
}
explicit CompactionStats(CompactionReason reason, int c)
: micros(0), : micros(0),
bytes_read_non_output_levels(0), bytes_read_non_output_levels(0),
bytes_read_output_level(0), bytes_read_output_level(0),
@ -174,7 +195,18 @@ class InternalStats {
num_output_files(0), num_output_files(0),
num_input_records(0), num_input_records(0),
num_dropped_records(0), num_dropped_records(0),
count(_count) {} count(c) {
int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
for (int i = 0; i < num_of_reasons; i++) {
counts[i] = 0;
}
int r = static_cast<int>(reason);
if (r >= 0 && r < num_of_reasons) {
counts[r] = c;
} else {
count = 0;
}
}
explicit CompactionStats(const CompactionStats& c) explicit CompactionStats(const CompactionStats& c)
: micros(c.micros), : micros(c.micros),
@ -189,7 +221,12 @@ class InternalStats {
num_output_files(c.num_output_files), num_output_files(c.num_output_files),
num_input_records(c.num_input_records), num_input_records(c.num_input_records),
num_dropped_records(c.num_dropped_records), num_dropped_records(c.num_dropped_records),
count(c.count) {} count(c.count) {
int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
for (int i = 0; i < num_of_reasons; i++) {
counts[i] = c.counts[i];
}
}
void Clear() { void Clear() {
this->micros = 0; this->micros = 0;
@ -203,6 +240,10 @@ class InternalStats {
this->num_input_records = 0; this->num_input_records = 0;
this->num_dropped_records = 0; this->num_dropped_records = 0;
this->count = 0; this->count = 0;
int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
for (int i = 0; i < num_of_reasons; i++) {
counts[i] = 0;
}
} }
void Add(const CompactionStats& c) { void Add(const CompactionStats& c) {
@ -219,6 +260,10 @@ class InternalStats {
this->num_input_records += c.num_input_records; this->num_input_records += c.num_input_records;
this->num_dropped_records += c.num_dropped_records; this->num_dropped_records += c.num_dropped_records;
this->count += c.count; this->count += c.count;
int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
for (int i = 0; i< num_of_reasons; i++) {
counts[i] += c.counts[i];
}
} }
void Subtract(const CompactionStats& c) { void Subtract(const CompactionStats& c) {
@ -235,6 +280,10 @@ class InternalStats {
this->num_input_records -= c.num_input_records; this->num_input_records -= c.num_input_records;
this->num_dropped_records -= c.num_dropped_records; this->num_dropped_records -= c.num_dropped_records;
this->count -= c.count; this->count -= c.count;
int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
for (int i = 0; i < num_of_reasons; i++) {
counts[i] -= c.counts[i];
}
} }
}; };
@ -307,6 +356,10 @@ class InternalStats {
bool GetIntPropertyOutOfMutex(const DBPropertyInfo& property_info, bool GetIntPropertyOutOfMutex(const DBPropertyInfo& property_info,
Version* version, uint64_t* value); Version* version, uint64_t* value);
const std::vector<CompactionStats>& TEST_GetCompactionStats() const {
return comp_stats_;
}
// Store a mapping from the user-facing DB::Properties string to our // Store a mapping from the user-facing DB::Properties string to our
// DBPropertyInfo struct used internally for retrieving properties. // DBPropertyInfo struct used internally for retrieving properties.
static const std::unordered_map<std::string, DBPropertyInfo> ppt_name_to_info; static const std::unordered_map<std::string, DBPropertyInfo> ppt_name_to_info;
@ -350,8 +403,7 @@ class InternalStats {
uint64_t ingest_keys_addfile; // Total number of keys ingested uint64_t ingest_keys_addfile; // Total number of keys ingested
CFStatsSnapshot() CFStatsSnapshot()
: comp_stats(0), : ingest_bytes_flush(0),
ingest_bytes_flush(0),
stall_count(0), stall_count(0),
compact_bytes_write(0), compact_bytes_write(0),
compact_bytes_read(0), compact_bytes_read(0),
@ -543,7 +595,9 @@ class InternalStats {
uint64_t num_dropped_records; uint64_t num_dropped_records;
int count; int count;
explicit CompactionStats(int _count = 0) {} explicit CompactionStats() {}
explicit CompactionStats(CompactionReason reason, int c) {}
explicit CompactionStats(const CompactionStats& c) {} explicit CompactionStats(const CompactionStats& c) {}

@ -53,7 +53,7 @@ class ObsoleteFilesTest : public testing::Test {
options_.max_bytes_for_level_base = 1024*1024*1000; options_.max_bytes_for_level_base = 1024*1024*1000;
options_.WAL_ttl_seconds = 300; // Used to test log files options_.WAL_ttl_seconds = 300; // Used to test log files
options_.WAL_size_limit_MB = 1024; // Used to test log files options_.WAL_size_limit_MB = 1024; // Used to test log files
dbname_ = test::TmpDir() + "/double_deletefile_test"; dbname_ = test::TmpDir() + "/obsolete_files_test";
options_.wal_dir = dbname_ + "/wal_files"; options_.wal_dir = dbname_ + "/wal_files";
// clean up all the files that might have been there before // clean up all the files that might have been there before

@ -55,8 +55,8 @@ struct TableFileCreationInfo : public TableFileCreationBriefInfo {
Status status; Status status;
}; };
enum class CompactionReason { enum class CompactionReason : int {
kUnknown, kUnknown = 0,
// [Level] number of L0 files > level0_file_num_compaction_trigger // [Level] number of L0 files > level0_file_num_compaction_trigger
kLevelL0FilesNum, kLevelL0FilesNum,
// [Level] total size of level > MaxBytesForLevel() // [Level] total size of level > MaxBytesForLevel()
@ -80,7 +80,15 @@ enum class CompactionReason {
// [Level] Automatic compaction within bottommost level to cleanup duplicate // [Level] Automatic compaction within bottommost level to cleanup duplicate
// versions of same user key, usually due to a released snapshot. // versions of same user key, usually due to a released snapshot.
kBottommostFiles, kBottommostFiles,
// Compaction based on TTL
kTtl, kTtl,
// According to the comments in flush_job.cc, RocksDB treats flush as
// a level 0 compaction in internal stats.
kFlush,
// Compaction caused by external sst file ingestion
kExternalSstIngestion,
// total number of compaction reasons, new reasons must be added above this.
kNumOfReasons,
}; };
enum class FlushReason : int { enum class FlushReason : int {

Loading…
Cancel
Save