From 622683000c707d0203ddbaf2fb1186120dce036f Mon Sep 17 00:00:00 2001
From: andrew
Date: Mon, 16 Sep 2019 21:00:13 -0700
Subject: [PATCH] Allow users to stop manual compactions (#3971)

Summary:
Manual compaction can generate very high load, because the amount of data
involved in a single compaction may be large, and that load can hurt online
service. It is therefore useful to be able to stop a running manual
compaction immediately when it is making the server too busy. In this
implementation the stop condition is checked only on the slow path, i.e.
inside the key-by-key compaction loop; deletion compactions and trivial
moves are allowed to run to completion.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/3971

Test Plan: added tests covering the new stop points.

Differential Revision: D17369043

fbshipit-source-id: 575a624fb992ce0bb07d9443eb209e547740043c
---
 db/compaction/compaction_iterator.cc     |  17 +-
 db/compaction/compaction_iterator.h      |  13 +-
 db/compaction/compaction_job.cc          |  23 ++-
 db/compaction/compaction_job.h           |   4 +-
 db/db_impl/db_impl.cc                    |   1 +
 db/db_impl/db_impl.h                     |   4 +
 db/db_impl/db_impl_compaction_flush.cc   |  46 ++++-
 db/db_test.cc                            |   4 +
 db/db_test2.cc                           | 211 +++++++++++++++++++++++
 include/rocksdb/db.h                     |   3 +
 include/rocksdb/status.h                 |   7 +
 include/rocksdb/utilities/stackable_db.h |   7 +
 util/status.cc                           |   4 +
 13 files changed, 330 insertions(+), 14 deletions(-)

diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc
index 135018f51..73a7c3974 100644
--- a/db/compaction/compaction_iterator.cc
+++ b/db/compaction/compaction_iterator.cc
@@ -38,7 +38,8 @@ CompactionIterator::CompactionIterator(
     const CompactionFilter* compaction_filter,
     const std::atomic<bool>* shutting_down,
     const SequenceNumber preserve_deletes_seqnum,
-    SnapshotListFetchCallback* snap_list_callback)
+    SnapshotListFetchCallback* snap_list_callback,
+    const std::atomic<bool>* manual_compaction_paused)
     : CompactionIterator(
           input, cmp, merge_helper, last_sequence, snapshots,
           earliest_write_conflict_snapshot, snapshot_checker, env,
           report_detailed_time, expect_valid_internal_key, range_del_agg,
           std::unique_ptr<CompactionProxy>(
               compaction ?
new CompactionProxy(compaction) : nullptr), compaction_filter, shutting_down, preserve_deletes_seqnum, - snap_list_callback) {} + snap_list_callback, + manual_compaction_paused) {} CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, @@ -59,7 +61,8 @@ CompactionIterator::CompactionIterator( const CompactionFilter* compaction_filter, const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, - SnapshotListFetchCallback* snap_list_callback) + SnapshotListFetchCallback* snap_list_callback, + const std::atomic* manual_compaction_paused) : input_(input), cmp_(cmp), merge_helper_(merge_helper), @@ -73,6 +76,7 @@ CompactionIterator::CompactionIterator( compaction_(std::move(compaction)), compaction_filter_(compaction_filter), shutting_down_(shutting_down), + manual_compaction_paused_(manual_compaction_paused), preserve_deletes_seqnum_(preserve_deletes_seqnum), current_user_key_sequence_(0), current_user_key_snapshot_(0), @@ -234,7 +238,8 @@ void CompactionIterator::NextFromInput() { at_next_ = false; valid_ = false; - while (!valid_ && input_->Valid() && !IsShuttingDown()) { + while (!valid_ && input_->Valid() && !IsPausingManualCompaction() && + !IsShuttingDown()) { key_ = input_->key(); value_ = input_->value(); iter_stats_.num_input_records++; @@ -612,6 +617,10 @@ void CompactionIterator::NextFromInput() { if (!valid_ && IsShuttingDown()) { status_ = Status::ShutdownInProgress(); } + + if (IsPausingManualCompaction()) { + status_ = Status::Incomplete(Status::SubCode::kManualCompactionPaused); + } } void CompactionIterator::PrepareOutput() { diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index 2bf847e2e..c06373950 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -118,7 +118,8 @@ class CompactionIterator { const CompactionFilter* compaction_filter = nullptr, const std::atomic* shutting_down = nullptr, const SequenceNumber preserve_deletes_seqnum = 0, - SnapshotListFetchCallback* snap_list_callback = nullptr); + SnapshotListFetchCallback* snap_list_callback = nullptr, + const std::atomic* manual_compaction_paused = nullptr); // Constructor with custom CompactionProxy, used for tests. CompactionIterator(InternalIterator* input, const Comparator* cmp, @@ -132,7 +133,8 @@ class CompactionIterator { const CompactionFilter* compaction_filter = nullptr, const std::atomic* shutting_down = nullptr, const SequenceNumber preserve_deletes_seqnum = 0, - SnapshotListFetchCallback* snap_list_callback = nullptr); + SnapshotListFetchCallback* snap_list_callback = nullptr, + const std::atomic* manual_compaction_paused = nullptr); ~CompactionIterator(); @@ -213,6 +215,7 @@ class CompactionIterator { std::unique_ptr compaction_; const CompactionFilter* compaction_filter_; const std::atomic* shutting_down_; + const std::atomic* manual_compaction_paused_; const SequenceNumber preserve_deletes_seqnum_; bool bottommost_level_; bool valid_ = false; @@ -279,5 +282,11 @@ class CompactionIterator { // This is a best-effort facility, so memory_order_relaxed is sufficient. return shutting_down_ && shutting_down_->load(std::memory_order_relaxed); } + + bool IsPausingManualCompaction() { + // This is a best-effort facility, so memory_order_relaxed is sufficient. 
+ return manual_compaction_paused_ && + manual_compaction_paused_->load(std::memory_order_relaxed); + } }; } // namespace rocksdb diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 1acc48b4c..d49783144 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -311,7 +311,8 @@ CompactionJob::CompactionJob( const SnapshotChecker* snapshot_checker, std::shared_ptr table_cache, EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname, CompactionJobStats* compaction_job_stats, - Env::Priority thread_pri, SnapshotListFetchCallback* snap_list_callback) + Env::Priority thread_pri, SnapshotListFetchCallback* snap_list_callback, + const std::atomic* manual_compaction_paused) : job_id_(job_id), compact_(new CompactionState(compaction)), compaction_job_stats_(compaction_job_stats), @@ -324,6 +325,7 @@ CompactionJob::CompactionJob( env_->OptimizeForCompactionTableRead(env_options, db_options_)), versions_(versions), shutting_down_(shutting_down), + manual_compaction_paused_(manual_compaction_paused), preserve_deletes_seqnum_(preserve_deletes_seqnum), log_buffer_(log_buffer), db_directory_(db_directory), @@ -867,9 +869,12 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { false /* internal key corruption is expected */, existing_snapshots_.empty() ? 0 : existing_snapshots_.back(), snapshot_checker_, compact_->compaction->level(), - db_options_.statistics.get(), shutting_down_); + db_options_.statistics.get()); TEST_SYNC_POINT("CompactionJob::Run():Inprogress"); + TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():PausingManualCompaction:1", + reinterpret_cast( + const_cast *>(manual_compaction_paused_))); Slice* start = sub_compact->start; Slice* end = sub_compact->end; @@ -889,7 +894,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { &range_del_agg, sub_compact->compaction, compaction_filter, shutting_down_, preserve_deletes_seqnum_, // Currently range_del_agg is incompatible with snapshot refresh feature. - range_del_agg.IsEmpty() ? snap_list_callback_ : nullptr)); + range_del_agg.IsEmpty() ? 
snap_list_callback_ : nullptr, + manual_compaction_paused_)); auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) { @@ -953,7 +959,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { input_status = input->status(); output_file_ended = true; } + TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():PausingManualCompaction:2", + reinterpret_cast( + const_cast *>(manual_compaction_paused_))); c_iter->Next(); + if (c_iter->status().IsManualCompactionPaused()) { + break; + } if (!output_file_ended && c_iter->Valid() && sub_compact->compaction->output_level() != 0 && sub_compact->ShouldStopBefore(c_iter->key(), @@ -1006,6 +1018,11 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { shutting_down_->load(std::memory_order_relaxed)) { status = Status::ShutdownInProgress("Database shutdown"); } + if ((status.ok() || status.IsColumnFamilyDropped()) && + (manual_compaction_paused_ && + manual_compaction_paused_->load(std::memory_order_relaxed))) { + status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); + } if (status.ok()) { status = input->status(); } diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 84d38c163..79069770c 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -75,7 +75,8 @@ class CompactionJob { std::shared_ptr table_cache, EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname, CompactionJobStats* compaction_job_stats, - Env::Priority thread_pri, SnapshotListFetchCallback* snap_list_callback); + Env::Priority thread_pri, SnapshotListFetchCallback* snap_list_callback, + const std::atomic* manual_compaction_paused = nullptr); ~CompactionJob(); @@ -154,6 +155,7 @@ class CompactionJob { EnvOptions env_options_for_read_; VersionSet* versions_; const std::atomic* shutting_down_; + const std::atomic* manual_compaction_paused_; const SequenceNumber preserve_deletes_seqnum_; LogBuffer* log_buffer_; Directory* db_directory_; diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 443138908..710fde5b4 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -165,6 +165,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, batch_per_txn_(batch_per_txn), db_lock_(nullptr), shutting_down_(false), + manual_compaction_paused_(false), bg_cv_(&mutex_), logfile_number_(0), log_dir_synced_(false), diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 3e45442ca..3552a7cf2 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -282,6 +282,9 @@ class DBImpl : public DB { virtual Status EnableAutoCompaction( const std::vector& column_family_handles) override; + virtual void EnableManualCompaction() override; + virtual void DisableManualCompaction() override; + using DB::SetOptions; Status SetOptions( ColumnFamilyHandle* column_family, @@ -1638,6 +1641,7 @@ class DBImpl : public DB { InstrumentedMutex log_write_mutex_; std::atomic shutting_down_; + std::atomic manual_compaction_paused_; // This condition variable is signaled on these conditions: // * whenever bg_compaction_scheduled_ goes down to 0 // * if AnyManualCompaction, whenever a compaction finishes, even if it hasn't diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 1b862e033..724d54f09 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ 
-917,6 +917,9 @@ Status DBImpl::CompactFilesImpl( if (shutting_down_.load(std::memory_order_acquire)) { return Status::ShutdownInProgress(); } + if (manual_compaction_paused_.load(std::memory_order_acquire)) { + return Status::Incomplete(Status::SubCode::kManualCompactionPaused); + } std::unordered_set input_set; for (const auto& file_name : input_file_names) { @@ -1012,7 +1015,8 @@ Status DBImpl::CompactFilesImpl( immutable_db_options_.max_subcompactions <= 1 && c->mutable_cf_options()->snap_refresh_nanos > 0 ? &fetch_callback - : nullptr); + : nullptr, + &manual_compaction_paused_); // Creating a compaction influences the compaction score because the score // takes running compactions into account (by skipping files that are already @@ -1058,6 +1062,12 @@ Status DBImpl::CompactFilesImpl( // Done } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) { // Ignore compaction errors found during shutting down + } else if (status.IsManualCompactionPaused()) { + // Don't report stopping manual compaction as error + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "[%s] [JOB %d] Stopping manual compaction", + c->column_family_data()->GetName().c_str(), + job_context->job_id); } else { ROCKS_LOG_WARN(immutable_db_options_.info_log, "[%s] [JOB %d] Compaction error: %s", @@ -1128,6 +1138,10 @@ void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c, if (shutting_down_.load(std::memory_order_acquire)) { return; } + if (c->is_manual_compaction() && + manual_compaction_paused_.load(std::memory_order_acquire)) { + return; + } Version* current = cfd->current(); current->Ref(); // release lock while notifying events @@ -1190,6 +1204,10 @@ void DBImpl::NotifyOnCompactionCompleted( if (shutting_down_.load(std::memory_order_acquire)) { return; } + if (c->is_manual_compaction() && + manual_compaction_paused_.load(std::memory_order_acquire)) { + return; + } Version* current = cfd->current(); current->Ref(); // release lock while notifying events @@ -1879,6 +1897,14 @@ Status DBImpl::EnableAutoCompaction( return s; } +void DBImpl::DisableManualCompaction() { + manual_compaction_paused_.store(true, std::memory_order_release); +} + +void DBImpl::EnableManualCompaction() { + manual_compaction_paused_.store(false, std::memory_order_release); +} + void DBImpl::MaybeScheduleFlushOrCompaction() { mutex_.AssertHeld(); if (!opened_successfully_) { @@ -2319,6 +2345,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, env_->SleepForMicroseconds(10000); // prevent hot loop mutex_.Lock(); } else if (!s.ok() && !s.IsShutdownInProgress() && + !s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped()) { // Wait a little bit before retrying background compaction in // case this is an environmental problem and we do not want to @@ -2336,6 +2363,12 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, LogFlush(immutable_db_options_.info_log); env_->SleepForMicroseconds(1000000); mutex_.Lock(); + } else if (s.IsManualCompactionPaused()) { + ManualCompactionState *m = prepicked_compaction->manual_compaction_state; + assert(m); + ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused", + m->cfd->GetName().c_str(), + job_context.job_id); } ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); @@ -2344,7 +2377,8 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, // have created (they might not be all recorded in job_context in case of a // failure). 
Thus, we force full scan in FindObsoleteFiles() FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() && - !s.IsColumnFamilyDropped()); + !s.IsManualCompactionPaused() && + !s.IsColumnFamilyDropped()); TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"); // delete unnecessary files if any, this is done outside the mutex @@ -2427,6 +2461,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, if (!error_handler_.IsBGWorkStopped()) { if (shutting_down_.load(std::memory_order_acquire)) { status = Status::ShutdownInProgress(); + } else if (is_manual && + manual_compaction_paused_.load(std::memory_order_acquire)) { + status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); } } else { status = error_handler_.GetBGError(); @@ -2744,7 +2781,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, immutable_db_options_.max_subcompactions <= 1 && c->mutable_cf_options()->snap_refresh_nanos > 0 ? &fetch_callback - : nullptr); + : nullptr, is_manual ? &manual_compaction_paused_ : nullptr); compaction_job.Prepare(); NotifyOnCompactionBegin(c->column_family_data(), c.get(), status, @@ -2784,7 +2821,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, compaction_job_stats, job_context->job_id); } - if (status.ok() || status.IsCompactionTooLarge()) { + if (status.ok() || status.IsCompactionTooLarge() || + status.IsManualCompactionPaused()) { // Done } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) { // Ignore compaction errors found during shutting down diff --git a/db/db_test.cc b/db/db_test.cc index 60a077f57..9ca550b8d 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2737,6 +2737,10 @@ class ModelDB : public DB { return Status::NotSupported("Not supported operation."); } + void EnableManualCompaction() override { return; } + + void DisableManualCompaction() override { return; } + using DB::NumberLevels; int NumberLevels(ColumnFamilyHandle* /*column_family*/) override { return 1; } diff --git a/db/db_test2.cc b/db/db_test2.cc index cf622973a..a4c5b5aa3 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -2402,6 +2402,217 @@ TEST_F(DBTest2, ManualCompactionOverlapManualCompaction) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } +TEST_F(DBTest2, PausingManualCompaction1) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.num_levels = 7; + + DestroyAndReopen(options); + Random rnd(301); + // Generate a file containing 10 keys. 
+ for (int i = 0; i < 10; i++) { + ASSERT_OK(Put(Key(i), RandomString(&rnd, 50))); + } + ASSERT_OK(Flush()); + + // Generate another file containing same keys + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put(Key(i), RandomString(&rnd, 50))); + } + ASSERT_OK(Flush()); + + int manual_compactions_paused = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::Run():PausingManualCompaction:1", [&](void* arg) { + auto paused = reinterpret_cast*>(arg); + ASSERT_FALSE(paused->load(std::memory_order_acquire)); + paused->store(true, std::memory_order_release); + manual_compactions_paused += 1; + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + std::vector files_before_compact, files_after_compact; + // Remember file name before compaction is triggered + std::vector files_meta; + dbfull()->GetLiveFilesMetaData(&files_meta); + for (auto file : files_meta) { + files_before_compact.push_back(file.name); + } + + // OK, now trigger a manual compaction + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); + + // Wait for compactions to get scheduled and stopped + dbfull()->TEST_WaitForCompact(true); + + // Get file names after compaction is stopped + files_meta.clear(); + dbfull()->GetLiveFilesMetaData(&files_meta); + for (auto file : files_meta) { + files_after_compact.push_back(file.name); + } + + // Like nothing happened + ASSERT_EQ(files_before_compact, files_after_compact); + ASSERT_EQ(manual_compactions_paused, 1); + + manual_compactions_paused = 0; + // Now make sure CompactFiles also not run + dbfull()->CompactFiles(rocksdb::CompactionOptions(), + files_before_compact, 0); + // Wait for manual compaction to get scheduled and finish + dbfull()->TEST_WaitForCompact(true); + + files_meta.clear(); + files_after_compact.clear(); + dbfull()->GetLiveFilesMetaData(&files_meta); + for (auto file : files_meta) { + files_after_compact.push_back(file.name); + } + + ASSERT_EQ(files_before_compact, files_after_compact); + // CompactFiles returns at entry point + ASSERT_EQ(manual_compactions_paused, 0); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +// PausingManualCompaction does not affect auto compaction +TEST_F(DBTest2, PausingManualCompaction2) { + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = 2; + options.disable_auto_compactions = false; + + DestroyAndReopen(options); + dbfull()->DisableManualCompaction(); + + Random rnd(301); + for (int i = 0; i < 2; i++) { + // Generate a file containing 10 keys. 
+ for (int j = 0; j < 100; j++) { + ASSERT_OK(Put(Key(j), RandomString(&rnd, 50))); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact(true)); + + std::vector files_meta; + dbfull()->GetLiveFilesMetaData(&files_meta); + ASSERT_EQ(files_meta.size(), 1); +} + +TEST_F(DBTest2, PausingManualCompaction3) { + CompactRangeOptions compact_options; + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.num_levels = 7; + + Random rnd(301); + auto generate_files = [&]() { + for (int i = 0; i < options.num_levels; i++) { + for (int j = 0; j < options.num_levels-i+1; j++) { + for (int k = 0; k < 1000; k++) { + ASSERT_OK(Put(Key(k + j * 1000), RandomString(&rnd, 50))); + } + Flush(); + } + + for (int l = 1; l < options.num_levels-i; l++) { + MoveFilesToLevel(l); + } + } + }; + + DestroyAndReopen(options); + generate_files(); +#ifndef ROCKSDB_LITE + ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel()); +#endif // !ROCKSDB_LITE + int run_manual_compactions = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::Run():PausingManualCompaction:1", [&](void* /*arg*/) { + run_manual_compactions++; + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + dbfull()->DisableManualCompaction(); + dbfull()->CompactRange(compact_options, nullptr, nullptr); + dbfull()->TEST_WaitForCompact(true); + // As manual compaction disabled, not even reach sync point + ASSERT_EQ(run_manual_compactions, 0); +#ifndef ROCKSDB_LITE + ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel()); +#endif // !ROCKSDB_LITE + + rocksdb::SyncPoint::GetInstance()->ClearCallBack( + "CompactionJob::Run():PausingManualCompaction:1"); + dbfull()->EnableManualCompaction(); + dbfull()->CompactRange(compact_options, nullptr, nullptr); + dbfull()->TEST_WaitForCompact(true); +#ifndef ROCKSDB_LITE + ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel()); +#endif // !ROCKSDB_LITE + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(DBTest2, PausingManualCompaction4) { + CompactRangeOptions compact_options; + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.num_levels = 7; + + Random rnd(301); + auto generate_files = [&]() { + for (int i = 0; i < options.num_levels; i++) { + for (int j = 0; j < options.num_levels-i+1; j++) { + for (int k = 0; k < 1000; k++) { + ASSERT_OK(Put(Key(k + j * 1000), RandomString(&rnd, 50))); + } + Flush(); + } + + for (int l = 1; l < options.num_levels-i; l++) { + MoveFilesToLevel(l); + } + } + }; + + DestroyAndReopen(options); + generate_files(); +#ifndef ROCKSDB_LITE + ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel()); +#endif // !ROCKSDB_LITE + int run_manual_compactions = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::Run():PausingManualCompaction:2", [&](void* arg) { + auto paused = reinterpret_cast*>(arg); + ASSERT_FALSE(paused->load(std::memory_order_acquire)); + paused->store(true, std::memory_order_release); + run_manual_compactions++; + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + dbfull()->EnableManualCompaction(); + dbfull()->CompactRange(compact_options, nullptr, nullptr); + dbfull()->TEST_WaitForCompact(true); + ASSERT_EQ(run_manual_compactions, 1); +#ifndef ROCKSDB_LITE + ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel()); +#endif // !ROCKSDB_LITE + + rocksdb::SyncPoint::GetInstance()->ClearCallBack( + "CompactionJob::Run():PausingManualCompaction:2"); + dbfull()->EnableManualCompaction(); + dbfull()->CompactRange(compact_options, nullptr, nullptr); + 
dbfull()->TEST_WaitForCompact(true);
+#ifndef ROCKSDB_LITE
+  ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
+#endif  // !ROCKSDB_LITE
+
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
 TEST_F(DBTest2, OptimizeForPointLookup) {
   Options options = CurrentOptions();
   Close();
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index a2961792b..ca8ab85b8 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -999,6 +999,9 @@ class DB {
   virtual Status EnableAutoCompaction(
       const std::vector<ColumnFamilyHandle*>& column_family_handles) = 0;
 
+  virtual void DisableManualCompaction() = 0;
+  virtual void EnableManualCompaction() = 0;
+
   // Number of levels used for this DB.
   virtual int NumberLevels(ColumnFamilyHandle* column_family) = 0;
   virtual int NumberLevels() { return NumberLevels(DefaultColumnFamily()); }
diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h
index e4360126d..507d04168 100644
--- a/include/rocksdb/status.h
+++ b/include/rocksdb/status.h
@@ -77,6 +77,7 @@ class Status {
     kSpaceLimit = 8,
     kPathNotFound = 9,
     KMergeOperandsInsufficientCapacity = 10,
+    kManualCompactionPaused = 11,
     kMaxSubCode
   };
 
@@ -295,6 +296,12 @@ class Status {
     return (code() == kIOError) && (subcode() == kPathNotFound);
   }
 
+  // Returns true iff the status indicates a manual compaction that was
+  // stopped by a call to DB::DisableManualCompaction().
+  bool IsManualCompactionPaused() const {
+    return (code() == kIncomplete) && (subcode() == kManualCompactionPaused);
+  }
+
   // Return a string representation of this status suitable for printing.
   // Returns the string "OK" for success.
   std::string ToString() const;
diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h
index 04f1039c2..9dd038b84 100644
--- a/include/rocksdb/utilities/stackable_db.h
+++ b/include/rocksdb/utilities/stackable_db.h
@@ -271,6 +271,13 @@ class StackableDB : public DB {
     return db_->EnableAutoCompaction(column_family_handles);
   }
 
+  virtual void EnableManualCompaction() override {
+    return db_->EnableManualCompaction();
+  }
+  virtual void DisableManualCompaction() override {
+    return db_->DisableManualCompaction();
+  }
+
   using DB::NumberLevels;
   virtual int NumberLevels(ColumnFamilyHandle* column_family) override {
     return db_->NumberLevels(column_family);
diff --git a/util/status.cc b/util/status.cc
index 940594480..0b1b24935 100644
--- a/util/status.cc
+++ b/util/status.cc
@@ -43,6 +43,10 @@ static const char* msgs[static_cast<size_t>(Status::kMaxSubCode)] = {
     "Memory limit reached",                      // kMemoryLimit
     "Space limit reached",                       // kSpaceLimit
     "No such file or directory",                 // kPathNotFound
+    // KMergeOperandsInsufficientCapacity
+    "Insufficient capacity for merge operands",
+    // kManualCompactionPaused
+    "Manual compaction paused",
 };
 
 Status::Status(Code _code, SubCode _subcode, const Slice& msg,
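
Usage note (not part of the patch): the sketch below shows how the API added by this change is
intended to be used from application code. It assumes the patch is applied and uses the
`rocksdb::` namespace that appears in the diff; the DB path, key count, and value size are
illustrative only, and error handling is mostly elided. Exactly which status the caller of
CompactRange() observes after DisableManualCompaction() depends on timing, so the example only
checks IsManualCompactionPaused() on the returned status.

// Sketch: stopping an in-flight manual compaction with the new API.
// Assumes this patch is applied; path, key count, and value size are
// illustrative.
#include <string>
#include <thread>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;  // only manual compactions run

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/manual_compaction_demo", &db);
  if (!s.ok()) {
    return 1;
  }

  // Write enough data that the manual compaction below has real work to do.
  for (int i = 0; i < 100000; i++) {
    db->Put(rocksdb::WriteOptions(), "key" + std::to_string(i),
            std::string(512, 'v'));
  }
  db->Flush(rocksdb::FlushOptions());

  // Run a full-range manual compaction on a background thread.
  rocksdb::Status compact_status;
  std::thread worker([&]() {
    compact_status =
        db->CompactRange(rocksdb::CompactRangeOptions(), nullptr, nullptr);
  });

  // Ask RocksDB to stop manual compactions. The flag is best-effort: the
  // running job notices it inside its key-processing loop and bails out
  // with an Incomplete status carrying kManualCompactionPaused.
  db->DisableManualCompaction();
  worker.join();

  if (compact_status.IsManualCompactionPaused()) {
    // The compaction was interrupted; re-enable and retry later if desired.
    db->EnableManualCompaction();
  }

  delete db;
  return 0;
}

The stop remains in effect until EnableManualCompaction() is called again, and it intentionally
does not affect automatic compactions, which is what the PausingManualCompaction2 test above
exercises.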