diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 135018f51..73a7c3974 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -38,7 +38,8 @@ CompactionIterator::CompactionIterator( const CompactionFilter* compaction_filter, const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, - SnapshotListFetchCallback* snap_list_callback) + SnapshotListFetchCallback* snap_list_callback, + const std::atomic* manual_compaction_paused) : CompactionIterator( input, cmp, merge_helper, last_sequence, snapshots, earliest_write_conflict_snapshot, snapshot_checker, env, @@ -46,7 +47,8 @@ CompactionIterator::CompactionIterator( std::unique_ptr( compaction ? new CompactionProxy(compaction) : nullptr), compaction_filter, shutting_down, preserve_deletes_seqnum, - snap_list_callback) {} + snap_list_callback, + manual_compaction_paused) {} CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, @@ -59,7 +61,8 @@ CompactionIterator::CompactionIterator( const CompactionFilter* compaction_filter, const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, - SnapshotListFetchCallback* snap_list_callback) + SnapshotListFetchCallback* snap_list_callback, + const std::atomic* manual_compaction_paused) : input_(input), cmp_(cmp), merge_helper_(merge_helper), @@ -73,6 +76,7 @@ CompactionIterator::CompactionIterator( compaction_(std::move(compaction)), compaction_filter_(compaction_filter), shutting_down_(shutting_down), + manual_compaction_paused_(manual_compaction_paused), preserve_deletes_seqnum_(preserve_deletes_seqnum), current_user_key_sequence_(0), current_user_key_snapshot_(0), @@ -234,7 +238,8 @@ void CompactionIterator::NextFromInput() { at_next_ = false; valid_ = false; - while (!valid_ && input_->Valid() && !IsShuttingDown()) { + while (!valid_ && input_->Valid() && !IsPausingManualCompaction() && + !IsShuttingDown()) { key_ = input_->key(); value_ = input_->value(); iter_stats_.num_input_records++; @@ -612,6 +617,10 @@ void CompactionIterator::NextFromInput() { if (!valid_ && IsShuttingDown()) { status_ = Status::ShutdownInProgress(); } + + if (IsPausingManualCompaction()) { + status_ = Status::Incomplete(Status::SubCode::kManualCompactionPaused); + } } void CompactionIterator::PrepareOutput() { diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index 2bf847e2e..c06373950 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -118,7 +118,8 @@ class CompactionIterator { const CompactionFilter* compaction_filter = nullptr, const std::atomic* shutting_down = nullptr, const SequenceNumber preserve_deletes_seqnum = 0, - SnapshotListFetchCallback* snap_list_callback = nullptr); + SnapshotListFetchCallback* snap_list_callback = nullptr, + const std::atomic* manual_compaction_paused = nullptr); // Constructor with custom CompactionProxy, used for tests. CompactionIterator(InternalIterator* input, const Comparator* cmp, @@ -132,7 +133,8 @@ class CompactionIterator { const CompactionFilter* compaction_filter = nullptr, const std::atomic* shutting_down = nullptr, const SequenceNumber preserve_deletes_seqnum = 0, - SnapshotListFetchCallback* snap_list_callback = nullptr); + SnapshotListFetchCallback* snap_list_callback = nullptr, + const std::atomic* manual_compaction_paused = nullptr); ~CompactionIterator(); @@ -213,6 +215,7 @@ class CompactionIterator { std::unique_ptr compaction_; const CompactionFilter* compaction_filter_; const std::atomic* shutting_down_; + const std::atomic* manual_compaction_paused_; const SequenceNumber preserve_deletes_seqnum_; bool bottommost_level_; bool valid_ = false; @@ -279,5 +282,11 @@ class CompactionIterator { // This is a best-effort facility, so memory_order_relaxed is sufficient. return shutting_down_ && shutting_down_->load(std::memory_order_relaxed); } + + bool IsPausingManualCompaction() { + // This is a best-effort facility, so memory_order_relaxed is sufficient. + return manual_compaction_paused_ && + manual_compaction_paused_->load(std::memory_order_relaxed); + } }; } // namespace rocksdb diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 1acc48b4c..d49783144 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -311,7 +311,8 @@ CompactionJob::CompactionJob( const SnapshotChecker* snapshot_checker, std::shared_ptr table_cache, EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname, CompactionJobStats* compaction_job_stats, - Env::Priority thread_pri, SnapshotListFetchCallback* snap_list_callback) + Env::Priority thread_pri, SnapshotListFetchCallback* snap_list_callback, + const std::atomic* manual_compaction_paused) : job_id_(job_id), compact_(new CompactionState(compaction)), compaction_job_stats_(compaction_job_stats), @@ -324,6 +325,7 @@ CompactionJob::CompactionJob( env_->OptimizeForCompactionTableRead(env_options, db_options_)), versions_(versions), shutting_down_(shutting_down), + manual_compaction_paused_(manual_compaction_paused), preserve_deletes_seqnum_(preserve_deletes_seqnum), log_buffer_(log_buffer), db_directory_(db_directory), @@ -867,9 +869,12 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { false /* internal key corruption is expected */, existing_snapshots_.empty() ? 0 : existing_snapshots_.back(), snapshot_checker_, compact_->compaction->level(), - db_options_.statistics.get(), shutting_down_); + db_options_.statistics.get()); TEST_SYNC_POINT("CompactionJob::Run():Inprogress"); + TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():PausingManualCompaction:1", + reinterpret_cast( + const_cast *>(manual_compaction_paused_))); Slice* start = sub_compact->start; Slice* end = sub_compact->end; @@ -889,7 +894,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { &range_del_agg, sub_compact->compaction, compaction_filter, shutting_down_, preserve_deletes_seqnum_, // Currently range_del_agg is incompatible with snapshot refresh feature. - range_del_agg.IsEmpty() ? snap_list_callback_ : nullptr)); + range_del_agg.IsEmpty() ? snap_list_callback_ : nullptr, + manual_compaction_paused_)); auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) { @@ -953,7 +959,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { input_status = input->status(); output_file_ended = true; } + TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():PausingManualCompaction:2", + reinterpret_cast( + const_cast *>(manual_compaction_paused_))); c_iter->Next(); + if (c_iter->status().IsManualCompactionPaused()) { + break; + } if (!output_file_ended && c_iter->Valid() && sub_compact->compaction->output_level() != 0 && sub_compact->ShouldStopBefore(c_iter->key(), @@ -1006,6 +1018,11 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { shutting_down_->load(std::memory_order_relaxed)) { status = Status::ShutdownInProgress("Database shutdown"); } + if ((status.ok() || status.IsColumnFamilyDropped()) && + (manual_compaction_paused_ && + manual_compaction_paused_->load(std::memory_order_relaxed))) { + status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); + } if (status.ok()) { status = input->status(); } diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 84d38c163..79069770c 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -75,7 +75,8 @@ class CompactionJob { std::shared_ptr table_cache, EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname, CompactionJobStats* compaction_job_stats, - Env::Priority thread_pri, SnapshotListFetchCallback* snap_list_callback); + Env::Priority thread_pri, SnapshotListFetchCallback* snap_list_callback, + const std::atomic* manual_compaction_paused = nullptr); ~CompactionJob(); @@ -154,6 +155,7 @@ class CompactionJob { EnvOptions env_options_for_read_; VersionSet* versions_; const std::atomic* shutting_down_; + const std::atomic* manual_compaction_paused_; const SequenceNumber preserve_deletes_seqnum_; LogBuffer* log_buffer_; Directory* db_directory_; diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 443138908..710fde5b4 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -165,6 +165,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, batch_per_txn_(batch_per_txn), db_lock_(nullptr), shutting_down_(false), + manual_compaction_paused_(false), bg_cv_(&mutex_), logfile_number_(0), log_dir_synced_(false), diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 3e45442ca..3552a7cf2 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -282,6 +282,9 @@ class DBImpl : public DB { virtual Status EnableAutoCompaction( const std::vector& column_family_handles) override; + virtual void EnableManualCompaction() override; + virtual void DisableManualCompaction() override; + using DB::SetOptions; Status SetOptions( ColumnFamilyHandle* column_family, @@ -1638,6 +1641,7 @@ class DBImpl : public DB { InstrumentedMutex log_write_mutex_; std::atomic shutting_down_; + std::atomic manual_compaction_paused_; // This condition variable is signaled on these conditions: // * whenever bg_compaction_scheduled_ goes down to 0 // * if AnyManualCompaction, whenever a compaction finishes, even if it hasn't diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 1b862e033..724d54f09 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -917,6 +917,9 @@ Status DBImpl::CompactFilesImpl( if (shutting_down_.load(std::memory_order_acquire)) { return Status::ShutdownInProgress(); } + if (manual_compaction_paused_.load(std::memory_order_acquire)) { + return Status::Incomplete(Status::SubCode::kManualCompactionPaused); + } std::unordered_set input_set; for (const auto& file_name : input_file_names) { @@ -1012,7 +1015,8 @@ Status DBImpl::CompactFilesImpl( immutable_db_options_.max_subcompactions <= 1 && c->mutable_cf_options()->snap_refresh_nanos > 0 ? &fetch_callback - : nullptr); + : nullptr, + &manual_compaction_paused_); // Creating a compaction influences the compaction score because the score // takes running compactions into account (by skipping files that are already @@ -1058,6 +1062,12 @@ Status DBImpl::CompactFilesImpl( // Done } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) { // Ignore compaction errors found during shutting down + } else if (status.IsManualCompactionPaused()) { + // Don't report stopping manual compaction as error + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "[%s] [JOB %d] Stopping manual compaction", + c->column_family_data()->GetName().c_str(), + job_context->job_id); } else { ROCKS_LOG_WARN(immutable_db_options_.info_log, "[%s] [JOB %d] Compaction error: %s", @@ -1128,6 +1138,10 @@ void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c, if (shutting_down_.load(std::memory_order_acquire)) { return; } + if (c->is_manual_compaction() && + manual_compaction_paused_.load(std::memory_order_acquire)) { + return; + } Version* current = cfd->current(); current->Ref(); // release lock while notifying events @@ -1190,6 +1204,10 @@ void DBImpl::NotifyOnCompactionCompleted( if (shutting_down_.load(std::memory_order_acquire)) { return; } + if (c->is_manual_compaction() && + manual_compaction_paused_.load(std::memory_order_acquire)) { + return; + } Version* current = cfd->current(); current->Ref(); // release lock while notifying events @@ -1879,6 +1897,14 @@ Status DBImpl::EnableAutoCompaction( return s; } +void DBImpl::DisableManualCompaction() { + manual_compaction_paused_.store(true, std::memory_order_release); +} + +void DBImpl::EnableManualCompaction() { + manual_compaction_paused_.store(false, std::memory_order_release); +} + void DBImpl::MaybeScheduleFlushOrCompaction() { mutex_.AssertHeld(); if (!opened_successfully_) { @@ -2319,6 +2345,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, env_->SleepForMicroseconds(10000); // prevent hot loop mutex_.Lock(); } else if (!s.ok() && !s.IsShutdownInProgress() && + !s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped()) { // Wait a little bit before retrying background compaction in // case this is an environmental problem and we do not want to @@ -2336,6 +2363,12 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, LogFlush(immutable_db_options_.info_log); env_->SleepForMicroseconds(1000000); mutex_.Lock(); + } else if (s.IsManualCompactionPaused()) { + ManualCompactionState *m = prepicked_compaction->manual_compaction_state; + assert(m); + ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused", + m->cfd->GetName().c_str(), + job_context.job_id); } ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); @@ -2344,7 +2377,8 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, // have created (they might not be all recorded in job_context in case of a // failure). Thus, we force full scan in FindObsoleteFiles() FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() && - !s.IsColumnFamilyDropped()); + !s.IsManualCompactionPaused() && + !s.IsColumnFamilyDropped()); TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"); // delete unnecessary files if any, this is done outside the mutex @@ -2427,6 +2461,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, if (!error_handler_.IsBGWorkStopped()) { if (shutting_down_.load(std::memory_order_acquire)) { status = Status::ShutdownInProgress(); + } else if (is_manual && + manual_compaction_paused_.load(std::memory_order_acquire)) { + status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); } } else { status = error_handler_.GetBGError(); @@ -2744,7 +2781,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, immutable_db_options_.max_subcompactions <= 1 && c->mutable_cf_options()->snap_refresh_nanos > 0 ? &fetch_callback - : nullptr); + : nullptr, is_manual ? &manual_compaction_paused_ : nullptr); compaction_job.Prepare(); NotifyOnCompactionBegin(c->column_family_data(), c.get(), status, @@ -2784,7 +2821,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, compaction_job_stats, job_context->job_id); } - if (status.ok() || status.IsCompactionTooLarge()) { + if (status.ok() || status.IsCompactionTooLarge() || + status.IsManualCompactionPaused()) { // Done } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) { // Ignore compaction errors found during shutting down diff --git a/db/db_test.cc b/db/db_test.cc index 60a077f57..9ca550b8d 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2737,6 +2737,10 @@ class ModelDB : public DB { return Status::NotSupported("Not supported operation."); } + void EnableManualCompaction() override { return; } + + void DisableManualCompaction() override { return; } + using DB::NumberLevels; int NumberLevels(ColumnFamilyHandle* /*column_family*/) override { return 1; } diff --git a/db/db_test2.cc b/db/db_test2.cc index cf622973a..a4c5b5aa3 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -2402,6 +2402,217 @@ TEST_F(DBTest2, ManualCompactionOverlapManualCompaction) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } +TEST_F(DBTest2, PausingManualCompaction1) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.num_levels = 7; + + DestroyAndReopen(options); + Random rnd(301); + // Generate a file containing 10 keys. + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put(Key(i), RandomString(&rnd, 50))); + } + ASSERT_OK(Flush()); + + // Generate another file containing same keys + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put(Key(i), RandomString(&rnd, 50))); + } + ASSERT_OK(Flush()); + + int manual_compactions_paused = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::Run():PausingManualCompaction:1", [&](void* arg) { + auto paused = reinterpret_cast*>(arg); + ASSERT_FALSE(paused->load(std::memory_order_acquire)); + paused->store(true, std::memory_order_release); + manual_compactions_paused += 1; + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + std::vector files_before_compact, files_after_compact; + // Remember file name before compaction is triggered + std::vector files_meta; + dbfull()->GetLiveFilesMetaData(&files_meta); + for (auto file : files_meta) { + files_before_compact.push_back(file.name); + } + + // OK, now trigger a manual compaction + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); + + // Wait for compactions to get scheduled and stopped + dbfull()->TEST_WaitForCompact(true); + + // Get file names after compaction is stopped + files_meta.clear(); + dbfull()->GetLiveFilesMetaData(&files_meta); + for (auto file : files_meta) { + files_after_compact.push_back(file.name); + } + + // Like nothing happened + ASSERT_EQ(files_before_compact, files_after_compact); + ASSERT_EQ(manual_compactions_paused, 1); + + manual_compactions_paused = 0; + // Now make sure CompactFiles also not run + dbfull()->CompactFiles(rocksdb::CompactionOptions(), + files_before_compact, 0); + // Wait for manual compaction to get scheduled and finish + dbfull()->TEST_WaitForCompact(true); + + files_meta.clear(); + files_after_compact.clear(); + dbfull()->GetLiveFilesMetaData(&files_meta); + for (auto file : files_meta) { + files_after_compact.push_back(file.name); + } + + ASSERT_EQ(files_before_compact, files_after_compact); + // CompactFiles returns at entry point + ASSERT_EQ(manual_compactions_paused, 0); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +// PausingManualCompaction does not affect auto compaction +TEST_F(DBTest2, PausingManualCompaction2) { + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = 2; + options.disable_auto_compactions = false; + + DestroyAndReopen(options); + dbfull()->DisableManualCompaction(); + + Random rnd(301); + for (int i = 0; i < 2; i++) { + // Generate a file containing 10 keys. + for (int j = 0; j < 100; j++) { + ASSERT_OK(Put(Key(j), RandomString(&rnd, 50))); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact(true)); + + std::vector files_meta; + dbfull()->GetLiveFilesMetaData(&files_meta); + ASSERT_EQ(files_meta.size(), 1); +} + +TEST_F(DBTest2, PausingManualCompaction3) { + CompactRangeOptions compact_options; + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.num_levels = 7; + + Random rnd(301); + auto generate_files = [&]() { + for (int i = 0; i < options.num_levels; i++) { + for (int j = 0; j < options.num_levels-i+1; j++) { + for (int k = 0; k < 1000; k++) { + ASSERT_OK(Put(Key(k + j * 1000), RandomString(&rnd, 50))); + } + Flush(); + } + + for (int l = 1; l < options.num_levels-i; l++) { + MoveFilesToLevel(l); + } + } + }; + + DestroyAndReopen(options); + generate_files(); +#ifndef ROCKSDB_LITE + ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel()); +#endif // !ROCKSDB_LITE + int run_manual_compactions = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::Run():PausingManualCompaction:1", [&](void* /*arg*/) { + run_manual_compactions++; + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + dbfull()->DisableManualCompaction(); + dbfull()->CompactRange(compact_options, nullptr, nullptr); + dbfull()->TEST_WaitForCompact(true); + // As manual compaction disabled, not even reach sync point + ASSERT_EQ(run_manual_compactions, 0); +#ifndef ROCKSDB_LITE + ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel()); +#endif // !ROCKSDB_LITE + + rocksdb::SyncPoint::GetInstance()->ClearCallBack( + "CompactionJob::Run():PausingManualCompaction:1"); + dbfull()->EnableManualCompaction(); + dbfull()->CompactRange(compact_options, nullptr, nullptr); + dbfull()->TEST_WaitForCompact(true); +#ifndef ROCKSDB_LITE + ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel()); +#endif // !ROCKSDB_LITE + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(DBTest2, PausingManualCompaction4) { + CompactRangeOptions compact_options; + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.num_levels = 7; + + Random rnd(301); + auto generate_files = [&]() { + for (int i = 0; i < options.num_levels; i++) { + for (int j = 0; j < options.num_levels-i+1; j++) { + for (int k = 0; k < 1000; k++) { + ASSERT_OK(Put(Key(k + j * 1000), RandomString(&rnd, 50))); + } + Flush(); + } + + for (int l = 1; l < options.num_levels-i; l++) { + MoveFilesToLevel(l); + } + } + }; + + DestroyAndReopen(options); + generate_files(); +#ifndef ROCKSDB_LITE + ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel()); +#endif // !ROCKSDB_LITE + int run_manual_compactions = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::Run():PausingManualCompaction:2", [&](void* arg) { + auto paused = reinterpret_cast*>(arg); + ASSERT_FALSE(paused->load(std::memory_order_acquire)); + paused->store(true, std::memory_order_release); + run_manual_compactions++; + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + dbfull()->EnableManualCompaction(); + dbfull()->CompactRange(compact_options, nullptr, nullptr); + dbfull()->TEST_WaitForCompact(true); + ASSERT_EQ(run_manual_compactions, 1); +#ifndef ROCKSDB_LITE + ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel()); +#endif // !ROCKSDB_LITE + + rocksdb::SyncPoint::GetInstance()->ClearCallBack( + "CompactionJob::Run():PausingManualCompaction:2"); + dbfull()->EnableManualCompaction(); + dbfull()->CompactRange(compact_options, nullptr, nullptr); + dbfull()->TEST_WaitForCompact(true); +#ifndef ROCKSDB_LITE + ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel()); +#endif // !ROCKSDB_LITE + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + TEST_F(DBTest2, OptimizeForPointLookup) { Options options = CurrentOptions(); Close(); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index a2961792b..ca8ab85b8 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -999,6 +999,9 @@ class DB { virtual Status EnableAutoCompaction( const std::vector& column_family_handles) = 0; + virtual void DisableManualCompaction() = 0; + virtual void EnableManualCompaction() = 0; + // Number of levels used for this DB. virtual int NumberLevels(ColumnFamilyHandle* column_family) = 0; virtual int NumberLevels() { return NumberLevels(DefaultColumnFamily()); } diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index e4360126d..507d04168 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -77,6 +77,7 @@ class Status { kSpaceLimit = 8, kPathNotFound = 9, KMergeOperandsInsufficientCapacity = 10, + kManualCompactionPaused = 11, kMaxSubCode }; @@ -295,6 +296,12 @@ class Status { return (code() == kIOError) && (subcode() == kPathNotFound); } + // Returns true iff the status indicates manual compaction paused. This + // is caused by a call to PauseManualCompaction + bool IsManualCompactionPaused() const { + return (code() == kIncomplete) && (subcode() == kManualCompactionPaused); + } + // Return a string representation of this status suitable for printing. // Returns the string "OK" for success. std::string ToString() const; diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 04f1039c2..9dd038b84 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -271,6 +271,13 @@ class StackableDB : public DB { return db_->EnableAutoCompaction(column_family_handles); } + virtual void EnableManualCompaction() override { + return db_->EnableManualCompaction(); + } + virtual void DisableManualCompaction() override { + return db_->DisableManualCompaction(); + } + using DB::NumberLevels; virtual int NumberLevels(ColumnFamilyHandle* column_family) override { return db_->NumberLevels(column_family); diff --git a/util/status.cc b/util/status.cc index 940594480..0b1b24935 100644 --- a/util/status.cc +++ b/util/status.cc @@ -43,6 +43,10 @@ static const char* msgs[static_cast(Status::kMaxSubCode)] = { "Memory limit reached", // kMemoryLimit "Space limit reached", // kSpaceLimit "No such file or directory", // kPathNotFound + // KMergeOperandsInsufficientCapacity + "Insufficient capacity for merge operands", + // kManualCompactionPaused + "Manual compaction paused", }; Status::Status(Code _code, SubCode _subcode, const Slice& msg,