From f9758e01297c6c5211397515d4befd89f89b35de Mon Sep 17 00:00:00 2001
From: Ori Bernstein
Date: Tue, 27 Jan 2015 14:44:02 -0800
Subject: [PATCH] Add compaction listener.

Summary: This adds a listener for compactions and reports some useful
statistics on each compaction pass.

Test Plan: Unit tests.

Reviewers: sdong, igor, rven, yhchiang

Reviewed By: yhchiang

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D31641
---
 db/column_family.cc        | 26 +++++++++++++++++
 db/column_family.h         |  2 ++
 db/db_impl.cc              | 29 ++++++++++++++++--
 db/db_impl.h               |  3 ++
 db/listener_test.cc        | 60 ++++++++++++++++++++++++++++++++++++--
 include/rocksdb/listener.h | 29 ++++++++++++++++++
 6 files changed, 145 insertions(+), 4 deletions(-)

diff --git a/db/column_family.cc b/db/column_family.cc
index be01a2993..d3ff9b3f5 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -556,6 +556,32 @@ bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
   return false;
 }
 
+void ColumnFamilyData::NotifyOnCompactionCompleted(
+    DB* db, Compaction* c, const Status& status) {
+#ifndef ROCKSDB_LITE
+  auto listeners = ioptions()->listeners;
+  CompactionJobInfo info;
+  info.cf_name = c->column_family_data()->GetName();
+  info.status = status;
+  info.output_level = c->output_level();
+  for (const auto fmd : *c->inputs(c->level())) {
+    info.input_files.push_back(
+        TableFileName(options_.db_paths,
+                      fmd->fd.GetNumber(),
+                      fmd->fd.GetPathId()));
+  }
+  for (const auto newf : c->edit()->GetNewFiles()) {
+    info.output_files.push_back(
+        TableFileName(options_.db_paths,
+                      newf.second.fd.GetNumber(),
+                      newf.second.fd.GetPathId()));
+  }
+  for (auto listener : listeners) {
+    listener->OnCompactionCompleted(db, info);
+  }
+#endif // ROCKSDB_LITE
+}
+
 void ColumnFamilyData::NotifyOnFlushCompleted(
     DB* db, const std::string& file_path,
     bool triggered_flush_slowdown,
diff --git a/db/column_family.h b/db/column_family.h
index a1a9e8034..8101e7032 100644
--- a/db/column_family.h
+++ b/db/column_family.h
@@ -261,6 +261,8 @@ class ColumnFamilyData {
 
   void ResetThreadLocalSuperVersions();
 
+  void NotifyOnCompactionCompleted(DB* db, Compaction* c, const Status& status);
+
   void NotifyOnFlushCompleted(
       DB* db, const std::string& file_path,
       bool triggered_flush_slowdown,
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 2be3d2359..8e8f3b733 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -1433,6 +1433,28 @@ Status DBImpl::CompactFilesImpl(
 }
 #endif // ROCKSDB_LITE
 
+void DBImpl::NotifyOnCompactionCompleted(
+    ColumnFamilyData* cfd, Compaction *c, const Status &st) {
+#ifndef ROCKSDB_LITE
+  if (cfd->ioptions()->listeners.size() == 0U) {
+    return;
+  }
+  mutex_.AssertHeld();
+  if (shutting_down_.load(std::memory_order_acquire)) {
+    return;
+  }
+  notifying_events_++;
+  // release lock while notifying events
+  mutex_.Unlock();
+  cfd->NotifyOnCompactionCompleted(this, c, st);
+  mutex_.Lock();
+  notifying_events_--;
+  assert(notifying_events_ >= 0);
+  // no need to signal bg_cv_ as it will be signaled at the end of the
+  // flush process.
+#endif // ROCKSDB_LITE
+}
+
 Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
     const std::unordered_map<std::string, std::string>& options_map) {
 #ifdef ROCKSDB_LITE
@@ -2186,7 +2208,6 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
     LogToBuffer(log_buffer, "[%s] Deleted %d files\n",
                 c->column_family_data()->GetName().c_str(),
                 c->num_input_files(0));
-    c->ReleaseCompactionFiles(status);
     *madeProgress = true;
   } else if (!is_manual && c->IsTrivialMove()) {
     // Instrument for event update
@@ -2221,7 +2242,6 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
         c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
         c->level() + 1, f->fd.GetFileSize(), status.ToString().c_str(),
         c->column_family_data()->current()->storage_info()->LevelSummary(&tmp));
-    c->ReleaseCompactionFiles(status);
     *madeProgress = true;
 
     // Clear Instrument
@@ -2246,6 +2266,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
       InstallSuperVersionBackground(c->column_family_data(), job_context,
                                     *c->mutable_cf_options());
     }
+    *madeProgress = true;
+  }
+  // FIXME(orib): should I check if column family data is null?
+  if (c != nullptr) {
+    NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status);
     c->ReleaseCompactionFiles(status);
     *madeProgress = true;
   }
diff --git a/db/db_impl.h b/db/db_impl.h
index 70fa14727..3b3376665 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -268,6 +268,9 @@ class DBImpl : public DB {
   void NotifyOnFlushCompleted(ColumnFamilyData* cfd, uint64_t file_number,
                               const MutableCFOptions& mutable_cf_options);
 
+  void NotifyOnCompactionCompleted(ColumnFamilyData* cfd,
+                                   Compaction *c, const Status &st);
+
   void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const;
 
   void EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const;
diff --git a/db/listener_test.cc b/db/listener_test.cc
index dfc075803..a1577fba2 100644
--- a/db/listener_test.cc
+++ b/db/listener_test.cc
@@ -2,7 +2,6 @@
 // This source code is licensed under the BSD-style license found in the
 // LICENSE file in the root directory of this source tree. An additional grant
 // of patent rights can be found in the PATENTS file in the same directory.
-
 #include "db/dbformat.h"
 #include "db/db_impl.h"
 #include "db/filename.h"
@@ -144,12 +143,69 @@
     }
   }
-
   DB* db_;
   std::string dbname_;
   std::vector<ColumnFamilyHandle*> handles_;
 };
 
+class TestCompactionListener : public EventListener {
+ public:
+  void OnCompactionCompleted(DB *db,
+                             const CompactionJobInfo& ci) override {
+    compacted_dbs_.push_back(db);
+  }
+
+  std::vector<DB*> compacted_dbs_;
+};
+
+TEST(EventListenerTest, OnSingleDBCompactionTest) {
+  const int kTestKeySize = 16;
+  const int kTestValueSize = 984;
+  const int kEntrySize = kTestKeySize + kTestValueSize;
+  const int kEntriesPerBuffer = 100;
+  const int kNumL0Files = 4;
+
+  Options options;
+  options.create_if_missing = true;
+  options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
+  options.compaction_style = kCompactionStyleLevel;
+  options.target_file_size_base = options.write_buffer_size;
+  options.max_bytes_for_level_base = options.target_file_size_base * 2;
+  options.max_bytes_for_level_multiplier = 2;
+  options.compression = kNoCompression;
+  options.enable_thread_tracking = true;
+  options.level0_file_num_compaction_trigger = kNumL0Files;
+
+  TestCompactionListener* listener = new TestCompactionListener();
+  options.listeners.emplace_back(listener);
+  std::vector<std::string> cf_names = {
+      "pikachu", "ilya", "muromec", "dobrynia",
+      "nikitich", "alyosha", "popovich"};
+  CreateAndReopenWithCF(cf_names, &options);
+  ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
+  ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
+  ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
+  ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
+  ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
+  ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
+  ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
+  for (size_t i = 1; i < 8; ++i) {
+    ASSERT_OK(Flush(static_cast<int>(i)));
+    const Slice kStart = "a";
+    const Slice kEnd = "z";
+    ASSERT_OK(dbfull()->CompactRange(handles_[i], &kStart, &kEnd));
+    dbfull()->TEST_WaitForFlushMemTable();
+    dbfull()->TEST_WaitForCompact();
+  }
+
+  ASSERT_EQ(listener->compacted_dbs_.size(), cf_names.size());
+  for (size_t i = 0; i < cf_names.size(); ++i) {
+    ASSERT_EQ(listener->compacted_dbs_[i], db_);
+  }
+}
+
 class TestFlushListener : public EventListener {
  public:
   void OnFlushCompleted(
diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h
index 4ad1ae04b..be5b96032 100644
--- a/include/rocksdb/listener.h
+++ b/include/rocksdb/listener.h
@@ -7,6 +7,7 @@
 #ifndef ROCKSDB_LITE
 
 #include <string>
+#include <vector>
 #include "rocksdb/status.h"
 
 namespace rocksdb {
@@ -14,6 +15,19 @@ namespace rocksdb {
 class DB;
 class Status;
 
+struct CompactionJobInfo {
+  // the name of the column family where the compaction happened.
+  std::string cf_name;
+  // the status indicating whether the compaction was successful or not.
+  Status status;
+  // the output level of the compaction.
+  int output_level;
+  // the names of the compaction input files.
+  std::vector<std::string> input_files;
+  // the names of the compaction output files.
+  std::vector<std::string> output_files;
+};
+
 // EventListener class contains a set of call-back functions that will
 // be called when specific RocksDB event happens such as flush.  It can
 // be used as a building block for developing custom features such as
@@ -58,6 +72,21 @@ class EventListener {
       const std::string& file_path,
       bool triggered_writes_slowdown,
       bool triggered_writes_stop) {}
+
+  // A call-back function for RocksDB which will be called whenever
+  // a registered RocksDB instance finishes a compaction. The default
+  // implementation is a no-op.
+  //
+  // Note that this function should not run for an extended period of
+  // time before returning. Otherwise, RocksDB may be blocked.
+  //
+  // @param db a pointer to the rocksdb instance which just compacted
+  //   a file.
+  // @param ci a reference to a CompactionJobInfo struct. 'ci' is released
+  //   after this function returns, and must be copied if it is needed
+  //   outside of this function.
+  virtual void OnCompactionCompleted(DB *db, const CompactionJobInfo& ci) {}
 
   virtual ~EventListener() {}
 };
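
--

For context, a minimal usage sketch of the new API: an application registers an
EventListener on Options::listeners and receives a CompactionJobInfo after each
compaction finishes. The CompactionLogger class name and the database path below
are hypothetical illustrations and are not part of this patch; the
Options::listeners member, EventListener::OnCompactionCompleted, and the
CompactionJobInfo fields are the ones introduced above.

#include <cstdio>
#include "rocksdb/db.h"
#include "rocksdb/listener.h"
#include "rocksdb/options.h"

// Hypothetical listener that logs a one-line summary of every compaction.
class CompactionLogger : public rocksdb::EventListener {
 public:
  void OnCompactionCompleted(rocksdb::DB* db,
                             const rocksdb::CompactionJobInfo& ci) override {
    // 'ci' is only valid for the duration of the callback, so consume (or
    // copy) its fields here rather than keeping a reference to it.
    fprintf(stderr,
            "[%s] compaction finished: output level %d, %zu input files, "
            "%zu output files, ok=%d\n",
            ci.cf_name.c_str(), ci.output_level, ci.input_files.size(),
            ci.output_files.size(), ci.status.ok() ? 1 : 0);
  }
};

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Listeners are stored as shared_ptrs inside the options; ownership of the
  // raw pointer passes to the listeners vector.
  options.listeners.emplace_back(new CompactionLogger());

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/compaction_listener_example", &db);
  if (!s.ok()) {
    fprintf(stderr, "open failed: %s\n", s.ToString().c_str());
    return 1;
  }
  // ... normal writes; the callback fires after each background compaction.
  delete db;
  return 0;
}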