Add compaction listener.

Summary: This adds a listener for compactions, and gives some useful statistics on each compaction pass.

Test Plan: Unit tests.

Reviewers: sdong, igor, rven, yhchiang

Reviewed By: yhchiang

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D31641
main
Ori Bernstein 10 years ago
parent e919ecedfc
commit f9758e0129
  1. 26
      db/column_family.cc
  2. 2
      db/column_family.h
  3. 29
      db/db_impl.cc
  4. 3
      db/db_impl.h
  5. 60
      db/listener_test.cc
  6. 29
      include/rocksdb/listener.h

@ -556,6 +556,32 @@ bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
return false;
}
void ColumnFamilyData::NotifyOnCompactionCompleted(
DB* db, Compaction* c, const Status& status) {
#ifndef ROCKSDB_LITE
auto listeners = ioptions()->listeners;
CompactionJobInfo info;
info.cf_name = c->column_family_data()->GetName();
info.status = status;
info.output_level = c->output_level();
for (const auto fmd : *c->inputs(c->level())) {
info.input_files.push_back(
TableFileName(options_.db_paths,
fmd->fd.GetNumber(),
fmd->fd.GetPathId()));
}
for (const auto newf : c->edit()->GetNewFiles()) {
info.input_files.push_back(
TableFileName(options_.db_paths,
newf.second.fd.GetNumber(),
newf.second.fd.GetPathId()));
}
for (auto listener : listeners) {
listener->OnCompactionCompleted(db, info);
}
#endif // ROCKSDB_LITE
}
void ColumnFamilyData::NotifyOnFlushCompleted(
DB* db, const std::string& file_path,
bool triggered_flush_slowdown,

@ -261,6 +261,8 @@ class ColumnFamilyData {
void ResetThreadLocalSuperVersions();
void NotifyOnCompactionCompleted(DB* db, Compaction* c, const Status& status);
void NotifyOnFlushCompleted(
DB* db, const std::string& file_path,
bool triggered_flush_slowdown,

@ -1433,6 +1433,28 @@ Status DBImpl::CompactFilesImpl(
}
#endif // ROCKSDB_LITE
void DBImpl::NotifyOnCompactionCompleted(
ColumnFamilyData* cfd, Compaction *c, const Status &st) {
#ifndef ROCKSDB_LITE
if (cfd->ioptions()->listeners.size() == 0U) {
return;
}
mutex_.AssertHeld();
if (shutting_down_.load(std::memory_order_acquire)) {
return;
}
notifying_events_++;
// release lock while notifying events
mutex_.Unlock();
cfd->NotifyOnCompactionCompleted(this, c, st);
mutex_.Lock();
notifying_events_--;
assert(notifying_events_ >= 0);
// no need to signal bg_cv_ as it will be signaled at the end of the
// flush process.
#endif // ROCKSDB_LITE
}
Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
const std::unordered_map<std::string, std::string>& options_map) {
#ifdef ROCKSDB_LITE
@ -2186,7 +2208,6 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
LogToBuffer(log_buffer, "[%s] Deleted %d files\n",
c->column_family_data()->GetName().c_str(),
c->num_input_files(0));
c->ReleaseCompactionFiles(status);
*madeProgress = true;
} else if (!is_manual && c->IsTrivialMove()) {
// Instrument for event update
@ -2221,7 +2242,6 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
c->level() + 1, f->fd.GetFileSize(), status.ToString().c_str(),
c->column_family_data()->current()->storage_info()->LevelSummary(&tmp));
c->ReleaseCompactionFiles(status);
*madeProgress = true;
// Clear Instrument
@ -2246,6 +2266,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
InstallSuperVersionBackground(c->column_family_data(), job_context,
*c->mutable_cf_options());
}
*madeProgress = true;
}
// FIXME(orib): should I check if column family data is null?
if (c != nullptr) {
NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status);
c->ReleaseCompactionFiles(status);
*madeProgress = true;
}

