diff --git a/HISTORY.md b/HISTORY.md index 894c950f1..ca6d0c42b 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -20,6 +20,7 @@ * EXPERIMENTAL: Add new API `DB::ClipColumnFamily` to clip the key in CF to a certain range. It will physically deletes all keys outside the range including tombstones. * Add `MakeSharedCache()` construction functions to various cache Options objects, and deprecated the `NewWhateverCache()` functions with long parameter lists. * Changed the meaning of various Bloom filter stats (prefix vs. whole key), with iterator-related filtering only being tracked in the new \*`_LEVEL_SEEK_`\*. stats. (#11460) +* Add `WaitForCompact()` to wait for all flush and compaction jobs to finish. Jobs to wait include the unscheduled (queued, but not scheduled yet). ### Behavior changes * For x86, CPU features are no longer detected at runtime nor in build scripts, but in source code using common preprocessor defines. This will likely unlock some small performance improvements on some newer hardware, but could hurt performance of the kCRC32c checksum, which is no longer the default, on some "portable" builds. See PR #11419 for details. 
diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index ed152f28c..b16ca2ca6 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -808,8 +808,8 @@ Status CompactionJob::Run() { RecordCompactionIOStats(); LogFlush(db_options_.info_log); TEST_SYNC_POINT("CompactionJob::Run():End"); - compact_->status = status; + TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():EndStatusSet", &status); return status; } diff --git a/db/compaction/tiered_compaction_test.cc b/db/compaction/tiered_compaction_test.cc index 8f274c2a6..d8aa229df 100644 --- a/db/compaction/tiered_compaction_test.cc +++ b/db/compaction/tiered_compaction_test.cc @@ -209,7 +209,7 @@ TEST_P(TieredCompactionTest, SequenceBasedTieredStorageUniversal) { seq_history.emplace_back(dbfull()->GetLatestSequenceNumber()); expect_stats[0].Add(kBasicFlushStats); } - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); // the penultimate level file temperature is not cold, all data are output to // the penultimate level. 
@@ -374,7 +374,7 @@ TEST_P(TieredCompactionTest, RangeBasedTieredStorageUniversal) { ASSERT_OK(Flush()); expect_stats[0].Add(kBasicFlushStats); } - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); @@ -446,7 +446,7 @@ TEST_P(TieredCompactionTest, RangeBasedTieredStorageUniversal) { ASSERT_OK(Flush()); } // make sure the compaction is able to finish - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); @@ -911,7 +911,7 @@ TEST_P(TieredCompactionTest, SequenceBasedTieredStorageLevel) { ASSERT_OK(Flush()); expect_stats[0].Add(kBasicFlushStats); } - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); // non last level is hot ASSERT_EQ("0,1", FilesPerLevel()); @@ -954,7 +954,7 @@ TEST_P(TieredCompactionTest, SequenceBasedTieredStorageLevel) { ASSERT_OK(Flush()); seq_history.emplace_back(dbfull()->GetLatestSequenceNumber()); } - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_EQ("0,1,0,0,0,0,1", FilesPerLevel()); ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); @@ -1005,7 +1005,7 @@ TEST_P(TieredCompactionTest, SequenceBasedTieredStorageLevel) { ASSERT_OK(Flush()); seq_history.emplace_back(dbfull()->GetLatestSequenceNumber()); } - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); latest_cold_seq = seq_history[0]; ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); @@ -1135,7 +1135,7 @@ TEST_P(TieredCompactionTest, RangeBasedTieredStorageLevel) { } ASSERT_OK(Flush()); } - ASSERT_OK(dbfull()->WaitForCompact()); + 
ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); @@ -1265,7 +1265,7 @@ TEST_F(PrecludeLastLevelTest, MigrationFromPreserveTimeManualCompaction) { } ASSERT_OK(Flush()); } - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); // all data is pushed to the last level ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); @@ -1327,7 +1327,7 @@ TEST_F(PrecludeLastLevelTest, MigrationFromPreserveTimeAutoCompaction) { } ASSERT_OK(Flush()); } - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); // all data is pushed to the last level ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); @@ -1360,7 +1360,7 @@ TEST_F(PrecludeLastLevelTest, MigrationFromPreserveTimeAutoCompaction) { }); } ASSERT_OK(Flush()); - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); } // all data is moved up to the penultimate level @@ -1403,7 +1403,7 @@ TEST_F(PrecludeLastLevelTest, MigrationFromPreserveTimePartial) { } ASSERT_OK(Flush()); } - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); // all data is pushed to the last level ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); @@ -1530,7 +1530,7 @@ TEST_F(PrecludeLastLevelTest, LastLevelOnlyCompactionPartial) { } ASSERT_OK(Flush()); } - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); // all data is pushed to the last level ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); @@ -1609,7 +1609,7 @@ TEST_P(PrecludeLastLevelTestWithParms, LastLevelOnlyCompactionNoPreclude) { } ASSERT_OK(Flush()); } - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); // all data is pushed to the last level ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); @@ -1705,7 +1705,7 @@ TEST_P(PrecludeLastLevelTestWithParms, LastLevelOnlyCompactionNoPreclude) { 
manual_compaction_thread.join(); - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); if (enable_preclude_last_level) { ASSERT_NE("0,0,0,0,0,1,1", FilesPerLevel()); @@ -1841,7 +1841,7 @@ TEST_P(PrecludeLastLevelTestWithParms, PeriodicCompactionToPenultimateLevel) { } ASSERT_OK(Flush()); - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); stop_token.reset(); @@ -1940,7 +1940,7 @@ TEST_F(PrecludeLastLevelTest, PartialPenultimateLevelCompaction) { ASSERT_OK(Flush()); } - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); // L5: [0,19] [20,39] [40,299] // L6: [0, 299] @@ -2106,7 +2106,7 @@ TEST_F(PrecludeLastLevelTest, RangeDelsCauseFileEndpointsToOverlap) { Slice begin_key(begin_key_buf), end_key(end_key_buf); ASSERT_OK(db_->SuggestCompactRange(db_->DefaultColumnFamily(), &begin_key, &end_key)); - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_EQ("0,0,0,0,0,3,3", FilesPerLevel()); ASSERT_EQ(1, per_key_comp_num); verify_db(); @@ -2116,7 +2116,7 @@ TEST_F(PrecludeLastLevelTest, RangeDelsCauseFileEndpointsToOverlap) { db_->ReleaseSnapshot(snap2); ASSERT_OK(db_->SuggestCompactRange(db_->DefaultColumnFamily(), &begin_key, &end_key)); - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_EQ("0,0,0,0,0,3,3", FilesPerLevel()); ASSERT_EQ(2, per_key_comp_num); verify_db(); @@ -2126,7 +2126,7 @@ TEST_F(PrecludeLastLevelTest, RangeDelsCauseFileEndpointsToOverlap) { db_->ReleaseSnapshot(snap1); ASSERT_OK(db_->SuggestCompactRange(db_->DefaultColumnFamily(), &begin_key, &end_key)); - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_EQ("0,0,0,0,0,2,3", FilesPerLevel()); ASSERT_EQ(3, per_key_comp_num); verify_db(); diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 888eac4a8..111964122 100644 --- a/db/db_compaction_test.cc +++ 
b/db/db_compaction_test.cc @@ -3285,6 +3285,209 @@ TEST_F(DBCompactionTest, SuggestCompactRangeNoTwoLevel0Compactions) { TEST_SYNC_POINT( "DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:2"); ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(DBCompactionTest, WaitForCompactWaitsOnCompactionToFinish) { + // This test creates a scenario to trigger compaction by number of L0 files + // Before the compaction finishes, it closes the DB + // Upon reopen, wait for the compaction to finish and checks for the number of + // compactions finished + + const int kNumKeysPerFile = 4; + const int kNumFiles = 2; + + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = kNumFiles + 1; + + DestroyAndReopen(options); + + // create the scenario where one more L0 file will trigger compaction + Random rnd(301); + for (int i = 0; i < kNumFiles; ++i) { + for (int j = 0; j < kNumKeysPerFile; ++j) { + ASSERT_OK( + Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(100 /* len */))); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->WaitForCompact(WaitForCompactOptions())); + ASSERT_EQ("2", FilesPerLevel()); + + int compaction_finished = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::Run():EndStatusSet", [&](void* arg) { + auto status = static_cast<Status*>(arg); + if (status->ok()) { + compaction_finished++; + } + }); + // To make sure there's a flush/compaction debt + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MaybeScheduleFlushOrCompaction:BeforeSchedule", [&](void* arg) { + auto unscheduled_flushes = *static_cast<int*>(arg); + ASSERT_GT(unscheduled_flushes, 0); + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBCompactionTest::WaitForCompactWaitsOnCompactionToFinish", + "DBImpl::MaybeScheduleFlushOrCompaction:BeforeSchedule"}}); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + 
// create compaction debt by adding one more L0 file then closing + GenerateNewRandomFile(&rnd, /* nowait */ true); + ASSERT_EQ(0, compaction_finished); + Close(); + TEST_SYNC_POINT("DBCompactionTest::WaitForCompactWaitsOnCompactionToFinish"); + ASSERT_EQ(0, compaction_finished); + + // Reopen the db and we expect the compaction to be triggered. + Reopen(options); + + // Wait for compaction to finish + ASSERT_OK(dbfull()->WaitForCompact(WaitForCompactOptions())); + ASSERT_GT(compaction_finished, 0); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +TEST_F(DBCompactionTest, WaitForCompactAbortOnPauseAborted) { + // This test creates a scenario to trigger compaction by number of L0 files + // Before the compaction finishes, it pauses the compaction + // Calling WaitForCompact with option abort_on_pause=true should return + // Status::Aborted + + const int kNumKeysPerFile = 4; + const int kNumFiles = 2; + + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = kNumFiles + 1; + + DestroyAndReopen(options); + + // create the scenario where one more L0 file will trigger compaction + Random rnd(301); + for (int i = 0; i < kNumFiles; ++i) { + for (int j = 0; j < kNumKeysPerFile; ++j) { + ASSERT_OK( + Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(100 /* len */))); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->WaitForCompact(WaitForCompactOptions())); + + // Now trigger L0 compaction by adding a file + GenerateNewRandomFile(&rnd, /* nowait */ true); + ASSERT_OK(Flush()); + + // Pause the background jobs. 
+ ASSERT_OK(dbfull()->PauseBackgroundWork()); + + WaitForCompactOptions waitForCompactOptions = WaitForCompactOptions(); + waitForCompactOptions.abort_on_pause = true; + Status s = dbfull()->WaitForCompact(waitForCompactOptions); + ASSERT_NOK(s); + ASSERT_TRUE(s.IsAborted()); +} + +TEST_F(DBCompactionTest, WaitForCompactContinueAfterPauseNotAborted) { + // This test creates a scenario to trigger compaction by number of L0 files + // Before the compaction finishes, it pauses the compaction + // Calling WaitForCompact with option abort_on_pause=false will not wait + // indefinitely upon ContinueBackgroundWork() + + const int kNumKeysPerFile = 4; + const int kNumFiles = 2; + + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = kNumFiles + 1; + + DestroyAndReopen(options); + + // create the scenario where one more L0 file will trigger compaction + Random rnd(301); + for (int i = 0; i < kNumFiles; ++i) { + for (int j = 0; j < kNumKeysPerFile; ++j) { + ASSERT_OK( + Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(100 /* len */))); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->WaitForCompact(WaitForCompactOptions())); + + // Now trigger L0 compaction by adding a file + GenerateNewRandomFile(&rnd, /* nowait */ true); + ASSERT_OK(Flush()); + + // Pause the background jobs. + ASSERT_OK(dbfull()->PauseBackgroundWork()); + + // Continue the background jobs. 
+ ASSERT_OK(dbfull()->ContinueBackgroundWork()); + + WaitForCompactOptions waitForCompactOptions = WaitForCompactOptions(); + waitForCompactOptions.abort_on_pause = true; + ASSERT_OK(dbfull()->WaitForCompact(waitForCompactOptions)); +} + +TEST_F(DBCompactionTest, WaitForCompactShutdownWhileWaiting) { + // This test creates a scenario to trigger compaction by number of L0 files + // Before the compaction finishes, db shuts down (by calling + // CancelAllBackgroundWork()) Calling WaitForCompact should return + // Status::IsShutdownInProgress() + + const int kNumKeysPerFile = 4; + const int kNumFiles = 2; + + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = kNumFiles + 1; + + DestroyAndReopen(options); + + // create the scenario where one more L0 file will trigger compaction + Random rnd(301); + for (int i = 0; i < kNumFiles; ++i) { + for (int j = 0; j < kNumKeysPerFile; ++j) { + ASSERT_OK( + Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(100 /* len */))); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->WaitForCompact(WaitForCompactOptions())); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ + {"CompactionJob::Run():Start", + "DBCompactionTest::WaitForCompactShutdownWhileWaiting:0"}, + {"DBImpl::WaitForCompact:Start", + "DBCompactionTest::WaitForCompactShutdownWhileWaiting:1"}, + {"DBImpl::~DBImpl:WaitJob", "CompactionJob::Run():End"}, + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // Now trigger L0 compaction by adding a file + GenerateNewRandomFile(&rnd, /* nowait */ true); + ASSERT_OK(Flush()); + // Wait for compaction to start + TEST_SYNC_POINT("DBCompactionTest::WaitForCompactShutdownWhileWaiting:0"); + + // Wait for Compaction in another thread + auto waiting_for_compaction_thread = port::Thread([this]() { + Status s = dbfull()->WaitForCompact(WaitForCompactOptions()); + ASSERT_NOK(s); + ASSERT_TRUE(s.IsShutdownInProgress()); + }); + 
TEST_SYNC_POINT("DBCompactionTest::WaitForCompactShutdownWhileWaiting:1"); + // Shutdown after wait started, but before the compaction finishes + auto closing_thread = port::Thread([this]() { Close(); }); + + waiting_for_compaction_thread.join(); + closing_thread.join(); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } static std::string ShortKey(int i) { @@ -5822,7 +6025,7 @@ TEST_P(RoundRobinSubcompactionsAgainstPressureToken, PressureTokenTest) { } TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstPressureToken:2"); - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_TRUE(num_planned_subcompactions_verified); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); @@ -5897,7 +6100,7 @@ TEST_P(RoundRobinSubcompactionsAgainstResources, SubcompactionsUsingResources) { "CompactionJob::ReleaseSubcompactionResources:1"}}); SyncPoint::GetInstance()->EnableProcessing(); - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_OK(dbfull()->EnableAutoCompaction({dbfull()->DefaultColumnFamily()})); TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:0"); TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:1"); @@ -5913,7 +6116,7 @@ TEST_P(RoundRobinSubcompactionsAgainstResources, SubcompactionsUsingResources) { total_low_pri_threads_ - 1, env_->ReleaseThreads(total_low_pri_threads_ - 1, Env::Priority::LOW)); TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:4"); - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_TRUE(num_planned_subcompactions_verified); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); @@ -5985,11 +6188,11 @@ TEST_P(DBCompactionTestWithParam, RoundRobinWithoutAdditionalResources) { "BackgroundCallCompaction:0"}}); SyncPoint::GetInstance()->EnableProcessing(); - ASSERT_OK(dbfull()->WaitForCompact()); + 
ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_OK(dbfull()->EnableAutoCompaction({dbfull()->DefaultColumnFamily()})); TEST_SYNC_POINT("DBCompactionTest::RoundRobinWithoutAdditionalResources:0"); - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_TRUE(num_planned_subcompactions_verified); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index f2da7467d..f050863b8 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1058,16 +1058,8 @@ class DBImpl : public DB { VersionSet* GetVersionSet() const { return versions_.get(); } - // Wait for all flush and compactions jobs to finish. Jobs to wait include the - // unscheduled (queued, but not scheduled yet). If the db is shutting down, - // Status::ShutdownInProgress will be returned. If PauseBackgroundWork() was - // called prior to this, this may potentially wait for unscheduled jobs - // indefinitely. abort_on_pause can be set to true to abort, and - // Status::Aborted will be returned immediately. This may also never return if - // there's sufficient ongoing writes that keeps flush and compaction going - // without stopping. The user would have to cease all the writes to DB to make - // this eventually return in a stable state. 
- Status WaitForCompact(bool abort_on_pause = false); + Status WaitForCompact( + const WaitForCompactOptions& wait_for_compact_options) override; #ifndef NDEBUG // Compact any files in the named level that overlap [*begin, *end] @@ -1106,8 +1098,9 @@ class DBImpl : public DB { // Wait for memtable compaction Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr); - // Wait for any compaction - Status TEST_WaitForCompact(bool abort_on_pause = false); + Status TEST_WaitForCompact(); + Status TEST_WaitForCompact( + const WaitForCompactOptions& wait_for_compact_options); // Wait for any background purge Status TEST_WaitForPurge(); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 9e084ab62..2c9b5f5df 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -2612,6 +2612,9 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { env_->GetBackgroundThreads(Env::Priority::HIGH) == 0; while (!is_flush_pool_empty && unscheduled_flushes_ > 0 && bg_flush_scheduled_ < bg_job_limits.max_flushes) { + TEST_SYNC_POINT_CALLBACK( + "DBImpl::MaybeScheduleFlushOrCompaction:BeforeSchedule", + &unscheduled_flushes_); bg_flush_scheduled_++; FlushThreadArg* fta = new FlushThreadArg; fta->db_ = this; @@ -3957,13 +3960,15 @@ void DBImpl::GetSnapshotContext( *snapshot_seqs = snapshots_.GetAll(earliest_write_conflict_snapshot); } -Status DBImpl::WaitForCompact(bool abort_on_pause) { +Status DBImpl::WaitForCompact( + const WaitForCompactOptions& wait_for_compact_options) { InstrumentedMutexLock l(&mutex_); + TEST_SYNC_POINT("DBImpl::WaitForCompact:Start"); for (;;) { if (shutting_down_.load(std::memory_order_acquire)) { return Status::ShutdownInProgress(); } - if (bg_work_paused_ && abort_on_pause) { + if (bg_work_paused_ && wait_for_compact_options.abort_on_pause) { return Status::Aborted(); } if ((bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ || diff --git 
a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc index 9856caced..be63637a2 100644 --- a/db/db_impl/db_impl_debug.cc +++ b/db/db_impl/db_impl_debug.cc @@ -178,9 +178,12 @@ Status DBImpl::TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family) { return WaitForFlushMemTable(cfd, nullptr, false); } -Status DBImpl::TEST_WaitForCompact(bool abort_on_pause) { - // Wait until the compaction completes - return WaitForCompact(abort_on_pause); +Status DBImpl::TEST_WaitForCompact() { + return WaitForCompact(WaitForCompactOptions()); +} +Status DBImpl::TEST_WaitForCompact( + const WaitForCompactOptions& wait_for_compact_options) { + return WaitForCompact(wait_for_compact_options); } Status DBImpl::TEST_WaitForPurge() { diff --git a/db/db_test.cc b/db/db_test.cc index cb0a41906..6327396d7 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3264,6 +3264,11 @@ class ModelDB : public DB { void DisableManualCompaction() override { return; } + virtual Status WaitForCompact( + const WaitForCompactOptions& /* wait_for_compact_options */) override { + return Status::OK(); + } + using DB::NumberLevels; int NumberLevels(ColumnFamilyHandle* /*column_family*/) override { return 1; } diff --git a/db/seqno_time_test.cc b/db/seqno_time_test.cc index ec13167a0..5914f8607 100644 --- a/db/seqno_time_test.cc +++ b/db/seqno_time_test.cc @@ -93,7 +93,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) { } ASSERT_OK(Flush()); } - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); // All data is hot, only output to penultimate level ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel()); @@ -114,7 +114,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) { }); } ASSERT_OK(Flush()); - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); } @@ -128,7 +128,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) { }); } 
ASSERT_OK(Flush()); - ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); } CompactRangeOptions cro; diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 432584e68..0b239b0f5 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -2670,8 +2670,7 @@ void StressTest::Open(SharedState* shared) { // Ingested errors might happen in background compactions. We // wait for all compactions to finish to make sure DB is in // clean state before executing queries. - s = static_cast_with_check<DBImpl>(db_->GetRootDB()) - ->WaitForCompact(); + s = db_->GetRootDB()->WaitForCompact(WaitForCompactOptions()); if (!s.ok()) { for (auto cf : column_families_) { delete cf; diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index a42f5b8b6..f747f7c3d 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -53,6 +53,7 @@ struct Options; struct ReadOptions; struct TableProperties; struct WriteOptions; +struct WaitForCompactOptions; class Env; class EventListener; class FileSystem; @@ -1447,6 +1448,17 @@ class DB { // DisableManualCompaction() has been called. virtual void EnableManualCompaction() = 0; + // Wait for all flush and compaction jobs to finish. Jobs to wait include the + // unscheduled (queued, but not scheduled yet). If the db is shutting down, + // Status::ShutdownInProgress will be returned. + // + // NOTE: This may also never return if there's sufficient ongoing writes that + // keeps flush and compaction going without stopping. The user would have to + // cease all the writes to DB to make this eventually return in a stable + // state. + virtual Status WaitForCompact( + const WaitForCompactOptions& /* wait_for_compact_options */) = 0; + // Number of levels used for this DB. 
virtual int NumberLevels(ColumnFamilyHandle* column_family) = 0; virtual int NumberLevels() { return NumberLevels(DefaultColumnFamily()); } diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index e9a708aa3..5a82065de 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -2080,4 +2080,13 @@ struct LiveFilesStorageInfoOptions { uint64_t wal_size_for_flush = 0; }; +struct WaitForCompactOptions { + // A boolean to abort waiting in case of a pause (PauseBackgroundWork() + // called). If true, Status::Aborted will be returned immediately. If false, + // ContinueBackgroundWork() must be called to resume the background jobs. + // Otherwise, jobs that were queued, but not scheduled yet may never finish + // and WaitForCompact() may wait indefinitely. + bool abort_on_pause = false; +}; + } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 1a87a1136..6b8d4fe26 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -347,6 +347,11 @@ class StackableDB : public DB { return db_->DisableManualCompaction(); } + virtual Status WaitForCompact( + const WaitForCompactOptions& wait_for_compact_options) override { + return db_->WaitForCompact(wait_for_compact_options); + } + using DB::NumberLevels; virtual int NumberLevels(ColumnFamilyHandle* column_family) override { return db_->NumberLevels(column_family); diff --git a/microbench/db_basic_bench.cc b/microbench/db_basic_bench.cc index 31df024b9..6fefddb38 100644 --- a/microbench/db_basic_bench.cc +++ b/microbench/db_basic_bench.cc @@ -303,7 +303,7 @@ static void DBPut(benchmark::State& state) { if (state.thread_index() == 0) { auto db_full = static_cast_with_check<DBImpl>(db.get()); - Status s = db_full->WaitForCompact(); + Status s = db_full->WaitForCompact(WaitForCompactOptions()); if (!s.ok()) { state.SkipWithError(s.ToString().c_str()); return; @@ -410,7 +410,7 @@ static 
void ManualCompaction(benchmark::State& state) { if (state.thread_index() == 0) { auto db_full = static_cast_with_check<DBImpl>(db.get()); - s = db_full->WaitForCompact(); + s = db_full->WaitForCompact(WaitForCompactOptions()); if (!s.ok()) { state.SkipWithError(s.ToString().c_str()); return; @@ -510,7 +510,7 @@ static void ManualFlush(benchmark::State& state) { if (state.thread_index() == 0) { auto db_full = static_cast_with_check<DBImpl>(db.get()); - Status s = db_full->WaitForCompact(); + Status s = db_full->WaitForCompact(WaitForCompactOptions()); if (!s.ok()) { state.SkipWithError(s.ToString().c_str()); return; @@ -594,7 +594,7 @@ static void DBGet(benchmark::State& state) { } auto db_full = static_cast_with_check<DBImpl>(db.get()); - s = db_full->WaitForCompact(); + s = db_full->WaitForCompact(WaitForCompactOptions()); if (!s.ok()) { state.SkipWithError(s.ToString().c_str()); return; @@ -707,7 +707,7 @@ static void SimpleGetWithPerfContext(benchmark::State& state) { } } auto db_full = static_cast_with_check<DBImpl>(db.get()); - s = db_full->WaitForCompact(); + s = db_full->WaitForCompact(WaitForCompactOptions()); if (!s.ok()) { state.SkipWithError(s.ToString().c_str()); return; @@ -1115,7 +1115,7 @@ static void IteratorSeek(benchmark::State& state) { } auto db_full = static_cast_with_check<DBImpl>(db.get()); - s = db_full->WaitForCompact(); + s = db_full->WaitForCompact(WaitForCompactOptions()); if (!s.ok()) { state.SkipWithError(s.ToString().c_str()); return; @@ -1206,7 +1206,7 @@ static void IteratorNext(benchmark::State& state) { } auto db_full = static_cast_with_check<DBImpl>(db.get()); - s = db_full->WaitForCompact(); + s = db_full->WaitForCompact(WaitForCompactOptions()); if (!s.ok()) { state.SkipWithError(s.ToString().c_str()); return; @@ -1270,7 +1270,7 @@ static void IteratorNextWithPerfContext(benchmark::State& state) { } } auto db_full = static_cast_with_check<DBImpl>(db.get()); - Status s = db_full->WaitForCompact(); + Status s = db_full->WaitForCompact(WaitForCompactOptions()); if (!s.ok()) { 
state.SkipWithError(s.ToString().c_str()); return; @@ -1368,7 +1368,7 @@ static void IteratorPrev(benchmark::State& state) { } auto db_full = static_cast_with_check<DBImpl>(db.get()); - s = db_full->WaitForCompact(); + s = db_full->WaitForCompact(WaitForCompactOptions()); if (!s.ok()) { state.SkipWithError(s.ToString().c_str()); return; @@ -1460,7 +1460,7 @@ static void PrefixSeek(benchmark::State& state) { } auto db_full = static_cast_with_check<DBImpl>(db.get()); - s = db_full->WaitForCompact(); + s = db_full->WaitForCompact(WaitForCompactOptions()); if (!s.ok()) { state.SkipWithError(s.ToString().c_str()); return; diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 80127ed51..76e8d7bf6 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -2493,6 +2493,24 @@ TEST_P(TransactionTest, FlushTest2) { } } +TEST_P(TransactionTest, WaitForCompactAbortOnPause) { + Status s = ReOpen(); + ASSERT_OK(s); + assert(db != nullptr); + + DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB()); + + // Pause the background jobs. + ASSERT_OK(db_impl->PauseBackgroundWork()); + + WaitForCompactOptions waitForCompactOptions = WaitForCompactOptions(); + waitForCompactOptions.abort_on_pause = true; + s = db->WaitForCompact(waitForCompactOptions); + ASSERT_NOK(s); + ASSERT_FALSE(s.IsNotSupported()); + ASSERT_TRUE(s.IsAborted()); +} + TEST_P(TransactionTest, NoSnapshotTest) { WriteOptions write_options; ReadOptions read_options;