Add WaitForCompact with WaitForCompactOptions to public API (#11436)

Summary:
Context:

This is the first PR for WaitForCompact() Implementation with WaitForCompactOptions. In this PR, we are introducing `Status WaitForCompact(const WaitForCompactOptions& wait_for_compact_options)` in the public API. This currently utilizes the existing internal `WaitForCompact()` implementation (with default abort_on_pause = false). `abort_on_pause` has been moved into `WaitForCompactOptions`. In the later PRs, we will introduce the following two options in `WaitForCompactOptions`

1. `bool flush = false` by default - If true, flush before waiting for compactions to finish. Must be set to true to ensure no immediate compactions (except perhaps periodic compactions) after closing and re-opening the DB.
2. `bool close_db = false` by default - If true, will also close the DB upon compactions finishing.

1. struct `WaitForCompactOptions` added to options.h and `abort_on_pause` in the internal API moved to the option struct.
2. `Status WaitForCompact(const WaitForCompactOptions& wait_for_compact_options)` introduced in `db.h`
3. Changed the internal WaitForCompact() to `WaitForCompact(const WaitForCompactOptions& wait_for_compact_options)` and checks for the `abort_on_pause` inside the option.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11436

Test Plan:
Following tests added
- `DBCompactionTest::WaitForCompactWaitsOnCompactionToFinish`
- `DBCompactionTest::WaitForCompactAbortOnPauseAborted`
- `DBCompactionTest::WaitForCompactContinueAfterPauseNotAborted`
- `DBCompactionTest::WaitForCompactShutdownWhileWaiting`
- `TransactionTest::WaitForCompactAbortOnPause`

NOTE: `TransactionTest::WaitForCompactAbortOnPause` was added to use `StackableDB` to ensure the wrapper function is in place.

Reviewed By: pdillinger

Differential Revision: D45799659

Pulled By: jaykorean

fbshipit-source-id: b5b58f95957f2ab47d1221dee32a61d6cdc4685b
oxigraph-main
Jay Huh 1 year ago committed by Facebook GitHub Bot
parent d1ae7f6c41
commit 81aeb15988
  1. 1
      HISTORY.md
  2. 2
      db/compaction/compaction_job.cc
  3. 38
      db/compaction/tiered_compaction_test.cc
  4. 213
      db/db_compaction_test.cc
  5. 17
      db/db_impl/db_impl.h
  6. 9
      db/db_impl/db_impl_compaction_flush.cc
  7. 9
      db/db_impl/db_impl_debug.cc
  8. 5
      db/db_test.cc
  9. 6
      db/seqno_time_test.cc
  10. 3
      db_stress_tool/db_stress_test_base.cc
  11. 12
      include/rocksdb/db.h
  12. 9
      include/rocksdb/options.h
  13. 5
      include/rocksdb/utilities/stackable_db.h
  14. 20
      microbench/db_basic_bench.cc
  15. 18
      utilities/transactions/transaction_test.cc

@ -20,6 +20,7 @@
* EXPERIMENTAL: Add new API `DB::ClipColumnFamily` to clip the key in CF to a certain range. It will physically delete all keys outside the range, including tombstones.
* Add `MakeSharedCache()` construction functions to various cache Options objects, and deprecated the `NewWhateverCache()` functions with long parameter lists.
* Changed the meaning of various Bloom filter stats (prefix vs. whole key), with iterator-related filtering only being tracked in the new \*`_LEVEL_SEEK_`\* stats. (#11460)
* Add `WaitForCompact()` to wait for all flush and compactions jobs to finish. Jobs to wait include the unscheduled (queued, but not scheduled yet).
### Behavior changes
* For x86, CPU features are no longer detected at runtime nor in build scripts, but in source code using common preprocessor defines. This will likely unlock some small performance improvements on some newer hardware, but could hurt performance of the kCRC32c checksum, which is no longer the default, on some "portable" builds. See PR #11419 for details.

@ -808,8 +808,8 @@ Status CompactionJob::Run() {
RecordCompactionIOStats();
LogFlush(db_options_.info_log);
TEST_SYNC_POINT("CompactionJob::Run():End");
compact_->status = status;
TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():EndStatusSet", &status);
return status;
}

@ -209,7 +209,7 @@ TEST_P(TieredCompactionTest, SequenceBasedTieredStorageUniversal) {
seq_history.emplace_back(dbfull()->GetLatestSequenceNumber());
expect_stats[0].Add(kBasicFlushStats);
}
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// the penultimate level file temperature is not cold, all data are output to
// the penultimate level.
@ -374,7 +374,7 @@ TEST_P(TieredCompactionTest, RangeBasedTieredStorageUniversal) {
ASSERT_OK(Flush());
expect_stats[0].Add(kBasicFlushStats);
}
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
@ -446,7 +446,7 @@ TEST_P(TieredCompactionTest, RangeBasedTieredStorageUniversal) {
ASSERT_OK(Flush());
}
// make sure the compaction is able to finish
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
@ -911,7 +911,7 @@ TEST_P(TieredCompactionTest, SequenceBasedTieredStorageLevel) {
ASSERT_OK(Flush());
expect_stats[0].Add(kBasicFlushStats);
}
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// non last level is hot
ASSERT_EQ("0,1", FilesPerLevel());
@ -954,7 +954,7 @@ TEST_P(TieredCompactionTest, SequenceBasedTieredStorageLevel) {
ASSERT_OK(Flush());
seq_history.emplace_back(dbfull()->GetLatestSequenceNumber());
}
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,1,0,0,0,0,1", FilesPerLevel());
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
@ -1005,7 +1005,7 @@ TEST_P(TieredCompactionTest, SequenceBasedTieredStorageLevel) {
ASSERT_OK(Flush());
seq_history.emplace_back(dbfull()->GetLatestSequenceNumber());
}
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
latest_cold_seq = seq_history[0];
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
@ -1135,7 +1135,7 @@ TEST_P(TieredCompactionTest, RangeBasedTieredStorageLevel) {
}
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
@ -1265,7 +1265,7 @@ TEST_F(PrecludeLastLevelTest, MigrationFromPreserveTimeManualCompaction) {
}
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// all data is pushed to the last level
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
@ -1327,7 +1327,7 @@ TEST_F(PrecludeLastLevelTest, MigrationFromPreserveTimeAutoCompaction) {
}
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// all data is pushed to the last level
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
@ -1360,7 +1360,7 @@ TEST_F(PrecludeLastLevelTest, MigrationFromPreserveTimeAutoCompaction) {
});
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
// all data is moved up to the penultimate level
@ -1403,7 +1403,7 @@ TEST_F(PrecludeLastLevelTest, MigrationFromPreserveTimePartial) {
}
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// all data is pushed to the last level
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
@ -1530,7 +1530,7 @@ TEST_F(PrecludeLastLevelTest, LastLevelOnlyCompactionPartial) {
}
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// all data is pushed to the last level
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
@ -1609,7 +1609,7 @@ TEST_P(PrecludeLastLevelTestWithParms, LastLevelOnlyCompactionNoPreclude) {
}
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// all data is pushed to the last level
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
@ -1705,7 +1705,7 @@ TEST_P(PrecludeLastLevelTestWithParms, LastLevelOnlyCompactionNoPreclude) {
manual_compaction_thread.join();
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
if (enable_preclude_last_level) {
ASSERT_NE("0,0,0,0,0,1,1", FilesPerLevel());
@ -1841,7 +1841,7 @@ TEST_P(PrecludeLastLevelTestWithParms, PeriodicCompactionToPenultimateLevel) {
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
stop_token.reset();
@ -1940,7 +1940,7 @@ TEST_F(PrecludeLastLevelTest, PartialPenultimateLevelCompaction) {
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// L5: [0,19] [20,39] [40,299]
// L6: [0, 299]
@ -2106,7 +2106,7 @@ TEST_F(PrecludeLastLevelTest, RangeDelsCauseFileEndpointsToOverlap) {
Slice begin_key(begin_key_buf), end_key(end_key_buf);
ASSERT_OK(db_->SuggestCompactRange(db_->DefaultColumnFamily(), &begin_key,
&end_key));
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,0,0,0,0,3,3", FilesPerLevel());
ASSERT_EQ(1, per_key_comp_num);
verify_db();
@ -2116,7 +2116,7 @@ TEST_F(PrecludeLastLevelTest, RangeDelsCauseFileEndpointsToOverlap) {
db_->ReleaseSnapshot(snap2);
ASSERT_OK(db_->SuggestCompactRange(db_->DefaultColumnFamily(), &begin_key,
&end_key));
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,0,0,0,0,3,3", FilesPerLevel());
ASSERT_EQ(2, per_key_comp_num);
verify_db();
@ -2126,7 +2126,7 @@ TEST_F(PrecludeLastLevelTest, RangeDelsCauseFileEndpointsToOverlap) {
db_->ReleaseSnapshot(snap1);
ASSERT_OK(db_->SuggestCompactRange(db_->DefaultColumnFamily(), &begin_key,
&end_key));
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,0,0,0,0,2,3", FilesPerLevel());
ASSERT_EQ(3, per_key_comp_num);
verify_db();

@ -3285,6 +3285,209 @@ TEST_F(DBCompactionTest, SuggestCompactRangeNoTwoLevel0Compactions) {
TEST_SYNC_POINT(
"DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:2");
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBCompactionTest, WaitForCompactWaitsOnCompactionToFinish) {
  // This test creates a scenario to trigger compaction by number of L0 files.
  // Before the compaction finishes, it closes the DB.
  // Upon reopen, it waits for the compaction to finish and checks the number
  // of compactions that completed.
  const int kNumKeysPerFile = 4;
  const int kNumFiles = 2;
  Options options = CurrentOptions();
  // One more L0 file than we write below is needed to trigger compaction.
  options.level0_file_num_compaction_trigger = kNumFiles + 1;
  DestroyAndReopen(options);
  // create the scenario where one more L0 file will trigger compaction
  Random rnd(301);
  for (int i = 0; i < kNumFiles; ++i) {
    for (int j = 0; j < kNumKeysPerFile; ++j) {
      ASSERT_OK(
          Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(100 /* len */)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->WaitForCompact(WaitForCompactOptions()));
  ASSERT_EQ("2", FilesPerLevel());
  // Counts compactions that ran to completion with an OK status; bumped from
  // the CompactionJob::Run() end-of-run sync-point callback below.
  int compaction_finished = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::Run():EndStatusSet", [&](void* arg) {
        auto status = static_cast<Status*>(arg);
        if (status->ok()) {
          compaction_finished++;
        }
      });
  // To make sure there's a flush/compaction debt (unscheduled work) at the
  // moment the scheduler runs after reopen.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MaybeScheduleFlushOrCompaction:BeforeSchedule", [&](void* arg) {
        auto unscheduled_flushes = *static_cast<int*>(arg);
        ASSERT_GT(unscheduled_flushes, 0);
      });
  // Order the scheduler's BeforeSchedule check to happen only after this
  // test reaches the TEST_SYNC_POINT below (i.e. after Close()).
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBCompactionTest::WaitForCompactWaitsOnCompactionToFinish",
        "DBImpl::MaybeScheduleFlushOrCompaction:BeforeSchedule"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  // create compaction debt by adding one more L0 file then closing
  GenerateNewRandomFile(&rnd, /* nowait */ true);
  ASSERT_EQ(0, compaction_finished);
  Close();
  TEST_SYNC_POINT("DBCompactionTest::WaitForCompactWaitsOnCompactionToFinish");
  ASSERT_EQ(0, compaction_finished);
  // Reopen the db and we expect the compaction to be triggered.
  Reopen(options);
  // Wait for compaction to finish
  ASSERT_OK(dbfull()->WaitForCompact(WaitForCompactOptions()));
  ASSERT_GT(compaction_finished, 0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_F(DBCompactionTest, WaitForCompactAbortOnPauseAborted) {
  // Arrange the DB so that one more L0 file will trigger a compaction, pause
  // background work before the job can run, and verify that WaitForCompact()
  // with abort_on_pause=true returns Status::Aborted instead of blocking.
  constexpr int kNumKeysPerFile = 4;
  constexpr int kNumFiles = 2;

  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = kNumFiles + 1;
  DestroyAndReopen(options);

  // Stage kNumFiles L0 files; one more will cross the compaction trigger.
  Random rnd(301);
  for (int file = 0; file < kNumFiles; ++file) {
    for (int key = 0; key < kNumKeysPerFile; ++key) {
      ASSERT_OK(Put(Key(file * kNumKeysPerFile + key),
                    rnd.RandomString(100 /* len */)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->WaitForCompact(WaitForCompactOptions()));

  // Queue an L0 compaction by adding one more file.
  GenerateNewRandomFile(&rnd, /* nowait */ true);
  ASSERT_OK(Flush());

  // Pause the background jobs so the queued compaction can never run.
  ASSERT_OK(dbfull()->PauseBackgroundWork());

  WaitForCompactOptions wait_for_compact_options;
  wait_for_compact_options.abort_on_pause = true;
  const Status s = dbfull()->WaitForCompact(wait_for_compact_options);
  ASSERT_NOK(s);
  ASSERT_TRUE(s.IsAborted());
}
TEST_F(DBCompactionTest, WaitForCompactContinueAfterPauseNotAborted) {
  // Queue a compaction, pause background work, then resume it. Once resumed,
  // WaitForCompact() must complete normally even with abort_on_pause=true,
  // because the DB is no longer in the paused state.
  constexpr int kNumKeysPerFile = 4;
  constexpr int kNumFiles = 2;

  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = kNumFiles + 1;
  DestroyAndReopen(options);

  // Stage kNumFiles L0 files; one more will cross the compaction trigger.
  Random rnd(301);
  for (int file = 0; file < kNumFiles; ++file) {
    for (int key = 0; key < kNumKeysPerFile; ++key) {
      ASSERT_OK(Put(Key(file * kNumKeysPerFile + key),
                    rnd.RandomString(100 /* len */)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->WaitForCompact(WaitForCompactOptions()));

  // Queue an L0 compaction by adding one more file.
  GenerateNewRandomFile(&rnd, /* nowait */ true);
  ASSERT_OK(Flush());

  // Pause, then immediately resume, the background jobs.
  ASSERT_OK(dbfull()->PauseBackgroundWork());
  ASSERT_OK(dbfull()->ContinueBackgroundWork());

  WaitForCompactOptions wait_for_compact_options;
  wait_for_compact_options.abort_on_pause = true;
  ASSERT_OK(dbfull()->WaitForCompact(wait_for_compact_options));
}
TEST_F(DBCompactionTest, WaitForCompactShutdownWhileWaiting) {
  // This test creates a scenario to trigger compaction by number of L0 files.
  // Before the compaction finishes, the db shuts down (via Close() ->
  // ~DBImpl). WaitForCompact(), already waiting, should then return
  // Status::ShutdownInProgress.
  const int kNumKeysPerFile = 4;
  const int kNumFiles = 2;
  Options options = CurrentOptions();
  // One more L0 file than we write below is needed to trigger compaction.
  options.level0_file_num_compaction_trigger = kNumFiles + 1;
  DestroyAndReopen(options);
  // create the scenario where one more L0 file will trigger compaction
  Random rnd(301);
  for (int i = 0; i < kNumFiles; ++i) {
    for (int j = 0; j < kNumKeysPerFile; ++j) {
      ASSERT_OK(
          Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(100 /* len */)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->WaitForCompact(WaitForCompactOptions()));
  // Ordering constraints:
  //  1) the test proceeds past ":0" only after the compaction job starts;
  //  2) the test proceeds past ":1" only after WaitForCompact() begins;
  //  3) the compaction job finishes only after ~DBImpl starts waiting on
  //     background jobs — guaranteeing shutdown races ahead of completion.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
      {"CompactionJob::Run():Start",
       "DBCompactionTest::WaitForCompactShutdownWhileWaiting:0"},
      {"DBImpl::WaitForCompact:Start",
       "DBCompactionTest::WaitForCompactShutdownWhileWaiting:1"},
      {"DBImpl::~DBImpl:WaitJob", "CompactionJob::Run():End"},
  });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  // Now trigger L0 compaction by adding a file
  GenerateNewRandomFile(&rnd, /* nowait */ true);
  ASSERT_OK(Flush());
  // Wait for compaction to start
  TEST_SYNC_POINT("DBCompactionTest::WaitForCompactShutdownWhileWaiting:0");
  // Wait for Compaction in another thread; it should observe the shutdown.
  auto waiting_for_compaction_thread = port::Thread([this]() {
    Status s = dbfull()->WaitForCompact(WaitForCompactOptions());
    ASSERT_NOK(s);
    ASSERT_TRUE(s.IsShutdownInProgress());
  });
  TEST_SYNC_POINT("DBCompactionTest::WaitForCompactShutdownWhileWaiting:1");
  // Shutdown after wait started, but before the compaction finishes
  auto closing_thread = port::Thread([this]() { Close(); });
  waiting_for_compaction_thread.join();
  closing_thread.join();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
static std::string ShortKey(int i) {
@ -5822,7 +6025,7 @@ TEST_P(RoundRobinSubcompactionsAgainstPressureToken, PressureTokenTest) {
}
TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstPressureToken:2");
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_TRUE(num_planned_subcompactions_verified);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
@ -5897,7 +6100,7 @@ TEST_P(RoundRobinSubcompactionsAgainstResources, SubcompactionsUsingResources) {
"CompactionJob::ReleaseSubcompactionResources:1"}});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_OK(dbfull()->EnableAutoCompaction({dbfull()->DefaultColumnFamily()}));
TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:0");
TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:1");
@ -5913,7 +6116,7 @@ TEST_P(RoundRobinSubcompactionsAgainstResources, SubcompactionsUsingResources) {
total_low_pri_threads_ - 1,
env_->ReleaseThreads(total_low_pri_threads_ - 1, Env::Priority::LOW));
TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:4");
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_TRUE(num_planned_subcompactions_verified);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
@ -5985,11 +6188,11 @@ TEST_P(DBCompactionTestWithParam, RoundRobinWithoutAdditionalResources) {
"BackgroundCallCompaction:0"}});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_OK(dbfull()->EnableAutoCompaction({dbfull()->DefaultColumnFamily()}));
TEST_SYNC_POINT("DBCompactionTest::RoundRobinWithoutAdditionalResources:0");
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_TRUE(num_planned_subcompactions_verified);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();

@ -1058,16 +1058,8 @@ class DBImpl : public DB {
VersionSet* GetVersionSet() const { return versions_.get(); }
// Wait for all flush and compactions jobs to finish. Jobs to wait include the
// unscheduled (queued, but not scheduled yet). If the db is shutting down,
// Status::ShutdownInProgress will be returned. If PauseBackgroundWork() was
// called prior to this, this may potentially wait for unscheduled jobs
// indefinitely. abort_on_pause can be set to true to abort, and
// Status::Aborted will be returned immediately. This may also never return if
// there's sufficient ongoing writes that keeps flush and compaction going
// without stopping. The user would have to cease all the writes to DB to make
// this eventually return in a stable state.
Status WaitForCompact(bool abort_on_pause = false);
Status WaitForCompact(
const WaitForCompactOptions& wait_for_compact_options) override;
#ifndef NDEBUG
// Compact any files in the named level that overlap [*begin, *end]
@ -1106,8 +1098,9 @@ class DBImpl : public DB {
// Wait for memtable compaction
Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr);
// Wait for any compaction
Status TEST_WaitForCompact(bool abort_on_pause = false);
Status TEST_WaitForCompact();
Status TEST_WaitForCompact(
const WaitForCompactOptions& wait_for_compact_options);
// Wait for any background purge
Status TEST_WaitForPurge();

@ -2612,6 +2612,9 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
bg_flush_scheduled_ < bg_job_limits.max_flushes) {
TEST_SYNC_POINT_CALLBACK(
"DBImpl::MaybeScheduleFlushOrCompaction:BeforeSchedule",
&unscheduled_flushes_);
bg_flush_scheduled_++;
FlushThreadArg* fta = new FlushThreadArg;
fta->db_ = this;
@ -3957,13 +3960,15 @@ void DBImpl::GetSnapshotContext(
*snapshot_seqs = snapshots_.GetAll(earliest_write_conflict_snapshot);
}
Status DBImpl::WaitForCompact(bool abort_on_pause) {
Status DBImpl::WaitForCompact(
const WaitForCompactOptions& wait_for_compact_options) {
InstrumentedMutexLock l(&mutex_);
TEST_SYNC_POINT("DBImpl::WaitForCompact:Start");
for (;;) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status::ShutdownInProgress();
}
if (bg_work_paused_ && abort_on_pause) {
if (bg_work_paused_ && wait_for_compact_options.abort_on_pause) {
return Status::Aborted();
}
if ((bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||

@ -178,9 +178,12 @@ Status DBImpl::TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family) {
return WaitForFlushMemTable(cfd, nullptr, false);
}
Status DBImpl::TEST_WaitForCompact(bool abort_on_pause) {
// Wait until the compaction completes
return WaitForCompact(abort_on_pause);
Status DBImpl::TEST_WaitForCompact() {
return WaitForCompact(WaitForCompactOptions());
}
Status DBImpl::TEST_WaitForCompact(
const WaitForCompactOptions& wait_for_compact_options) {
return WaitForCompact(wait_for_compact_options);
}
Status DBImpl::TEST_WaitForPurge() {

@ -3264,6 +3264,11 @@ class ModelDB : public DB {
void DisableManualCompaction() override { return; }
virtual Status WaitForCompact(
const WaitForCompactOptions& /* wait_for_compact_options */) override {
return Status::OK();
}
using DB::NumberLevels;
int NumberLevels(ColumnFamilyHandle* /*column_family*/) override { return 1; }

@ -93,7 +93,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) {
}
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// All data is hot, only output to penultimate level
ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
@ -114,7 +114,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) {
});
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
}
@ -128,7 +128,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) {
});
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
CompactRangeOptions cro;

@ -2670,8 +2670,7 @@ void StressTest::Open(SharedState* shared) {
// Ingested errors might happen in background compactions. We
// wait for all compactions to finish to make sure DB is in
// clean state before executing queries.
s = static_cast_with_check<DBImpl>(db_->GetRootDB())
->WaitForCompact();
s = db_->GetRootDB()->WaitForCompact(WaitForCompactOptions());
if (!s.ok()) {
for (auto cf : column_families_) {
delete cf;

@ -53,6 +53,7 @@ struct Options;
struct ReadOptions;
struct TableProperties;
struct WriteOptions;
struct WaitForCompactOptions;
class Env;
class EventListener;
class FileSystem;
@ -1447,6 +1448,17 @@ class DB {
// DisableManualCompaction() has been called.
virtual void EnableManualCompaction() = 0;
// Wait for all flush and compactions jobs to finish. Jobs to wait include the
// unscheduled (queued, but not scheduled yet). If the db is shutting down,
// Status::ShutdownInProgress will be returned.
//
// NOTE: This may also never return if there's sufficient ongoing writes that
// keeps flush and compaction going without stopping. The user would have to
// cease all the writes to DB to make this eventually return in a stable
// state.
virtual Status WaitForCompact(
const WaitForCompactOptions& /* wait_for_compact_options */) = 0;
// Number of levels used for this DB.
virtual int NumberLevels(ColumnFamilyHandle* column_family) = 0;
virtual int NumberLevels() { return NumberLevels(DefaultColumnFamily()); }

@ -2080,4 +2080,13 @@ struct LiveFilesStorageInfoOptions {
uint64_t wal_size_for_flush = 0;
};
// Options for DB::WaitForCompact(), which waits for all flush and compaction
// jobs (including queued-but-unscheduled ones) to finish.
struct WaitForCompactOptions {
  // Whether to abort waiting if the background jobs are paused (i.e.
  // PauseBackgroundWork() has been called). If true, Status::Aborted is
  // returned immediately. If false, ContinueBackgroundWork() must be called
  // to resume the background jobs; otherwise jobs that were queued, but not
  // scheduled yet, may never finish and WaitForCompact() may wait
  // indefinitely.
  bool abort_on_pause = false;
};
} // namespace ROCKSDB_NAMESPACE

@ -347,6 +347,11 @@ class StackableDB : public DB {
return db_->DisableManualCompaction();
}
virtual Status WaitForCompact(
const WaitForCompactOptions& wait_for_compact_options) override {
return db_->WaitForCompact(wait_for_compact_options);
}
using DB::NumberLevels;
virtual int NumberLevels(ColumnFamilyHandle* column_family) override {
return db_->NumberLevels(column_family);

@ -303,7 +303,7 @@ static void DBPut(benchmark::State& state) {
if (state.thread_index() == 0) {
auto db_full = static_cast_with_check<DBImpl>(db.get());
Status s = db_full->WaitForCompact();
Status s = db_full->WaitForCompact(WaitForCompactOptions());
if (!s.ok()) {
state.SkipWithError(s.ToString().c_str());
return;
@ -410,7 +410,7 @@ static void ManualCompaction(benchmark::State& state) {
if (state.thread_index() == 0) {
auto db_full = static_cast_with_check<DBImpl>(db.get());
s = db_full->WaitForCompact();
s = db_full->WaitForCompact(WaitForCompactOptions());
if (!s.ok()) {
state.SkipWithError(s.ToString().c_str());
return;
@ -510,7 +510,7 @@ static void ManualFlush(benchmark::State& state) {
if (state.thread_index() == 0) {
auto db_full = static_cast_with_check<DBImpl>(db.get());
Status s = db_full->WaitForCompact();
Status s = db_full->WaitForCompact(WaitForCompactOptions());
if (!s.ok()) {
state.SkipWithError(s.ToString().c_str());
return;
@ -594,7 +594,7 @@ static void DBGet(benchmark::State& state) {
}
auto db_full = static_cast_with_check<DBImpl>(db.get());
s = db_full->WaitForCompact();
s = db_full->WaitForCompact(WaitForCompactOptions());
if (!s.ok()) {
state.SkipWithError(s.ToString().c_str());
return;
@ -707,7 +707,7 @@ static void SimpleGetWithPerfContext(benchmark::State& state) {
}
}
auto db_full = static_cast_with_check<DBImpl>(db.get());
s = db_full->WaitForCompact();
s = db_full->WaitForCompact(WaitForCompactOptions());
if (!s.ok()) {
state.SkipWithError(s.ToString().c_str());
return;
@ -1115,7 +1115,7 @@ static void IteratorSeek(benchmark::State& state) {
}
auto db_full = static_cast_with_check<DBImpl>(db.get());
s = db_full->WaitForCompact();
s = db_full->WaitForCompact(WaitForCompactOptions());
if (!s.ok()) {
state.SkipWithError(s.ToString().c_str());
return;
@ -1206,7 +1206,7 @@ static void IteratorNext(benchmark::State& state) {
}
auto db_full = static_cast_with_check<DBImpl>(db.get());
s = db_full->WaitForCompact();
s = db_full->WaitForCompact(WaitForCompactOptions());
if (!s.ok()) {
state.SkipWithError(s.ToString().c_str());
return;
@ -1270,7 +1270,7 @@ static void IteratorNextWithPerfContext(benchmark::State& state) {
}
}
auto db_full = static_cast_with_check<DBImpl>(db.get());
Status s = db_full->WaitForCompact();
Status s = db_full->WaitForCompact(WaitForCompactOptions());
if (!s.ok()) {
state.SkipWithError(s.ToString().c_str());
return;
@ -1368,7 +1368,7 @@ static void IteratorPrev(benchmark::State& state) {
}
auto db_full = static_cast_with_check<DBImpl>(db.get());
s = db_full->WaitForCompact();
s = db_full->WaitForCompact(WaitForCompactOptions());
if (!s.ok()) {
state.SkipWithError(s.ToString().c_str());
return;
@ -1460,7 +1460,7 @@ static void PrefixSeek(benchmark::State& state) {
}
auto db_full = static_cast_with_check<DBImpl>(db.get());
s = db_full->WaitForCompact();
s = db_full->WaitForCompact(WaitForCompactOptions());
if (!s.ok()) {
state.SkipWithError(s.ToString().c_str());
return;

@ -2493,6 +2493,24 @@ TEST_P(TransactionTest, FlushTest2) {
}
}
TEST_P(TransactionTest, WaitForCompactAbortOnPause) {
  // Exercises the StackableDB pass-through of WaitForCompact(): with
  // background work paused and abort_on_pause=true, the call must surface
  // Status::Aborted — and specifically not NotSupported, which would mean
  // the wrapper override is missing.
  ASSERT_OK(ReOpen());
  assert(db != nullptr);

  DBImpl* root_db = static_cast_with_check<DBImpl>(db->GetRootDB());
  // Pause the background jobs so any queued work can never be scheduled.
  ASSERT_OK(root_db->PauseBackgroundWork());

  WaitForCompactOptions wait_for_compact_options;
  wait_for_compact_options.abort_on_pause = true;
  const Status s = db->WaitForCompact(wait_for_compact_options);
  ASSERT_NOK(s);
  ASSERT_FALSE(s.IsNotSupported());
  ASSERT_TRUE(s.IsAborted());
}
TEST_P(TransactionTest, NoSnapshotTest) {
WriteOptions write_options;
ReadOptions read_options;

Loading…
Cancel
Save