support OnCompactionBegin (#4431)

Summary:
fix #4288

Add `OnCompactionBegin` support to `rocksdb::EventListener`.

Currently, we only have these three callbacks:

- OnFlushBegin
- OnFlushCompleted
- OnCompactionCompleted

As paolococchi requested in #4288 , and ajkr agreed, we should also support `OnCompactionBegin`.

This PR is a try to implement the support of `OnCompactionBegin`.

Hope it is useful to you.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4431

Differential Revision: D10055515

Pulled By: yiwu-arbug

fbshipit-source-id: 39c0f95f8e9ff1c7ca3a10787502a17f258d2334
main
Peter Pei 6 years ago committed by Facebook Github Bot
parent f8c1de4c7c
commit 09814f2cfc
  1. 5
      db/db_impl.h
  2. 72
      db/db_impl_compaction_flush.cc
  3. 19
      db/db_test2.cc
  4. 10
      include/rocksdb/listener.h

@ -735,6 +735,11 @@ class DBImpl : public DB {
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop);
void NotifyOnCompactionBegin(ColumnFamilyData* cfd,
Compaction *c, const Status &st,
const CompactionJobStats& job_stats,
int job_id);
void NotifyOnCompactionCompleted(ColumnFamilyData* cfd,
Compaction *c, const Status &st,
const CompactionJobStats& job_stats,

@ -744,6 +744,69 @@ Status DBImpl::ContinueBackgroundWork() {
return Status::OK();
}
void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd,
Compaction *c, const Status &st,
const CompactionJobStats& job_stats,
int job_id) {
#ifndef ROCKSDB_LITE
if (immutable_db_options_.listeners.empty()) {
return;
}
mutex_.AssertHeld();
if (shutting_down_.load(std::memory_order_acquire)) {
return;
}
Version* current = cfd->current();
current->Ref();
// release lock while notifying events
mutex_.Unlock();
TEST_SYNC_POINT("DBImpl::NotifyOnCompactionBegin::UnlockMutex");
{
CompactionJobInfo info;
info.cf_name = cfd->GetName();
info.status = st;
info.thread_id = env_->GetThreadID();
info.job_id = job_id;
info.base_input_level = c->start_level();
info.output_level = c->output_level();
info.stats = job_stats;
info.table_properties = c->GetOutputTableProperties();
info.compaction_reason = c->compaction_reason();
info.compression = c->output_compression();
for (size_t i = 0; i < c->num_input_levels(); ++i) {
for (const auto fmd : *c->inputs(i)) {
auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
fmd->fd.GetNumber(), fmd->fd.GetPathId());
info.input_files.push_back(fn);
if (info.table_properties.count(fn) == 0) {
std::shared_ptr<const TableProperties> tp;
auto s = current->GetTableProperties(&tp, fmd, &fn);
if (s.ok()) {
info.table_properties[fn] = tp;
}
}
}
}
for (const auto newf : c->edit()->GetNewFiles()) {
info.output_files.push_back(TableFileName(
c->immutable_cf_options()->cf_paths, newf.second.fd.GetNumber(),
newf.second.fd.GetPathId()));
}
for (auto listener : immutable_db_options_.listeners) {
listener->OnCompactionBegin(this, info);
}
}
mutex_.Lock();
current->Unref();
#else
(void)cfd;
(void)c;
(void)st;
(void)job_stats;
(void)job_id;
#endif // ROCKSDB_LITE
}
void DBImpl::NotifyOnCompactionCompleted(
ColumnFamilyData* cfd, Compaction* c, const Status& st,
const CompactionJobStats& compaction_job_stats, const int job_id) {
@ -1945,6 +2008,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
compaction_job_stats.num_input_files = c->num_input_files(0);
NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
compaction_job_stats, job_context->job_id);
for (const auto& f : *c->inputs(0)) {
c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
}
@ -1969,6 +2035,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
compaction_job_stats.num_input_files = c->num_input_files(0);
NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
compaction_job_stats, job_context->job_id);
// Move files to next level
int32_t moved_files = 0;
int64_t moved_bytes = 0;
@ -2068,6 +2137,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
&compaction_job_stats);
compaction_job.Prepare();
NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
compaction_job_stats, job_context->job_id);
mutex_.Unlock();
compaction_job.Run();
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");

@ -1228,7 +1228,14 @@ TEST_F(DBTest2, CompressionOptions) {
class CompactionStallTestListener : public EventListener {
public:
CompactionStallTestListener() : compacted_files_cnt_(0) {}
CompactionStallTestListener() : compacting_files_cnt_(0), compacted_files_cnt_(0) {}
void OnCompactionBegin(DB* /*db*/, const CompactionJobInfo& ci) override {
ASSERT_EQ(ci.cf_name, "default");
ASSERT_EQ(ci.base_input_level, 0);
ASSERT_EQ(ci.compaction_reason, CompactionReason::kLevelL0FilesNum);
compacting_files_cnt_ += ci.input_files.size();
}
void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
ASSERT_EQ(ci.cf_name, "default");
@ -1236,6 +1243,8 @@ class CompactionStallTestListener : public EventListener {
ASSERT_EQ(ci.compaction_reason, CompactionReason::kLevelL0FilesNum);
compacted_files_cnt_ += ci.input_files.size();
}
std::atomic<size_t> compacting_files_cnt_;
std::atomic<size_t> compacted_files_cnt_;
};
@ -1244,6 +1253,8 @@ TEST_F(DBTest2, CompactionStall) {
{{"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:0"},
{"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:1"},
{"DBTest2::CompactionStall:2",
"DBImpl::NotifyOnCompactionBegin::UnlockMutex"},
{"DBTest2::CompactionStall:3",
"DBImpl::NotifyOnCompactionCompleted::UnlockMutex"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
@ -1285,14 +1296,18 @@ TEST_F(DBTest2, CompactionStall) {
// Wait for another compaction to be triggered
TEST_SYNC_POINT("DBTest2::CompactionStall:1");
// Hold NotifyOnCompactionCompleted in the unlock mutex section
// Hold NotifyOnCompactionBegin in the unlock mutex section
TEST_SYNC_POINT("DBTest2::CompactionStall:2");
// Hold NotifyOnCompactionCompleted in the unlock mutex section
TEST_SYNC_POINT("DBTest2::CompactionStall:3");
dbfull()->TEST_WaitForCompact();
ASSERT_LT(NumTableFilesAtLevel(0),
options.level0_file_num_compaction_trigger);
ASSERT_GT(listener->compacted_files_cnt_.load(),
10 - options.level0_file_num_compaction_trigger);
ASSERT_EQ(listener->compacting_files_cnt_.load(), listener->compacted_files_cnt_.load());
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

@ -299,6 +299,16 @@ class EventListener {
// returned value.
virtual void OnTableFileDeleted(const TableFileDeletionInfo& /*info*/) {}
// A callback function to RocksDB which will be called before a
// RocksDB starts to compact. The default implementation is
// no-op.
//
// Note that the this function must be implemented in a way such that
// it should not run for an extended period of time before the function
// returns. Otherwise, RocksDB may be blocked.
virtual void OnCompactionBegin(DB* /*db*/,
const CompactionJobInfo& /*ci*/) {}
// A callback function for RocksDB which will be called whenever
// a registered RocksDB compacts a file. The default implementation
// is a no-op.

Loading…
Cancel
Save