From fe5c6321cb508505748fda01002a39b2bbe888a1 Mon Sep 17 00:00:00 2001
From: Yueh-Hsuan Chiang
Date: Tue, 2 Jun 2015 17:07:16 -0700
Subject: [PATCH] Allow EventListener::OnCompactionCompleted to return
 CompactionJobStats.

Summary:
Allow EventListener::OnCompactionCompleted to return CompactionJobStats,
which contains useful information about a compaction.

Example CompactionJobStats returned by OnCompactionCompleted():
    smallest_output_key_prefix 05000000
    largest_output_key_prefix 06990000
    elapsed_time 42419
    num_input_records 300
    num_input_files 3
    num_input_files_at_output_level 2
    num_output_records 200
    num_output_files 1
    actual_bytes_input 167200
    actual_bytes_output 110688
    total_input_raw_key_bytes 5400
    total_input_raw_value_bytes 300000
    num_records_replaced 100
    is_manual_compaction 1

Test Plan:
Developed a mega test in db_test which covers 20 variables in
CompactionJobStats.

Reviewers: rven, igor, anthony, sdong

Reviewed By: sdong

Subscribers: tnovak, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D38463
---
 Makefile                               |   6 +-
 db/column_family.cc                    |   2 +-
 db/column_family.h                     |   7 +-
 db/compaction_job.cc                   | 122 ++++-
 db/compaction_job.h                    |  10 +-
 db/compaction_job_stats_test.cc        | 691 +++++++++++++++++++++++++
 db/compaction_job_test.cc              |  49 +-
 db/db_impl.cc                          |  37 +-
 db/db_impl.h                           |   4 +-
 db/merge_helper.cc                     |   7 +-
 include/rocksdb/compaction_job_stats.h |  51 ++
 include/rocksdb/iostats_context.h      |   6 +-
 include/rocksdb/listener.h             |  15 +
 src.mk                                 |   2 +
 util/compaction_job_stats_impl.cc      |  44 ++
 util/thread_status_updater.cc          |   9 +
 util/thread_status_updater.h           |   2 +
 util/thread_status_util.cc             |  14 +
 util/thread_status_util.h              |   2 +
 19 files changed, 1032 insertions(+), 48 deletions(-)
 create mode 100644 db/compaction_job_stats_test.cc
 create mode 100644 include/rocksdb/compaction_job_stats.h
 create mode 100644 util/compaction_job_stats_impl.cc

diff --git a/Makefile b/Makefile
index 8ff563f00..e7e62ca70 100644
--- a/Makefile
+++ b/Makefile
@@ -289,7 +289,8 @@ TESTS = \
 	compact_files_test \
 	perf_context_test \
 	optimistic_transaction_test \
-	write_callback_test
+	write_callback_test \
+	compaction_job_stats_test
 
 SUBSET := $(shell echo $(TESTS) |sed s/^.*$(ROCKSDBTESTS_START)/$(ROCKSDBTESTS_START)/)
 
@@ -714,6 +715,9 @@ flush_job_test: db/flush_job_test.o $(LIBOBJECTS) $(TESTHARNESS)
 compaction_job_test: db/compaction_job_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
 
+compaction_job_stats_test: db/compaction_job_stats_test.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(AM_LINK)
+
 wal_manager_test: db/wal_manager_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
 
diff --git a/db/column_family.cc b/db/column_family.cc
index ecff2e008..298b63704 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -25,13 +25,13 @@
 #include "db/version_set.h"
 #include "db/writebuffer.h"
 #include "db/internal_stats.h"
-#include "db/job_context.h"
 #include "db/table_properties_collector.h"
 #include "db/version_set.h"
 #include "db/write_controller.h"
 #include "util/autovector.h"
 #include "util/hash_skiplist_rep.h"
 #include "util/options_helper.h"
+#include "util/thread_status_util.h"
 #include "util/xfunc.h"
 
 namespace rocksdb {
diff --git a/db/column_family.h b/db/column_family.h
index 3fe374323..6c37d92fa 100644
--- a/db/column_family.h
+++ b/db/column_family.h
@@ -14,15 +14,16 @@
 #include
 #include
 
-#include "rocksdb/options.h"
-#include "rocksdb/db.h"
-#include "rocksdb/env.h"
 #include "db/memtable_list.h"
 #include "db/write_batch_internal.h"
 #include "db/write_controller.h"
 #include "db/table_cache.h"
 #include "db/table_properties_collector.h"
"db/table_properties_collector.h" #include "db/flush_scheduler.h" +#include "rocksdb/compaction_job_stats.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/options.h" #include "util/instrumented_mutex.h" #include "util/mutable_cf_options.h" #include "util/thread_local.h" diff --git a/db/compaction_job.cc b/db/compaction_job.cc index d79632fee..13ada27de 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -205,9 +205,11 @@ CompactionJob::CompactionJob( std::vector existing_snapshots, std::shared_ptr table_cache, std::function yield_callback, EventLogger* event_logger, - bool paranoid_file_checks, const std::string& dbname) + bool paranoid_file_checks, const std::string& dbname, + CompactionJobStats* compaction_job_stats) : job_id_(job_id), compact_(new CompactionState(compaction)), + compaction_job_stats_(compaction_job_stats), compaction_stats_(1), dbname_(dbname), db_options_(db_options), @@ -248,11 +250,15 @@ void CompactionJob::ReportStartedCompaction( (static_cast(compact_->compaction->start_level()) << 32) + compact_->compaction->output_level()); + // In the current design, a CompactionJob is always created + // for non-trivial compaction. + assert(compaction->IsTrivialMove() == false || + compaction->IsManualCompaction() == true); + ThreadStatusUtil::SetThreadOperationProperty( ThreadStatus::COMPACTION_PROP_FLAGS, - compaction->IsManualCompaction() + - (compaction->IsDeletionCompaction() << 1) + - (compaction->IsTrivialMove() << 2)); + compaction->IsManualCompaction() + + (compaction->IsDeletionCompaction() << 1)); ThreadStatusUtil::SetThreadOperationProperty( ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES, @@ -269,6 +275,11 @@ void CompactionJob::ReportStartedCompaction( // to ensure GetThreadList() can always show them all together. 
   ThreadStatusUtil::SetThreadOperation(
       ThreadStatus::OP_COMPACTION);
+
+  if (compaction_job_stats_) {
+    compaction_job_stats_->is_manual_compaction =
+        compaction->IsManualCompaction();
+  }
 }
 
 void CompactionJob::Prepare() {
@@ -575,6 +586,8 @@ void CompactionJob::Install(Status* status,
       status->ToString().c_str(), stats.num_input_records,
       stats.num_dropped_records);
 
+  UpdateCompactionJobStats(stats);
+
   auto stream = event_logger_->LogToBuffer(log_buffer_);
   stream << "job" << job_id_ << "event"
          << "compaction_finished"
@@ -636,19 +649,8 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
          !cfd->IsDropped() && status.ok()) {
     compact_->num_input_records++;
     if (++loop_cnt > 1000) {
-      if (key_drop_user > 0) {
-        RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
-        key_drop_user = 0;
-      }
-      if (key_drop_newer_entry > 0) {
-        RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
-                   key_drop_newer_entry);
-        key_drop_newer_entry = 0;
-      }
-      if (key_drop_obsolete > 0) {
-        RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, key_drop_obsolete);
-        key_drop_obsolete = 0;
-      }
+      RecordDroppedKeys(
+          &key_drop_user, &key_drop_newer_entry, &key_drop_obsolete);
       RecordCompactionIOStats();
       loop_cnt = 0;
     }
@@ -680,6 +682,13 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
       ++combined_idx;
     }
 
+    if (compaction_job_stats_ != nullptr) {
+      compaction_job_stats_->total_input_raw_key_bytes +=
+          input->key().size();
+      compaction_job_stats_->total_input_raw_value_bytes +=
+          input->value().size();
+    }
+
     if (compact_->compaction->ShouldStopBefore(key) &&
         compact_->builder != nullptr) {
       status = FinishCompactionOutputFile(input);
@@ -922,20 +931,33 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
     }
   }
   RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time);
-  if (key_drop_user > 0) {
-    RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
-  }
-  if (key_drop_newer_entry > 0) {
-    RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY, key_drop_newer_entry);
-  }
-  if (key_drop_obsolete > 0) {
-    RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, key_drop_obsolete);
-  }
+  RecordDroppedKeys(&key_drop_user, &key_drop_newer_entry, &key_drop_obsolete);
   RecordCompactionIOStats();
 
   return status;
 }
 
+void CompactionJob::RecordDroppedKeys(
+    int64_t* key_drop_user,
+    int64_t* key_drop_newer_entry,
+    int64_t* key_drop_obsolete) {
+  if (*key_drop_user > 0) {
+    RecordTick(stats_, COMPACTION_KEY_DROP_USER, *key_drop_user);
+    *key_drop_user = 0;
+  }
+  if (*key_drop_newer_entry > 0) {
+    RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY, *key_drop_newer_entry);
+    if (compaction_job_stats_) {
+      compaction_job_stats_->num_records_replaced += *key_drop_newer_entry;
+    }
+    *key_drop_newer_entry = 0;
+  }
+  if (*key_drop_obsolete > 0) {
+    RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, *key_drop_obsolete);
+    *key_drop_obsolete = 0;
+  }
+}
+
 void CompactionJob::CallCompactionFilterV2(
     CompactionFilterV2* compaction_filter_v2, uint64_t* time) {
   if (compact_ == nullptr || compaction_filter_v2 == nullptr) {
@@ -1227,4 +1249,52 @@ void CompactionJob::CleanupCompaction(const Status& status) {
   compact_ = nullptr;
 }
 
+#ifndef ROCKSDB_LITE
+namespace {
+void CopyPrefix(
+    char* dst, size_t dst_length, const Slice& src) {
+  assert(dst_length > 0);
+  size_t length = src.size() > dst_length - 1 ?
+      dst_length - 1 : src.size();
+  memcpy(dst, src.data(), length);
+  dst[length] = 0;
+}
+}  // namespace
+
+#endif  // !ROCKSDB_LITE
+
+void CompactionJob::UpdateCompactionJobStats(
+    const InternalStats::CompactionStats& stats) const {
+#ifndef ROCKSDB_LITE
+  if (compaction_job_stats_) {
+    compaction_job_stats_->elapsed_micros = stats.micros;
+
+    // input information
+    compaction_job_stats_->total_input_bytes =
+        stats.bytes_readn + stats.bytes_readnp1;
+    compaction_job_stats_->num_input_records = compact_->num_input_records;
+    compaction_job_stats_->num_input_files =
+        stats.files_in_leveln + stats.files_in_levelnp1;
+    compaction_job_stats_->num_input_files_at_output_level =
+        stats.files_in_levelnp1;
+
+    // output information
+    compaction_job_stats_->total_output_bytes = stats.bytes_written;
+    compaction_job_stats_->num_output_records =
+        compact_->num_output_records;
+    compaction_job_stats_->num_output_files = stats.files_out_levelnp1;
+
+    if (compact_->outputs.size() > 0U) {
+      CopyPrefix(
+          compaction_job_stats_->smallest_output_key_prefix,
+          sizeof(compaction_job_stats_->smallest_output_key_prefix),
+          compact_->outputs[0].smallest.user_key().ToString());
+      CopyPrefix(
+          compaction_job_stats_->largest_output_key_prefix,
+          sizeof(compaction_job_stats_->largest_output_key_prefix),
+          compact_->current_output()->largest.user_key().ToString());
+    }
+  }
+#endif  // !ROCKSDB_LITE
+}
+
 }  // namespace rocksdb
diff --git a/db/compaction_job.h b/db/compaction_job.h
index 00e92f23f..5a285d4eb 100644
--- a/db/compaction_job.h
+++ b/db/compaction_job.h
@@ -27,6 +27,7 @@
 #include "rocksdb/env.h"
 #include "rocksdb/memtablerep.h"
 #include "rocksdb/compaction_filter.h"
+#include "rocksdb/compaction_job_stats.h"
 #include "rocksdb/transaction_log.h"
 #include "util/autovector.h"
 #include "util/event_logger.h"
@@ -59,7 +60,8 @@ class CompactionJob {
                 std::shared_ptr<Cache> table_cache,
                 std::function<uint64_t()> yield_callback,
                 EventLogger* event_logger, bool paranoid_file_checks,
-                const std::string& dbname);
+                const std::string& dbname,
+                CompactionJobStats* compaction_job_stats);
 
   ~CompactionJob();
 
@@ -97,12 +99,18 @@ class CompactionJob {
   void RecordCompactionIOStats();
   Status OpenCompactionOutputFile();
   void CleanupCompaction(const Status& status);
+  void UpdateCompactionJobStats(
+      const InternalStats::CompactionStats& stats) const;
+  void RecordDroppedKeys(int64_t* key_drop_user,
+                         int64_t* key_drop_newer_entry,
+                         int64_t* key_drop_obsolete);
 
   int job_id_;
 
   // CompactionJob state
   struct CompactionState;
   CompactionState* compact_;
+  CompactionJobStats* compaction_job_stats_;
   bool bottommost_level_;
   SequenceNumber earliest_snapshot_;
 
diff --git a/db/compaction_job_stats_test.cc b/db/compaction_job_stats_test.cc
new file mode 100644
index 000000000..acbaf49f0
--- /dev/null
+++ b/db/compaction_job_stats_test.cc
@@ -0,0 +1,691 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
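+//
+// This test verifies the CompactionJobStats reported through
+// EventListener::OnCompactionCompleted() against expected stats computed
+// by the test itself (see CompactionJobStatsChecker below).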
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "db/db_impl.h"
+#include "db/dbformat.h"
+#include "db/filename.h"
+#include "db/job_context.h"
+#include "db/version_set.h"
+#include "db/write_batch_internal.h"
+#include "port/stack_trace.h"
+#include "rocksdb/cache.h"
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/experimental.h"
+#include "rocksdb/filter_policy.h"
+#include "rocksdb/options.h"
+#include "rocksdb/perf_context.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/slice_transform.h"
+#include "rocksdb/table.h"
+#include "rocksdb/table_properties.h"
+#include "rocksdb/thread_status.h"
+#include "rocksdb/utilities/checkpoint.h"
+#include "rocksdb/utilities/convenience.h"
+#include "rocksdb/utilities/write_batch_with_index.h"
+#include "table/block_based_table_factory.h"
+#include "table/mock_table.h"
+#include "table/plain_table_factory.h"
+#include "util/compression.h"
+#include "util/hash.h"
+#include "util/hash_linklist_rep.h"
+#include "util/logging.h"
+#include "util/mock_env.h"
+#include "util/mutexlock.h"
+#include "util/rate_limiter.h"
+#include "util/scoped_arena_iterator.h"
+#include "util/statistics.h"
+#include "util/string_util.h"
+#include "util/sync_point.h"
+#include "util/testharness.h"
+#include "util/testutil.h"
+#include "util/thread_status_util.h"
+#include "util/xfunc.h"
+#include "utilities/merge_operators.h"
+
+#if !defined(IOS_CROSS_COMPILE)
+#ifndef ROCKSDB_LITE
+namespace rocksdb {
+
+static std::string RandomString(Random* rnd, int len, double ratio) {
+  std::string r;
+  test::CompressibleString(rnd, ratio, len, &r);
+  return r;
+}
+
+std::string Key(uint64_t key, int length) {
+  const int kBufSize = 1000;
+  char buf[kBufSize];
+  if (length > kBufSize) {
+    length = kBufSize;
+  }
+  snprintf(buf, kBufSize, "%0*lu", length, key);
+  return std::string(buf);
+}
+
+class CompactionJobStatsTest : public testing::Test {
+ public:
+  std::string dbname_;
+  std::string alternative_wal_dir_;
+  Env* env_;
+  DB* db_;
+  std::vector<ColumnFamilyHandle*> handles_;
+
+  Options last_options_;
+
+  CompactionJobStatsTest() : env_(Env::Default()) {
+    env_->SetBackgroundThreads(1, Env::LOW);
+    env_->SetBackgroundThreads(1, Env::HIGH);
+    dbname_ = test::TmpDir(env_) + "/compaction_job_stats_test";
+    alternative_wal_dir_ = dbname_ + "/wal";
+    Options options;
+    options.create_if_missing = true;
+    auto delete_options = options;
+    delete_options.wal_dir = alternative_wal_dir_;
+    EXPECT_OK(DestroyDB(dbname_, delete_options));
+    // Destroy the DB again in case the alternative WAL dir was not used.
+    EXPECT_OK(DestroyDB(dbname_, options));
+    db_ = nullptr;
+    Reopen(options);
+  }
+
+  ~CompactionJobStatsTest() {
+    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+    rocksdb::SyncPoint::GetInstance()->LoadDependency({});
+    rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+    Close();
+    Options options;
+    options.db_paths.emplace_back(dbname_, 0);
+    options.db_paths.emplace_back(dbname_ + "_2", 0);
+    options.db_paths.emplace_back(dbname_ + "_3", 0);
+    options.db_paths.emplace_back(dbname_ + "_4", 0);
+    EXPECT_OK(DestroyDB(dbname_, options));
+  }
+
+  DBImpl* dbfull() {
+    return reinterpret_cast<DBImpl*>(db_);
+  }
+
+  void CreateColumnFamilies(const std::vector<std::string>& cfs,
+                            const Options& options) {
+    ColumnFamilyOptions cf_opts(options);
+    size_t cfi = handles_.size();
+    handles_.resize(cfi + cfs.size());
+    for (auto cf : cfs) {
+      ASSERT_OK(db_->CreateColumnFamily(cf_opts, cf, &handles_[cfi++]));
+    }
+  }
+
+  void CreateAndReopenWithCF(const std::vector<std::string>& cfs,
+                             const Options& options) {
+    CreateColumnFamilies(cfs, options);
+    std::vector<std::string> cfs_plus_default = cfs;
+    cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
+    ReopenWithColumnFamilies(cfs_plus_default, options);
+  }
+
+  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
+                                const std::vector<Options>& options) {
+    ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
+  }
+
+  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
+                                const Options& options) {
+    ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
+  }
+
+  Status TryReopenWithColumnFamilies(
+      const std::vector<std::string>& cfs,
+      const std::vector<Options>& options) {
+    Close();
+    EXPECT_EQ(cfs.size(), options.size());
+    std::vector<ColumnFamilyDescriptor> column_families;
+    for (size_t i = 0; i < cfs.size(); ++i) {
+      column_families.push_back(ColumnFamilyDescriptor(cfs[i], options[i]));
+    }
+    DBOptions db_opts = DBOptions(options[0]);
+    return DB::Open(db_opts, dbname_, column_families, &handles_, &db_);
+  }
+
+  Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
+                                     const Options& options) {
+    Close();
+    std::vector<Options> v_opts(cfs.size(), options);
+    return TryReopenWithColumnFamilies(cfs, v_opts);
+  }
+
+  void Reopen(const Options& options) {
+    ASSERT_OK(TryReopen(options));
+  }
+
+  void Close() {
+    for (auto h : handles_) {
+      delete h;
+    }
+    handles_.clear();
+    delete db_;
+    db_ = nullptr;
+  }
+
+  void DestroyAndReopen(const Options& options) {
+    // Destroy using last options
+    Destroy(last_options_);
+    ASSERT_OK(TryReopen(options));
+  }
+
+  void Destroy(const Options& options) {
+    Close();
+    ASSERT_OK(DestroyDB(dbname_, options));
+  }
+
+  Status ReadOnlyReopen(const Options& options) {
+    return DB::OpenForReadOnly(options, dbname_, &db_);
+  }
+
+  Status TryReopen(const Options& options) {
+    Close();
+    last_options_ = options;
+    return DB::Open(options, dbname_, &db_);
+  }
+
+  Status Flush(int cf = 0) {
+    if (cf == 0) {
+      return db_->Flush(FlushOptions());
+    } else {
+      return db_->Flush(FlushOptions(), handles_[cf]);
+    }
+  }
+
+  Status Put(const Slice& k, const Slice& v, WriteOptions wo = WriteOptions()) {
+    return db_->Put(wo, k, v);
+  }
+
+  Status Put(int cf, const Slice& k, const Slice& v,
+             WriteOptions wo = WriteOptions()) {
+    return db_->Put(wo, handles_[cf], k, v);
+  }
+
+  Status Delete(const std::string& k) {
+    return db_->Delete(WriteOptions(), k);
+  }
+
+  Status Delete(int cf, const std::string& k) {
+    return db_->Delete(WriteOptions(), handles_[cf], k);
+  }
+
+  std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) {
+    ReadOptions options;
+    options.verify_checksums = true;
+    options.snapshot = snapshot;
+    std::string result;
+    Status s = db_->Get(options, k, &result);
+    if (s.IsNotFound()) {
+      result = "NOT_FOUND";
+    } else if (!s.ok()) {
+      result = s.ToString();
+    }
+    return result;
+  }
+
+  std::string Get(int cf, const std::string& k,
+                  const Snapshot* snapshot = nullptr) {
+    ReadOptions options;
+    options.verify_checksums = true;
+    options.snapshot = snapshot;
+    std::string result;
+    Status s = db_->Get(options, handles_[cf], k, &result);
+    if (s.IsNotFound()) {
+      result = "NOT_FOUND";
+    } else if (!s.ok()) {
+      result = s.ToString();
+    }
+    return result;
+  }
+
+  int NumTableFilesAtLevel(int level, int cf = 0) {
+    std::string property;
+    if (cf == 0) {
+      // default cfd
+      EXPECT_TRUE(db_->GetProperty(
+          "rocksdb.num-files-at-level" + NumberToString(level), &property));
+    } else {
+      EXPECT_TRUE(db_->GetProperty(
+          handles_[cf], "rocksdb.num-files-at-level" + NumberToString(level),
+          &property));
+    }
+    return atoi(property.c_str());
+  }
+
+  // Return spread of files per level
+  std::string FilesPerLevel(int cf = 0) {
+    int num_levels =
+        (cf == 0) ? db_->NumberLevels() : db_->NumberLevels(handles_[1]);
+    std::string result;
+    size_t last_non_zero_offset = 0;
+    for (int level = 0; level < num_levels; level++) {
+      int f = NumTableFilesAtLevel(level, cf);
+      char buf[100];
+      snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
+      result += buf;
+      if (f > 0) {
+        last_non_zero_offset = result.size();
+      }
+    }
+    result.resize(last_non_zero_offset);
+    return result;
+  }
+
+  uint64_t Size(const Slice& start, const Slice& limit, int cf = 0) {
+    Range r(start, limit);
+    uint64_t size;
+    if (cf == 0) {
+      db_->GetApproximateSizes(&r, 1, &size);
+    } else {
+      db_->GetApproximateSizes(handles_[1], &r, 1, &size);
+    }
+    return size;
+  }
+
+  void Compact(int cf, const Slice& start, const Slice& limit,
+               uint32_t target_path_id) {
+    ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit, false, -1,
+                                target_path_id));
+  }
+
+  void Compact(int cf, const Slice& start, const Slice& limit) {
+    ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit));
+  }
+
+  void Compact(const Slice& start, const Slice& limit) {
+    ASSERT_OK(db_->CompactRange(&start, &limit));
+  }
+
+  // Do n memtable compactions, each of which produces an sstable
+  // covering the range [small,large].
+  void MakeTables(int n, const std::string& small, const std::string& large,
+                  int cf = 0) {
+    for (int i = 0; i < n; i++) {
+      ASSERT_OK(Put(cf, small, "begin"));
+      ASSERT_OK(Put(cf, large, "end"));
+      ASSERT_OK(Flush(cf));
+    }
+  }
+
+  void MakeTableWithKeyValues(
+      Random* rnd, uint64_t smallest, uint64_t largest,
+      int key_size, int value_size, uint64_t interval,
+      double ratio, int cf = 0) {
+    for (auto key = smallest; key < largest; key += interval) {
+      ASSERT_OK(Put(cf, Slice(Key(key, key_size)),
+                    Slice(RandomString(rnd, value_size, ratio))));
+    }
+    ASSERT_OK(Flush(cf));
+  }
+};
+
+// An EventListener which helps verify the compaction results in
+// test CompactionJobStatsTest.
+class CompactionJobStatsChecker : public EventListener {
+ public:
+  CompactionJobStatsChecker() : compression_enabled_(false) {}
+
+  size_t NumberOfUnverifiedStats() { return expected_stats_.size(); }
+
+  // Once a compaction completes, this function will verify the returned
+  // CompactionJobInfo with the oldest CompactionJobInfo added earlier
+  // in "expected_stats_" which has not yet been used for verification.
+  virtual void OnCompactionCompleted(DB *db, const CompactionJobInfo& ci) {
+    std::lock_guard<std::mutex> lock(mutex_);
+
+    if (expected_stats_.size()) {
+      Verify(ci.stats, expected_stats_.front());
+      expected_stats_.pop();
+    }
+  }
+
+  // A helper function which verifies whether two CompactionJobStats
+  // match. The verification of all compaction stats is done by
+  // ASSERT_EQ except for the following stats, for which we use ASSERT_GE
+  // and ASSERT_LE with a reasonable (< 15%) bias:
+  // 1. write-amplification
+  // 2. actual bytes input and output, which relies on the compression
+  //    ratio and the implementation of table formats.
+  void Verify(const CompactionJobStats& current_stats,
+              const CompactionJobStats& stats) {
+    // time
+    ASSERT_GT(current_stats.elapsed_micros, 0U);
+
+    ASSERT_EQ(current_stats.num_input_records,
+              stats.num_input_records);
+    ASSERT_EQ(current_stats.num_input_files,
+              stats.num_input_files);
+    ASSERT_EQ(current_stats.num_input_files_at_output_level,
+              stats.num_input_files_at_output_level);
+
+    ASSERT_EQ(current_stats.num_output_records,
+              stats.num_output_records);
+    ASSERT_EQ(current_stats.num_output_files,
+              stats.num_output_files);
+
+    ASSERT_EQ(current_stats.is_manual_compaction,
+              stats.is_manual_compaction);
+
+    // file size
+    double kFileSizeBias = 0.15;
+    ASSERT_GE(current_stats.total_input_bytes * (1.00 + kFileSizeBias),
+              stats.total_input_bytes);
+    ASSERT_LE(current_stats.total_input_bytes,
+              stats.total_input_bytes * (1.00 + kFileSizeBias));
+    ASSERT_GE(current_stats.total_output_bytes * (1.00 + kFileSizeBias),
+              stats.total_output_bytes);
+    ASSERT_LE(current_stats.total_output_bytes,
+              stats.total_output_bytes * (1.00 + kFileSizeBias));
+    ASSERT_EQ(current_stats.total_input_raw_key_bytes,
+              stats.total_input_raw_key_bytes);
+    ASSERT_EQ(current_stats.total_input_raw_value_bytes,
+              stats.total_input_raw_value_bytes);
+
+    ASSERT_EQ(current_stats.num_records_replaced,
+              stats.num_records_replaced);
+
+    ASSERT_EQ(
+        std::string(current_stats.smallest_output_key_prefix),
+        std::string(stats.smallest_output_key_prefix));
+    ASSERT_EQ(
+        std::string(current_stats.largest_output_key_prefix),
+        std::string(stats.largest_output_key_prefix));
+  }
+
+  // Add expected compaction stats, which will be used to
+  // verify the CompactionJobStats returned by the OnCompactionCompleted()
+  // callback.
+  void AddExpectedStats(const CompactionJobStats& stats) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    expected_stats_.push(stats);
+  }
+
+  void EnableCompression(bool flag) {
+    compression_enabled_ = flag;
+  }
+
+ private:
+  std::mutex mutex_;
+  std::queue<CompactionJobStats> expected_stats_;
+  bool compression_enabled_;
+};
+
+namespace {
+
+uint64_t EstimatedFileSize(
+    uint64_t num_records, size_t key_size, size_t value_size,
+    double compression_ratio = 1.0,
+    size_t block_size = 4096,
+    int bloom_bits_per_key = 10) {
+  const size_t kPerKeyOverhead = 8;
+  const size_t kFooterSize = 512;
+
+  uint64_t data_size =
+      num_records * (key_size + value_size * compression_ratio +
+                     kPerKeyOverhead);
+
+  return data_size + kFooterSize
+         + num_records * bloom_bits_per_key / 8      // filter block
+         + data_size * (key_size + 8) / block_size;  // index block
+}
+
+namespace {
+
+void CopyPrefix(
+    char* dst, size_t dst_length, const Slice& src) {
+  assert(dst_length > 0);
+  size_t length = src.size() > dst_length - 1 ?
+      dst_length - 1 : src.size();
+  memcpy(dst, src.data(), length);
+  dst[length] = 0;
+}
+
+}  // namespace
+
+CompactionJobStats NewManualCompactionJobStats(
+    const std::string& smallest_key, const std::string& largest_key,
+    size_t num_input_files, size_t num_input_files_at_output_level,
+    uint64_t num_input_records, size_t key_size, size_t value_size,
+    size_t num_output_files, uint64_t num_output_records,
+    double compression_ratio, uint64_t num_records_replaced) {
+  CompactionJobStats stats;
+  stats.Reset();
+
+  stats.num_input_records = num_input_records;
+  stats.num_input_files = num_input_files;
+  stats.num_input_files_at_output_level = num_input_files_at_output_level;
+
+  stats.num_output_records = num_output_records;
+  stats.num_output_files = num_output_files;
+
+  stats.total_input_bytes =
+      EstimatedFileSize(
+          num_input_records / num_input_files,
+          key_size, value_size, compression_ratio) * num_input_files;
+  stats.total_output_bytes =
+      EstimatedFileSize(
+          num_output_records / num_output_files,
+          key_size, value_size, compression_ratio) * num_output_files;
+  stats.total_input_raw_key_bytes =
+      num_input_records * (key_size + 8);
+  stats.total_input_raw_value_bytes =
+      num_input_records * value_size;
+
+  stats.is_manual_compaction = true;
+
+  stats.num_records_replaced = num_records_replaced;
+
+  CopyPrefix(stats.smallest_output_key_prefix,
+             sizeof(stats.smallest_output_key_prefix),
+             smallest_key);
+  CopyPrefix(stats.largest_output_key_prefix,
+             sizeof(stats.largest_output_key_prefix),
+             largest_key);
+
+  return stats;
+}
+
+CompressionType GetAnyCompression() {
+  if (Snappy_Supported()) {
+    return kSnappyCompression;
+  } else if (Zlib_Supported()) {
+    return kZlibCompression;
+  } else if (BZip2_Supported()) {
+    return kBZip2Compression;
+  } else if (LZ4_Supported()) {
+    return kLZ4Compression;
+  }
+  return kNoCompression;
+}
+
+}  // namespace
+
+TEST_F(CompactionJobStatsTest, CompactionJobStatsTest) {
+  Random rnd(301);
+  const int kBufSize = 100;
+  char buf[kBufSize];
+  uint64_t key_base = 100000000l;
+  // Note: key_base must be a multiple of num_keys_per_L0_file
+  int num_keys_per_L0_file = 100;
+  const int kTestScale = 8;
+  const int kKeySize = 10;
+  const int kValueSize = 1000;
+  const double kCompressionRatio = 0.5;
+  double compression_ratio = 1.0;
+  uint64_t key_interval = key_base / num_keys_per_L0_file;
+
+  // Whenever a compaction completes, this listener will try to
+  // verify whether the returned CompactionJobStats matches
+  // what we expect. The expected CompactionJobStats is added
+  // via AddExpectedStats().
+  auto* stats_checker = new CompactionJobStatsChecker();
+  Options options;
+  options.listeners.emplace_back(stats_checker);
+  options.create_if_missing = true;
+  options.max_background_flushes = 0;
+  options.max_mem_compaction_level = 0;
+  // just enough settings to hold off auto-compaction.
+  options.level0_file_num_compaction_trigger = kTestScale + 1;
+  options.num_levels = 3;
+  options.compression = kNoCompression;
+
+  for (int test = 0; test < 2; ++test) {
+    DestroyAndReopen(options);
+    CreateAndReopenWithCF({"pikachu"}, options);
+
+    // 1st Phase: generate "num_L0_files" L0 files.
+    int num_L0_files = 0;
+    for (uint64_t start_key = key_base;
+         start_key <= key_base * kTestScale;
+         start_key += key_base) {
+      MakeTableWithKeyValues(
+          &rnd, start_key, start_key + key_base - 1,
+          kKeySize, kValueSize, key_interval,
+          kCompressionRatio, 1);
+      snprintf(buf, kBufSize, "%d", ++num_L0_files);
+      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
+    }
+    ASSERT_EQ(std::to_string(num_L0_files), FilesPerLevel(1));
+
+    // 2nd Phase: perform L0 -> L1 compaction.
+    int L0_compaction_count = 6;
+    int count = 1;
+    std::string smallest_key;
+    std::string largest_key;
+    for (uint64_t start_key = key_base;
+         start_key <= key_base * L0_compaction_count;
+         start_key += key_base, count++) {
+      smallest_key = Key(start_key, 10);
+      largest_key = Key(start_key + key_base - key_interval, 10);
+      stats_checker->AddExpectedStats(
+          NewManualCompactionJobStats(
+              smallest_key, largest_key,
+              1, 0, num_keys_per_L0_file,
+              kKeySize, kValueSize,
+              1, num_keys_per_L0_file,
+              compression_ratio, 0));
+      ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
+      Compact(1, smallest_key, largest_key);
+      snprintf(buf, kBufSize, "%d,%d", num_L0_files - count, count);
+      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
+    }
+
+    // compact two files into one in the last L0 -> L1 compaction
+    int num_remaining_L0 = num_L0_files - L0_compaction_count;
+    smallest_key = Key(key_base * (L0_compaction_count + 1), 10);
+    largest_key = Key(key_base * (kTestScale + 1) - key_interval, 10);
+    stats_checker->AddExpectedStats(
+        NewManualCompactionJobStats(
+            smallest_key, largest_key,
+            num_remaining_L0,
+            0, num_keys_per_L0_file * num_remaining_L0,
+            kKeySize, kValueSize,
+            1, num_keys_per_L0_file * num_remaining_L0,
+            compression_ratio, 0));
+    ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
+    Compact(1, smallest_key, largest_key);
+
+    int num_L1_files = num_L0_files - num_remaining_L0 + 1;
+    num_L0_files = 0;
+    snprintf(buf, kBufSize, "%d,%d", num_L0_files, num_L1_files);
+    ASSERT_EQ(std::string(buf), FilesPerLevel(1));
+
+    // 3rd Phase: generate sparse L0 files (wider key-range, same num of keys)
+    int sparseness = 2;
+    for (uint64_t start_key = key_base;
+         start_key <= key_base * kTestScale;
+         start_key += key_base * sparseness) {
+      MakeTableWithKeyValues(
+          &rnd, start_key, start_key + key_base * sparseness - 1,
+          kKeySize, kValueSize,
+          key_base * sparseness / num_keys_per_L0_file,
+          kCompressionRatio, 1);
+      snprintf(buf, kBufSize, "%d,%d", ++num_L0_files, num_L1_files);
+      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
+    }
+
+    // 4th Phase: perform L0 -> L1 compaction again, expect higher write amp
+    for (uint64_t start_key = key_base;
+         num_L0_files > 1;
+         start_key += key_base * sparseness) {
+      smallest_key = Key(start_key, 10);
+      largest_key =
+          Key(start_key + key_base * sparseness - key_interval, 10);
+      stats_checker->AddExpectedStats(
+          NewManualCompactionJobStats(
+              smallest_key, largest_key,
+              3, 2, num_keys_per_L0_file * 3,
+              kKeySize, kValueSize,
+              1, num_keys_per_L0_file * 2,  // 1/3 of the data will be updated.
+              compression_ratio,
+              num_keys_per_L0_file));
+      ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
+      Compact(1, smallest_key, largest_key);
+      snprintf(buf, kBufSize, "%d,%d",
+               --num_L0_files, --num_L1_files);
+      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
+    }
+
+    // 5th Phase: Do a full compaction, which involves two sub-compactions.
+    // Here we expect to have 1 L0 file and 4 L1 files.
+    // In the first sub-compaction, we expect an L0 compaction.
+    smallest_key = Key(key_base, 10);
+    largest_key = Key(key_base * (kTestScale + 1) - key_interval, 10);
+    stats_checker->AddExpectedStats(
+        NewManualCompactionJobStats(
+            Key(key_base * (kTestScale + 1 - sparseness), 10), largest_key,
+            2, 1, num_keys_per_L0_file * 3,
+            kKeySize, kValueSize,
+            1, num_keys_per_L0_file * 2,
+            compression_ratio,
+            num_keys_per_L0_file));
+    // In the second sub-compaction, we expect an L1 compaction.
+    stats_checker->AddExpectedStats(
+        NewManualCompactionJobStats(
+            smallest_key, largest_key,
+            4, 0, num_keys_per_L0_file * 8,
+            kKeySize, kValueSize,
+            1, num_keys_per_L0_file * 8,
+            compression_ratio, 0));
+    ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 2U);
+    Compact(1, smallest_key, largest_key);
+    ASSERT_EQ("0,1", FilesPerLevel(1));
+    options.compression = GetAnyCompression();
+    if (options.compression == kNoCompression) {
+      break;
+    }
+    stats_checker->EnableCompression(true);
+    compression_ratio = kCompressionRatio;
+  }
+  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
+}
+
+}  // namespace rocksdb
+
+int main(int argc, char** argv) {
+  rocksdb::port::InstallStackTraceHandler();
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
+
+#endif  // !ROCKSDB_LITE
+#endif  // !defined(IOS_CROSS_COMPILE)
diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc
index 72747ffe4..e0ab453e9 100644
--- a/db/compaction_job_test.cc
+++ b/db/compaction_job_test.cc
@@ -138,6 +138,45 @@ class CompactionJobTest : public testing::Test {
   std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
 };
 
+namespace {
+void VerifyInitializationOfCompactionJobStats(
+    const CompactionJobStats& compaction_job_stats) {
+#if !defined(IOS_CROSS_COMPILE)
+  ASSERT_EQ(compaction_job_stats.elapsed_micros, 0U);
+
+  ASSERT_EQ(compaction_job_stats.num_input_records, 0U);
+  ASSERT_EQ(compaction_job_stats.num_input_files, 0U);
+  ASSERT_EQ(compaction_job_stats.num_input_files_at_output_level, 0U);
+
+  ASSERT_EQ(compaction_job_stats.num_output_records, 0U);
+  ASSERT_EQ(compaction_job_stats.num_output_files, 0U);
+
+  ASSERT_EQ(compaction_job_stats.is_manual_compaction, 0U);
+
+  ASSERT_EQ(compaction_job_stats.total_input_bytes, 0U);
+  ASSERT_EQ(compaction_job_stats.total_output_bytes, 0U);
+
+  ASSERT_EQ(compaction_job_stats.total_input_raw_key_bytes, 0U);
+  ASSERT_EQ(compaction_job_stats.total_input_raw_value_bytes, 0U);
+
+  ASSERT_EQ(compaction_job_stats.smallest_output_key_prefix[0], 0);
+  ASSERT_EQ(compaction_job_stats.largest_output_key_prefix[0], 0);
+
+  ASSERT_EQ(compaction_job_stats.num_records_replaced, 0U);
+#endif  // !defined(IOS_CROSS_COMPILE)
+}
+
+void VerifyCompactionJobStats(
+    const CompactionJobStats& compaction_job_stats,
+    const std::vector<FileMetaData*>& files,
+    size_t num_output_files,
+    uint64_t min_elapsed_time) {
+  ASSERT_GE(compaction_job_stats.elapsed_micros, min_elapsed_time);
+  ASSERT_EQ(compaction_job_stats.num_input_files, files.size());
+  ASSERT_EQ(compaction_job_stats.num_output_files, num_output_files);
+}
+}  // namespace
+
 TEST_F(CompactionJobTest, Simple) {
   auto cfd = versions_->GetColumnFamilySet()->GetDefault();
 
@@ -164,11 +203,15 @@ TEST_F(CompactionJobTest, Simple) {
   mutex_.Lock();
   EventLogger event_logger(db_options_.info_log.get());
   std::string db_name = "dbname";
+  CompactionJobStats compaction_job_stats;
   CompactionJob compaction_job(0, compaction.get(), db_options_, env_options_,
                                versions_.get(), &shutting_down_, &log_buffer,
                                nullptr, nullptr, nullptr, {}, table_cache_,
                                std::move(yield_callback), &event_logger, false,
-                               db_name);
+                               db_name, &compaction_job_stats);
&compaction_job_stats); + + auto start_micros = Env::Default()->NowMicros(); + VerifyInitializationOfCompactionJobStats(compaction_job_stats); compaction_job.Prepare(); mutex_.Unlock(); @@ -179,6 +222,10 @@ TEST_F(CompactionJobTest, Simple) { ASSERT_OK(s); mutex_.Unlock(); + VerifyCompactionJobStats( + compaction_job_stats, + files, 1, (Env::Default()->NowMicros() - start_micros) / 2); + mock_table_factory_->AssertLatestFile(expected_results); ASSERT_EQ(yield_callback_called, 20000); } diff --git a/db/db_impl.cc b/db/db_impl.cc index c4df365ce..c44c153ce 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1530,8 +1530,20 @@ Status DBImpl::CompactFilesImpl( &shutting_down_, log_buffer, directories_.GetDbDir(), directories_.GetDataDir(c->GetOutputPathId()), stats_, snapshots_.GetAll(), table_cache_, std::move(yield_callback), - &event_logger_, c->mutable_cf_options()->paranoid_file_checks, - dbname_); + &event_logger_, c->mutable_cf_options()->paranoid_file_checks, dbname_, + nullptr); // Here we pass a nullptr for CompactionJobStats because + // CompactFiles does not trigger OnCompactionCompleted(), + // which is the only place where CompactionJobStats is + // returned. The idea of not triggering OnCompationCompleted() + // is that CompactFiles runs in the caller thread, so the user + // should always know when it completes. As a result, it makes + // less sense to notify the users something they should already + // know. + // + // In the future, if we would like to add CompactionJobStats + // support for CompactFiles, we should have CompactFiles API + // pass a pointer of CompactionJobStats as the out-value + // instead of using EventListener. compaction_job.Prepare(); mutex_.Unlock(); @@ -1570,7 +1582,9 @@ Status DBImpl::CompactFilesImpl( #endif // ROCKSDB_LITE void DBImpl::NotifyOnCompactionCompleted( - ColumnFamilyData* cfd, Compaction *c, const Status &st) { + ColumnFamilyData* cfd, Compaction *c, const Status &st, + const CompactionJobStats& compaction_job_stats, + const uint64_t job_id) { #ifndef ROCKSDB_LITE if (db_options_.listeners.size() == 0U) { return; @@ -1585,7 +1599,11 @@ void DBImpl::NotifyOnCompactionCompleted( CompactionJobInfo info; info.cf_name = cfd->GetName(); info.status = st; + info.thread_id = ThreadStatusUtil::GetThreadID(); + info.job_id = job_id; + info.base_input_level = c->start_level(); info.output_level = c->output_level(); + info.stats = compaction_job_stats; for (size_t i = 0; i < c->num_input_levels(); ++i) { for (const auto fmd : *c->inputs(i)) { info.input_files.push_back( @@ -2246,6 +2264,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, bool is_manual = (manual_compaction_ != nullptr) && (manual_compaction_->in_progress == false); + CompactionJobStats compaction_job_stats; Status status = bg_error_; if (status.ok() && shutting_down_.load(std::memory_order_acquire)) { status = Status::ShutdownInProgress(); @@ -2389,6 +2408,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, assert(c->level() == 0); assert(c->column_family_data()->ioptions()->compaction_style == kCompactionStyleFIFO); + + compaction_job_stats.num_input_files = c->num_input_files(0); + for (const auto& f : *c->inputs(0)) { c->edit()->DeleteFile(c->level(), f->fd.GetNumber()); } @@ -2408,6 +2430,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, ThreadStatusUtil::SetColumnFamily(c->column_family_data()); ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION); + 
+    compaction_job_stats.num_input_files = c->num_input_files(0);
+
     // Move file to next level
     assert(c->num_input_files(0) == 1);
     FileMetaData* f = c->input(0, 0);
@@ -2456,7 +2480,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
         directories_.GetDataDir(c->GetOutputPathId()), stats_,
         snapshots_.GetAll(), table_cache_, std::move(yield_callback),
         &event_logger_, c->mutable_cf_options()->paranoid_file_checks,
-        dbname_);
+        dbname_, &compaction_job_stats);
     compaction_job.Prepare();
 
     mutex_.Unlock();
@@ -2470,9 +2494,10 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
     }
     *madeProgress = true;
   }
-  // FIXME(orib): should I check if column family data is null?
   if (c != nullptr) {
-    NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status);
+    NotifyOnCompactionCompleted(
+        c->column_family_data(), c.get(), status,
+        compaction_job_stats, job_context->job_id);
     c->ReleaseCompactionFiles(status);
     *madeProgress = true;
   }
diff --git a/db/db_impl.h b/db/db_impl.h
index 333f26d3b..8f25c3e68 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -351,7 +351,9 @@ class DBImpl : public DB {
                                  const MutableCFOptions& mutable_cf_options);
 
   void NotifyOnCompactionCompleted(ColumnFamilyData* cfd,
-                                   Compaction *c, const Status &st);
+                                   Compaction *c, const Status &st,
+                                   const CompactionJobStats& job_stats,
+                                   uint64_t job_id);
 
   void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const;
 
diff --git a/db/merge_helper.cc b/db/merge_helper.cc
index cd4d456e8..6b38b299e 100644
--- a/db/merge_helper.cc
+++ b/db/merge_helper.cc
@@ -3,14 +3,16 @@
 // LICENSE file in the root directory of this source tree. An additional grant
 // of patent rights can be found in the PATENTS file in the same directory.
 //
+
+#include
+#include
+
 #include "merge_helper.h"
 #include "db/dbformat.h"
 #include "rocksdb/comparator.h"
 #include "rocksdb/db.h"
 #include "rocksdb/merge_operator.h"
 #include "util/statistics.h"
-#include
-#include
 #include "util/perf_context_imp.h"
 #include "util/stop_watch.h"
 
@@ -222,7 +224,6 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
       UpdateInternalKey(&original_key[0], original_key.size(),
                         orig_ikey.sequence, orig_ikey.type);
-      // The final value() is always stored in operands_.back()
       swap(operands_.back(),merge_result);
     } else {
       RecordTick(stats, NUMBER_MERGE_FAILURES);
diff --git a/include/rocksdb/compaction_job_stats.h b/include/rocksdb/compaction_job_stats.h
new file mode 100644
index 000000000..c7119dd19
--- /dev/null
+++ b/include/rocksdb/compaction_job_stats.h
@@ -0,0 +1,51 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+#pragma once
+#include <stddef.h>
+#include <stdint.h>
+
+namespace rocksdb {
+struct CompactionJobStats {
+  CompactionJobStats() { Reset(); }
+  void Reset();
+
+  // the elapsed time of this compaction in microseconds.
+  uint64_t elapsed_micros;
+
+  // the number of compaction input records.
+  uint64_t num_input_records;
+  // the number of compaction input files.
+  size_t num_input_files;
+  // the number of compaction input files at the output level.
+  size_t num_input_files_at_output_level;
+
+  // the number of compaction output records.
+  uint64_t num_output_records;
+  // the number of compaction output files.
+  size_t num_output_files;
+
+  // true if the compaction is a manual compaction
+  bool is_manual_compaction;
+
+  // the size of the compaction input in bytes.
+  uint64_t total_input_bytes;
+  // the size of the compaction output in bytes.
+  uint64_t total_output_bytes;
+
+  // number of records being replaced by newer record associated with same key
+  uint64_t num_records_replaced;
+
+  // the sum of the uncompressed input keys in bytes.
+  uint64_t total_input_raw_key_bytes;
+  // the sum of the uncompressed input values in bytes.
+  uint64_t total_input_raw_value_bytes;
+
+  // 0-terminated strings storing the first 8 bytes of the smallest and
+  // largest key in the output.
+  char smallest_output_key_prefix[9];
+  char largest_output_key_prefix[9];
+};
+}  // namespace rocksdb
diff --git a/include/rocksdb/iostats_context.h b/include/rocksdb/iostats_context.h
index 009ad9117..d27879ad9 100644
--- a/include/rocksdb/iostats_context.h
+++ b/include/rocksdb/iostats_context.h
@@ -2,9 +2,7 @@
 // This source code is licensed under the BSD-style license found in the
 // LICENSE file in the root directory of this source tree. An additional grant
 // of patent rights can be found in the PATENTS file in the same directory.
-
-#ifndef INCLUDE_ROCKSDB_IOSTATS_CONTEXT_H_
-#define INCLUDE_ROCKSDB_IOSTATS_CONTEXT_H_
+#pragma once
 
 #include <stdint.h>
 #include <string>
@@ -48,5 +46,3 @@ extern __thread IOStatsContext iostats_context;
 #endif  // IOS_CROSS_COMPILE
 
 }  // namespace rocksdb
-
-#endif  // INCLUDE_ROCKSDB_IOSTATS_CONTEXT_H_
diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h
index ab5e36825..65130b1a2 100644
--- a/include/rocksdb/listener.h
+++ b/include/rocksdb/listener.h
@@ -7,6 +7,7 @@
 
 #include <string>
 #include <vector>
+#include "rocksdb/compaction_job_stats.h"
 #include "rocksdb/status.h"
 #include "rocksdb/table_properties.h"
 
@@ -14,6 +15,7 @@ namespace rocksdb {
 
 class DB;
 class Status;
+struct CompactionJobStats;
 
 struct TableFileCreationInfo {
   TableFileCreationInfo() = default;
@@ -38,16 +40,29 @@ struct TableFileCreationInfo {
 
 #ifndef ROCKSDB_LITE
 
 struct CompactionJobInfo {
+  CompactionJobInfo() = default;
+  explicit CompactionJobInfo(const CompactionJobStats& _stats) :
+      stats(_stats) {}
+
   // the name of the column family where the compaction happened.
   std::string cf_name;
   // the status indicating whether the compaction was successful or not.
   Status status;
+  // the id of the thread that completed this compaction job.
+  uint64_t thread_id;
+  // the job id, which is unique in the same thread.
+  int job_id;
+  // the smallest input level of the compaction.
+  int base_input_level;
   // the output level of the compaction.
   int output_level;
   // the names of the compaction input files.
   std::vector<std::string> input_files;
   // the names of the compaction output files.
   std::vector<std::string> output_files;
+  // Detailed information about this compaction.
+  CompactionJobStats stats;
 };
 
 // EventListener class contains a set of call-back functions that will
diff --git a/src.mk b/src.mk
index f277a85dc..ec61aa28e 100644
--- a/src.mk
+++ b/src.mk
@@ -78,6 +78,7 @@ LIB_SOURCES = \
   util/cache.cc \
   util/coding.cc \
   util/comparator.cc \
+  util/compaction_job_stats_impl.cc \
   util/crc32c.cc \
   util/db_info_dumper.cc \
   util/dynamic_bloom.cc \
@@ -153,6 +154,7 @@ TEST_BENCH_SOURCES = \
   third-party/gtest-1.7.0/fused-src/gtest/gtest-all.cc \
   db/column_family_test.cc \
   db/compaction_job_test.cc \
+  db/compaction_job_stats_test.cc \
   db/compaction_picker_test.cc \
   db/comparator_db_test.cc \
   db/corruption_test.cc \
diff --git a/util/compaction_job_stats_impl.cc b/util/compaction_job_stats_impl.cc
new file mode 100644
index 000000000..4933a15cc
--- /dev/null
+++ b/util/compaction_job_stats_impl.cc
@@ -0,0 +1,44 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+#include
+#include "include/rocksdb/compaction_job_stats.h"
+
+namespace rocksdb {
+
+#ifndef ROCKSDB_LITE
+
+void CompactionJobStats::Reset() {
+  elapsed_micros = 0;
+
+  num_input_files = 0;
+  num_input_files_at_output_level = 0;
+  num_output_files = 0;
+
+  num_input_records = 0;
+  num_output_records = 0;
+
+  total_input_bytes = 0;
+  total_output_bytes = 0;
+
+  total_input_raw_key_bytes = 0;
+  total_input_raw_value_bytes = 0;
+
+  num_records_replaced = 0;
+
+  is_manual_compaction = 0;
+
+  smallest_output_key_prefix[0] = 0;
+  largest_output_key_prefix[0] = 0;
+}
+
+#else
+
+void CompactionJobStats::Reset() {
+}
+
+#endif  // !ROCKSDB_LITE
+
+}  // namespace rocksdb
diff --git a/util/thread_status_updater.cc b/util/thread_status_updater.cc
index 31845ccb5..3127e491e 100644
--- a/util/thread_status_updater.cc
+++ b/util/thread_status_updater.cc
@@ -15,6 +15,11 @@ namespace rocksdb {
 
 __thread ThreadStatusData* ThreadStatusUpdater::thread_status_data_ = nullptr;
 
+uint64_t ThreadStatusUpdater::GetThreadID() {
+  auto* data = InitAndGet();
+  return data->thread_id;
+}
+
 void ThreadStatusUpdater::UnregisterThread() {
   if (thread_status_data_ != nullptr) {
     std::lock_guard<std::mutex> lck(thread_list_mutex_);
@@ -292,6 +297,10 @@ void ThreadStatusUpdater::UnregisterThread() {
 void ThreadStatusUpdater::ResetThreadStatus() {
 }
 
+uint64_t ThreadStatusUpdater::GetThreadID() {
+  return 0;
+}
+
 void ThreadStatusUpdater::SetThreadType(
     ThreadStatus::ThreadType ttype) {
 }
diff --git a/util/thread_status_updater.h b/util/thread_status_updater.h
index b511a8dfb..6f7c4e384 100644
--- a/util/thread_status_updater.h
+++ b/util/thread_status_updater.h
@@ -115,6 +115,8 @@ class ThreadStatusUpdater {
   // ColumnFamilyInfoKey, ThreadOperation, and ThreadState.
   void ResetThreadStatus();
 
+  uint64_t GetThreadID();
+
   // Set the thread type of the current thread.
   void SetThreadType(ThreadStatus::ThreadType ttype);
 
diff --git a/util/thread_status_util.cc b/util/thread_status_util.cc
index c498971e5..7907669a7 100644
--- a/util/thread_status_util.cc
+++ b/util/thread_status_util.cc
@@ -32,6 +32,16 @@ void ThreadStatusUtil::UnregisterThread() {
   }
 }
 
+uint64_t ThreadStatusUtil::GetThreadID() {
+  if (thread_updater_local_cache_ == nullptr) {
+    // thread_updater_local_cache_ is lazily set in SetColumnFamily
+    // or other ThreadStatusUtil functions.
+    return 0;
+  }
+  return thread_updater_local_cache_->GetThreadID();
+}
+
+
 void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* cfd) {
   if (!MaybeInitThreadLocalUpdater(cfd->ioptions()->env)) {
     return;
@@ -170,6 +180,10 @@ bool ThreadStatusUtil::MaybeInitThreadLocalUpdater(const Env* env) {
   return false;
 }
 
+uint64_t ThreadStatusUtil::GetThreadID() {
+  return 0;
+}
+
 void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* cfd) {
 }
 
diff --git a/util/thread_status_util.h b/util/thread_status_util.h
index ba0238d58..2e52461a0 100644
--- a/util/thread_status_util.h
+++ b/util/thread_status_util.h
@@ -27,6 +27,8 @@ class ColumnFamilyData;
 // all function calls to ThreadStatusUtil will be no-op.
 class ThreadStatusUtil {
  public:
+  static uint64_t GetThreadID();
+
   // Set the thread type of the current thread.
   static void SetThreadType(
       const Env* env, ThreadStatus::ThreadType thread_type);
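
--
For readers integrating this change, here is a minimal sketch of a client-side
listener that consumes the new CompactionJobStats through the
OnCompactionCompleted() callback. The listener class name, DB path, and log
format below are illustrative only; the callback signature, the
CompactionJobInfo fields, and the listener registration follow the headers
added by this patch.

#include <cinttypes>
#include <cstdio>

#include "rocksdb/db.h"
#include "rocksdb/listener.h"
#include "rocksdb/options.h"

namespace {

// Illustrative listener: prints a few of the CompactionJobStats fields
// introduced by this patch whenever a (non-trivial) compaction completes.
class StatsPrintingListener : public rocksdb::EventListener {
 public:
  virtual void OnCompactionCompleted(rocksdb::DB* db,
                                     const rocksdb::CompactionJobInfo& ci) {
    fprintf(stderr,
            "[%s] job %d: L%d -> L%d, %" PRIu64 " -> %" PRIu64
            " records in %" PRIu64 " us%s\n",
            ci.cf_name.c_str(), ci.job_id, ci.base_input_level,
            ci.output_level, ci.stats.num_input_records,
            ci.stats.num_output_records, ci.stats.elapsed_micros,
            ci.stats.is_manual_compaction ? " (manual)" : "");
  }
};

}  // namespace

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // options.listeners holds shared_ptrs; emplace_back wraps the raw
  // pointer, the same way the test above registers its checker.
  options.listeners.emplace_back(new StatsPrintingListener());

  rocksdb::DB* db;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/compaction_stats_demo", &db);
  if (!s.ok()) {
    return 1;
  }
  db->Put(rocksdb::WriteOptions(), "key", "value");
  // A manual full-range compaction; any completed non-trivial compaction
  // may invoke the callback above.
  db->CompactRange(nullptr, nullptr);
  delete db;
  return 0;
}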