@ -268,6 +268,9 @@ class DBImpl : public DB {
void NotifyOnFlushCompleted(ColumnFamilyData* cfd, uint64_t file_number,
const MutableCFOptions& mutable_cf_options);
void NotifyOnCompactionCompleted(ColumnFamilyData* cfd,
Compaction *c, const Status &st);
void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const;
void EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const;

@ -2,7 +2,6 @@
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "db/dbformat.h"
#include "db/db_impl.h"
#include "db/filename.h"
@ -144,12 +143,69 @@ class EventListenerTest {
}
}
DB* db_;
std::string dbname_;
std::vector<ColumnFamilyHandle*> handles_;
};
class TestCompactionListener : public EventListener {
public:
void OnCompactionCompleted(DB *db,
int input_level,
int output_level,
const std::vector<int64_t>& input_files) {
compacted_dbs_.push_back(db);
}
std::vector<DB*> compacted_dbs_;
};
TEST(EventListenerTest, OnSingleDBCompactionTest) {
const int kTestKeySize = 16;
const int kTestValueSize = 984;
const int kEntrySize = kTestKeySize + kTestValueSize;
const int kEntriesPerBuffer = 100;
const int kNumL0Files = 4;
Options options;
options.create_if_missing = true;
options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
options.compaction_style = kCompactionStyleLevel;
options.target_file_size_base = options.write_buffer_size;
options.max_bytes_for_level_base = options.target_file_size_base * 2;
options.max_bytes_for_level_multiplier = 2;
options.compression = kNoCompression;
options.enable_thread_tracking = true;
options.level0_file_num_compaction_trigger = kNumL0Files;
TestCompactionListener* listener = new TestCompactionListener();
options.listeners.emplace_back(listener);
std::vector<std::string> cf_names = {
"pikachu", "ilya", "muromec", "dobrynia",
"nikitich", "alyosha", "popovich"};
CreateAndReopenWithCF(cf_names, &options);
ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
for (size_t i = 1; i < 8; ++i) {
ASSERT_OK(Flush(static_cast<int>(i)));
const Slice kStart = "a";
const Slice kEnd = "z";
ASSERT_OK(dbfull()->CompactRange(handles_[i], &kStart, &kEnd));
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
}
ASSERT_EQ(listener->compacted_dbs_.size(), cf_names.size());
for (size_t i = 0; i < cf_names.size(); ++i) {
ASSERT_EQ(listener->compacted_dbs_[i], db_);
}
}
class TestFlushListener : public EventListener {
public:
void OnFlushCompleted(

@ -7,6 +7,7 @@
#ifndef ROCKSDB_LITE
#include <string>
#include <vector>
#include "rocksdb/status.h"
namespace rocksdb {
@ -14,6 +15,19 @@ namespace rocksdb {
class DB;
class Status;
struct CompactionJobInfo {
// the name of the column family where the compaction happened.
std::string cf_name;
// the status indicating whether the compaction was successful or not.
Status status;
// the output level of the compaction.
int output_level;
// the names of the compaction input files.
std::vector<std::string> input_files;
// the names of the compaction output files.
std::vector<std::string> output_files;
};
// EventListener class contains a set of call-back functions that will
// be called when specific RocksDB event happens such as flush. It can
// be used as a building block for developing custom features such as
@ -58,6 +72,21 @@ class EventListener {
const std::string& file_path,
bool triggered_writes_slowdown,
bool triggered_writes_stop) {}
// A call-back function for RocksDB which will be called whenever
// a registered RocksDB compacts a file. The default implementation
// is a no-op.
//
// Note that this function must be implemented in a way such that
// it should not run for an extended period of time before the function
// returns. Otherwise, RocksDB may be blocked.
//
// @param db a pointer to the rocksdb instance which just compacted
// a file.
// @param ci a reference to a CompactionJobInfo struct. 'ci' is released
// after this function is returned, and must be copied if it is needed
// outside of this function.
virtual void OnCompactionCompleted(DB *db, const CompactionJobInfo& ci) {}
virtual ~EventListener() {}
};

Loading…
Cancel
Save