Call OnCompactionCompleted API in case of DisableManualCompaction (#8469)

Summary:
Call OnCompactionCompleted API in case of
DisableManualCompaction() with updated Status::Incomplete

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8469

Reviewed By: ajkr

Differential Revision: D29475517

Pulled By: akankshamahajan15

fbshipit-source-id: a1726c5e6ee18c0b5097ea04f5e6975fbe108055
main
Akanksha Mahajan 4 years ago committed by Facebook GitHub Bot
parent b20737709f
commit c76778e2bd
  1. 1
      HISTORY.md
  2. 3
      db/compaction/compaction.cc
  3. 14
      db/compaction/compaction.h
  4. 8
      db/db_impl/db_impl_compaction_flush.cc
  5. 170
      db/db_test2.cc

@ -6,6 +6,7 @@
### Bug Fixes
* Blob file checksums are now printed in hexadecimal format when using the `manifest_dump` `ldb` command.
* `GetLiveFilesMetaData()` now populates the `temperature`, `oldest_ancester_time`, and `file_creation_time` fields of its `LiveFileMetaData` results when the information is available. Previously these fields always contained zero indicating unknown.
* Fix mismatches of OnCompaction{Begin,Completed} in case of DisableManualCompaction().
### New Features
* ldb has a new feature, `list_live_files_metadata`, that shows the live SST files, as well as their LSM storage level and the column family they belong to.

@ -237,7 +237,8 @@ Compaction::Compaction(
is_full_compaction_(IsFullCompaction(vstorage, inputs_)),
is_manual_compaction_(_manual_compaction),
is_trivial_move_(false),
compaction_reason_(_compaction_reason) {
compaction_reason_(_compaction_reason),
notify_on_compaction_completion_(false) {
MarkFilesBeingCompacted(true);
if (is_manual_compaction_) {
compaction_reason_ = CompactionReason::kManualCompaction;

@ -303,6 +303,16 @@ class Compaction {
uint64_t MinInputFileOldestAncesterTime() const;
// Called by DBImpl::NotifyOnCompactionCompleted to make sure number of
// compaction begin and compaction completion callbacks match.
void SetNotifyOnCompactionCompleted() {
notify_on_compaction_completion_ = true;
}
bool ShouldNotifyOnCompactionCompleted() const {
return notify_on_compaction_completion_;
}
private:
// mark (or clear) all files that are being compacted
void MarkFilesBeingCompacted(bool mark_as_compacted);
@ -386,6 +396,10 @@ class Compaction {
// Reason for compaction
CompactionReason compaction_reason_;
// Notify on compaction completion only if listener was notified on compaction
// begin.
bool notify_on_compaction_completion_;
};
// Return sum of sizes of all files in `files`.

@ -1395,6 +1395,8 @@ void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
return;
}
c->SetNotifyOnCompactionCompleted();
Version* current = cfd->current();
current->Ref();
// release lock while notifying events
@ -1430,10 +1432,8 @@ void DBImpl::NotifyOnCompactionCompleted(
if (shutting_down_.load(std::memory_order_acquire)) {
return;
}
// TODO: Should disabling manual compaction squash compaction completed
// notifications that aren't the result of a shutdown?
if (c->is_manual_compaction() &&
manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
if (c->ShouldNotifyOnCompactionCompleted() == false) {
return;
}

@ -3376,6 +3376,176 @@ TEST_F(DBTest2, CancelManualCompaction2) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
class CancelCompactionListener : public EventListener {
public:
CancelCompactionListener()
: num_compaction_started_(0), num_compaction_ended_(0) {}
void OnCompactionBegin(DB* /*db*/, const CompactionJobInfo& ci) override {
ASSERT_EQ(ci.cf_name, "default");
ASSERT_EQ(ci.base_input_level, 0);
num_compaction_started_++;
}
void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
ASSERT_EQ(ci.cf_name, "default");
ASSERT_EQ(ci.base_input_level, 0);
ASSERT_EQ(ci.status.code(), code_);
ASSERT_EQ(ci.status.subcode(), subcode_);
num_compaction_ended_++;
}
std::atomic<size_t> num_compaction_started_;
std::atomic<size_t> num_compaction_ended_;
Status::Code code_;
Status::SubCode subcode_;
};
TEST_F(DBTest2, CancelManualCompactionWithListener) {
CompactRangeOptions compact_options;
auto canceledPtr =
std::unique_ptr<std::atomic<bool>>(new std::atomic<bool>{true});
compact_options.canceled = canceledPtr.get();
compact_options.max_subcompactions = 1;
Options options = CurrentOptions();
options.disable_auto_compactions = true;
CancelCompactionListener* listener = new CancelCompactionListener();
options.listeners.emplace_back(listener);
DestroyAndReopen(options);
Random rnd(301);
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
ASSERT_OK(Put(Key(i + j * 10), rnd.RandomString(50)));
}
Flush();
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionIterator:ProcessKV", [&](void* /*arg*/) {
compact_options.canceled->store(true, std::memory_order_release);
});
int running_compaction = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::FinishCompactionOutputFile1",
[&](void* /*arg*/) { running_compaction++; });
// Case I: 1 Notify begin compaction, 2 DisableManualCompaction, 3 Compaction
// not run, 4 Notify compaction end.
listener->code_ = Status::kIncomplete;
listener->subcode_ = Status::SubCode::kManualCompactionPaused;
compact_options.canceled->store(false, std::memory_order_release);
dbfull()->CompactRange(compact_options, nullptr, nullptr);
dbfull()->TEST_WaitForCompact(true);
ASSERT_GT(listener->num_compaction_started_, 0);
ASSERT_EQ(listener->num_compaction_started_, listener->num_compaction_ended_);
ASSERT_EQ(running_compaction, 0);
listener->num_compaction_started_ = 0;
listener->num_compaction_ended_ = 0;
// Case II: 1 DisableManualCompaction, 2 Notify begin compaction (return
// without notifying), 3 Notify compaction end (return without notifying).
dbfull()->CompactRange(compact_options, nullptr, nullptr);
dbfull()->TEST_WaitForCompact(true);
ASSERT_EQ(listener->num_compaction_started_, 0);
ASSERT_EQ(listener->num_compaction_started_, listener->num_compaction_ended_);
ASSERT_EQ(running_compaction, 0);
// Case III: 1 Notify begin compaction, 2 Compaction in between
// 3. DisableManualCompaction, , 4 Notify compaction end.
// compact_options.canceled->store(false, std::memory_order_release);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
"CompactionIterator:ProcessKV");
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run:BeforeVerify", [&](void* /*arg*/) {
compact_options.canceled->store(true, std::memory_order_release);
});
listener->code_ = Status::kOk;
listener->subcode_ = Status::SubCode::kNone;
compact_options.canceled->store(false, std::memory_order_release);
dbfull()->CompactRange(compact_options, nullptr, nullptr);
dbfull()->TEST_WaitForCompact(true);
ASSERT_GT(listener->num_compaction_started_, 0);
ASSERT_EQ(listener->num_compaction_started_, listener->num_compaction_ended_);
// Compaction job will succeed.
ASSERT_GT(running_compaction, 0);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBTest2, CompactionOnBottomPriorityWithListener) {
int num_levels = 3;
const int kNumFilesTrigger = 4;
Options options = CurrentOptions();
env_->SetBackgroundThreads(0, Env::Priority::HIGH);
env_->SetBackgroundThreads(0, Env::Priority::LOW);
env_->SetBackgroundThreads(1, Env::Priority::BOTTOM);
options.env = env_;
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = num_levels;
options.write_buffer_size = 100 << 10; // 100KB
options.target_file_size_base = 32 << 10; // 32KB
options.level0_file_num_compaction_trigger = kNumFilesTrigger;
// Trigger compaction if size amplification exceeds 110%
options.compaction_options_universal.max_size_amplification_percent = 110;
CancelCompactionListener* listener = new CancelCompactionListener();
options.listeners.emplace_back(listener);
DestroyAndReopen(options);
int num_bottom_thread_compaction_scheduled = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:ForwardToBottomPriPool",
[&](void* /*arg*/) { num_bottom_thread_compaction_scheduled++; });
int num_compaction_jobs = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run():End",
[&](void* /*arg*/) { num_compaction_jobs++; });
listener->code_ = Status::kOk;
listener->subcode_ = Status::SubCode::kNone;
Random rnd(301);
for (int i = 0; i < 1; ++i) {
for (int num = 0; num < kNumFilesTrigger; num++) {
int key_idx = 0;
GenerateNewFile(&rnd, &key_idx, true /* no_wait */);
// use no_wait above because that one waits for flush and compaction. We
// don't want to wait for compaction because the full compaction is
// intentionally blocked while more files are flushed.
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
}
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_GT(num_bottom_thread_compaction_scheduled, 0);
ASSERT_EQ(num_compaction_jobs, 1);
ASSERT_GT(listener->num_compaction_started_, 0);
ASSERT_EQ(listener->num_compaction_started_, listener->num_compaction_ended_);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBTest2, OptimizeForPointLookup) {
Options options = CurrentOptions();
Close();

Loading…
Cancel
Save