diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index b2e372b91..3b1650412 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -153,6 +153,48 @@ class DBCompactionDirectIOTest : public DBCompactionTest, DBCompactionDirectIOTest() : DBCompactionTest() {} }; +class DBCompactionWaitForCompactTest + : public DBTestBase, + public testing::WithParamInterface<std::tuple<bool, bool>> { + public: + DBCompactionWaitForCompactTest() + : DBTestBase("db_compaction_test", /*env_do_fsync=*/true) { + abort_on_pause_ = std::get<0>(GetParam()); + flush_ = std::get<1>(GetParam()); + } + bool abort_on_pause_; + bool flush_; + Options options_; + WaitForCompactOptions wait_for_compact_options_; + + void SetUp() override { + // Sets up a scenario in which one more L0 file will trigger a + // compaction + const int kNumKeysPerFile = 4; + const int kNumFiles = 2; + + options_ = CurrentOptions(); + options_.level0_file_num_compaction_trigger = kNumFiles + 1; + + wait_for_compact_options_ = WaitForCompactOptions(); + wait_for_compact_options_.abort_on_pause = abort_on_pause_; + wait_for_compact_options_.flush = flush_; + + DestroyAndReopen(options_); + + Random rnd(301); + for (int i = 0; i < kNumFiles; ++i) { + for (int j = 0; j < kNumKeysPerFile; ++j) { + ASSERT_OK( + Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(100 /* len */))); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ("2", FilesPerLevel()); + } +}; + // Param = true : target level is non-empty // Param = false: level between target level and source level // is not empty. 
@@ -3289,31 +3331,18 @@ TEST_F(DBCompactionTest, SuggestCompactRangeNoTwoLevel0Compactions) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } -TEST_F(DBCompactionTest, WaitForCompactWaitsOnCompactionToFinish) { - // This test creates a scenario to trigger compaction by number of L0 files - // Before the compaction finishes, it closes the DB - // Upon reopen, wait for the compaction to finish and checks for the number of - // compaction finished - - const int kNumKeysPerFile = 4; - const int kNumFiles = 2; +INSTANTIATE_TEST_CASE_P(DBCompactionWaitForCompactTest, + DBCompactionWaitForCompactTest, + ::testing::Values(std::make_tuple(false, false), + std::make_tuple(false, true), + std::make_tuple(true, false), + std::make_tuple(true, true))); - Options options = CurrentOptions(); - options.level0_file_num_compaction_trigger = kNumFiles + 1; - - DestroyAndReopen(options); - - // create the scenario where one more L0 file will trigger compaction - Random rnd(301); - for (int i = 0; i < kNumFiles; ++i) { - for (int j = 0; j < kNumKeysPerFile; ++j) { - ASSERT_OK( - Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(100 /* len */))); - } - ASSERT_OK(Flush()); - } - ASSERT_OK(dbfull()->WaitForCompact(WaitForCompactOptions())); - ASSERT_EQ("2", FilesPerLevel()); +TEST_P(DBCompactionWaitForCompactTest, + WaitForCompactWaitsOnCompactionToFinish) { + // Triggers a compaction. 
Before the compaction finishes, test + // closes the DB. Upon reopen, waits for the compaction to finish and checks + // the number of compactions finished int compaction_finished = 0; ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( @@ -3337,6 +3366,7 @@ TEST_F(DBCompactionTest, WaitForCompactWaitsOnCompactionToFinish) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); // create compaction debt by adding one more L0 file then closing + Random rnd(123); GenerateNewRandomFile(&rnd, /* nowait */ true); ASSERT_EQ(0, compaction_finished); Close(); @@ -3344,124 +3374,53 @@ TEST_F(DBCompactionTest, WaitForCompactWaitsOnCompactionToFinish) { ASSERT_EQ(0, compaction_finished); // Reopen the db and we expect the compaction to be triggered. - Reopen(options); + Reopen(options_); // Wait for compaction to finish - ASSERT_OK(dbfull()->WaitForCompact(WaitForCompactOptions())); + ASSERT_OK(dbfull()->WaitForCompact(wait_for_compact_options_)); ASSERT_GT(compaction_finished, 0); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); } -TEST_F(DBCompactionTest, WaitForCompactAbortOnPauseAborted) { - // This test creates a scenario to trigger compaction by number of L0 files - // Before the compaction finishes, it pauses the compaction - // Calling WaitForCompact with option abort_on_pause=true should return - // Status::Aborted - - const int kNumKeysPerFile = 4; - const int kNumFiles = 2; - - Options options = CurrentOptions(); - options.level0_file_num_compaction_trigger = kNumFiles + 1; - - DestroyAndReopen(options); - - // create the scenario where one more L0 file will trigger compaction - Random rnd(301); - for (int i = 0; i < kNumFiles; ++i) { - for (int j = 0; j < kNumKeysPerFile; ++j) { - ASSERT_OK( - Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(100 /* len */))); - } - ASSERT_OK(Flush()); - } - ASSERT_OK(dbfull()->WaitForCompact(WaitForCompactOptions())); 
+TEST_P(DBCompactionWaitForCompactTest, WaitForCompactAbortOnPause) { + // Triggers a compaction. Before the compaction finishes, test + // pauses background work. With abort_on_pause=true, WaitForCompact() + // should return Status::Aborted; otherwise ContinueBackgroundWork() + // must be called first so that WaitForCompact() can return OK // Now trigger L0 compaction by adding a file + Random rnd(123); GenerateNewRandomFile(&rnd, /* nowait */ true); ASSERT_OK(Flush()); // Pause the background jobs. ASSERT_OK(dbfull()->PauseBackgroundWork()); - WaitForCompactOptions waitForCompactOptions = WaitForCompactOptions(); - waitForCompactOptions.abort_on_pause = true; - Status s = dbfull()->WaitForCompact(waitForCompactOptions); - ASSERT_NOK(s); - ASSERT_TRUE(s.IsAborted()); -} - -TEST_F(DBCompactionTest, WaitForCompactContinueAfterPauseNotAborted) { - // This test creates a scenario to trigger compaction by number of L0 files - // Before the compaction finishes, it pauses the compaction - // Calling WaitForCompact with option abort_on_pause=false will not wait - // indefinitely upon ContinueBackgroundWork() - - const int kNumKeysPerFile = 4; - const int kNumFiles = 2; - - Options options = CurrentOptions(); - options.level0_file_num_compaction_trigger = kNumFiles + 1; - - DestroyAndReopen(options); - - // create the scenario where one more L0 file will trigger compaction - Random rnd(301); - for (int i = 0; i < kNumFiles; ++i) { - for (int j = 0; j < kNumKeysPerFile; ++j) { - ASSERT_OK( - Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(100 /* len */))); - } - ASSERT_OK(Flush()); + // If abort_on_pause_ is false, resume the background jobs. + if (!abort_on_pause_) { + ASSERT_OK(dbfull()->ContinueBackgroundWork()); } - ASSERT_OK(dbfull()->WaitForCompact(WaitForCompactOptions())); - - // Now trigger L0 compaction by adding a file - GenerateNewRandomFile(&rnd, /* nowait */ true); - ASSERT_OK(Flush()); - - // Pause the background jobs. 
- ASSERT_OK(dbfull()->PauseBackgroundWork()); - - // Continue the background jobs. - ASSERT_OK(dbfull()->ContinueBackgroundWork()); - WaitForCompactOptions waitForCompactOptions = WaitForCompactOptions(); - waitForCompactOptions.abort_on_pause = true; - ASSERT_OK(dbfull()->WaitForCompact(waitForCompactOptions)); + Status s = dbfull()->WaitForCompact(wait_for_compact_options_); + if (abort_on_pause_) { + ASSERT_NOK(s); + ASSERT_TRUE(s.IsAborted()); + } else { + ASSERT_OK(s); + } } -TEST_F(DBCompactionTest, WaitForCompactShutdownWhileWaiting) { - // This test creates a scenario to trigger compaction by number of L0 files - // Before the compaction finishes, db shuts down (by calling - // CancelAllBackgroundWork()) Calling WaitForCompact should return - // Status::IsShutdownInProgress() - - const int kNumKeysPerFile = 4; - const int kNumFiles = 2; - - Options options = CurrentOptions(); - options.level0_file_num_compaction_trigger = kNumFiles + 1; - - DestroyAndReopen(options); - - // create the scenario where one more L0 file will trigger compaction - Random rnd(301); - for (int i = 0; i < kNumFiles; ++i) { - for (int j = 0; j < kNumKeysPerFile; ++j) { - ASSERT_OK( - Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(100 /* len */))); - } - ASSERT_OK(Flush()); - } - ASSERT_OK(dbfull()->WaitForCompact(WaitForCompactOptions())); +TEST_P(DBCompactionWaitForCompactTest, WaitForCompactShutdownWhileWaiting) { + // Triggers a compaction. Before the compaction finishes, db + // shuts down (by calling CancelAllBackgroundWork()). 
Calling WaitForCompact() + // should return Status::ShutdownInProgress() ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ {"CompactionJob::Run():Start", "DBCompactionTest::WaitForCompactShutdownWhileWaiting:0"}, - {"DBImpl::WaitForCompact:Start", + {"DBImpl::WaitForCompact:StartWaiting", "DBCompactionTest::WaitForCompactShutdownWhileWaiting:1"}, {"DBImpl::~DBImpl:WaitJob", "CompactionJob::Run():End"}, }); @@ -3469,6 +3428,7 @@ TEST_F(DBCompactionTest, WaitForCompactShutdownWhileWaiting) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); // Now trigger L0 compaction by adding a file + Random rnd(123); GenerateNewRandomFile(&rnd, /* nowait */ true); ASSERT_OK(Flush()); // Wait for compaction to start @@ -3476,7 +3436,7 @@ TEST_F(DBCompactionTest, WaitForCompactShutdownWhileWaiting) { // Wait for Compaction in another thread auto waiting_for_compaction_thread = port::Thread([this]() { - Status s = dbfull()->WaitForCompact(WaitForCompactOptions()); + Status s = dbfull()->WaitForCompact(wait_for_compact_options_); ASSERT_NOK(s); ASSERT_TRUE(s.IsShutdownInProgress()); }); @@ -3490,6 +3450,65 @@ TEST_F(DBCompactionTest, WaitForCompactShutdownWhileWaiting) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } +TEST_P(DBCompactionWaitForCompactTest, WaitForCompactWithOptionToFlush) { + // After creating enough L0 files that one more file will trigger the + // compaction, write some data in memtable. Calls WaitForCompact with option + // to flush. This will flush the memtable to a new L0 file which will trigger + // compaction. 
Lastly check for expected number of files, closing + reopening + // DB won't trigger a flush job, nor (when flush is set) any compaction + + int compaction_finished = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:AfterCompaction", + [&](void*) { compaction_finished++; }); + + int flush_finished = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "FlushJob::End", [&](void*) { flush_finished++; }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // write to memtable (overlapping key with first L0 file), but no flush is + // needed at this point. + ASSERT_OK(Put(Key(0), "some random string")); + ASSERT_EQ(0, compaction_finished); + ASSERT_EQ(0, flush_finished); + ASSERT_EQ("2", FilesPerLevel()); + + ASSERT_OK(dbfull()->WaitForCompact(wait_for_compact_options_)); + if (flush_) { + ASSERT_EQ("1,2", FilesPerLevel()); + ASSERT_EQ(1, compaction_finished); + ASSERT_EQ(1, flush_finished); + } else { + ASSERT_EQ(0, compaction_finished); + ASSERT_EQ(0, flush_finished); + ASSERT_EQ("2", FilesPerLevel()); + } + + compaction_finished = 0; + flush_finished = 0; + Close(); + Reopen(options_); + + ASSERT_EQ(0, flush_finished); + if (flush_) { + // if flushed already prior to close and reopen, expect there's no + // additional compaction needed + ASSERT_EQ(0, compaction_finished); + } else { + // if not flushed prior to close and reopen, expect L0 file creation from + // WAL when reopening which will trigger the compaction. 
+ ASSERT_OK(dbfull()->WaitForCompact(wait_for_compact_options_)); + ASSERT_EQ(1, compaction_finished); + } + + ASSERT_EQ("1,2", FilesPerLevel()); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); +} + static std::string ShortKey(int i) { assert(i < 10000); char buf[100]; diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index f035041c7..cb95a1676 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -29,35 +29,8 @@ namespace ROCKSDB_NAMESPACE { Status DBImpl::FlushForGetLiveFiles() { - mutex_.AssertHeld(); - - // flush all dirty data to disk. - Status status; - if (immutable_db_options_.atomic_flush) { - mutex_.Unlock(); - status = AtomicFlushMemTables(FlushOptions(), FlushReason::kGetLiveFiles); - if (status.IsColumnFamilyDropped()) { - status = Status::OK(); - } - mutex_.Lock(); - } else { - for (auto cfd : versions_->GetRefedColumnFamilySet()) { - if (cfd->IsDropped()) { - continue; - } - mutex_.Unlock(); - status = FlushMemTable(cfd, FlushOptions(), FlushReason::kGetLiveFiles); - TEST_SYNC_POINT("DBImpl::GetLiveFiles:1"); - TEST_SYNC_POINT("DBImpl::GetLiveFiles:2"); - mutex_.Lock(); - if (!status.ok() && !status.IsColumnFamilyDropped()) { - break; - } else if (status.IsColumnFamilyDropped()) { - status = Status::OK(); - } - } - } - return status; + return DBImpl::FlushAllColumnFamilies(FlushOptions(), + FlushReason::kGetLiveFiles); } Status DBImpl::GetLiveFiles(std::vector<std::string>& ret, diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index f050863b8..fc68a1006 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1410,6 +1410,9 @@ class DBImpl : public DB { void NotifyOnExternalFileIngested( ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job); + Status FlushAllColumnFamilies(const FlushOptions& flush_options, + FlushReason flush_reason); + virtual Status FlushForGetLiveFiles(); void NewThreadStatusCfInfo(ColumnFamilyData* 
cfd) const; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 2c9b5f5df..c40dc1df9 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1808,6 +1808,37 @@ int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) { ->mutable_cf_options.level0_stop_writes_trigger; } +Status DBImpl::FlushAllColumnFamilies(const FlushOptions& flush_options, + FlushReason flush_reason) { + mutex_.AssertHeld(); + Status status; + if (immutable_db_options_.atomic_flush) { + mutex_.Unlock(); + status = AtomicFlushMemTables(flush_options, flush_reason); + if (status.IsColumnFamilyDropped()) { + status = Status::OK(); + } + mutex_.Lock(); + } else { + for (auto cfd : versions_->GetRefedColumnFamilySet()) { + if (cfd->IsDropped()) { + continue; + } + mutex_.Unlock(); + status = FlushMemTable(cfd, flush_options, flush_reason); + TEST_SYNC_POINT("DBImpl::FlushAllColumnFamilies:1"); + TEST_SYNC_POINT("DBImpl::FlushAllColumnFamilies:2"); + mutex_.Lock(); + if (!status.ok() && !status.IsColumnFamilyDropped()) { + break; + } else if (status.IsColumnFamilyDropped()) { + status = Status::OK(); + } + } + } + return status; +} + Status DBImpl::Flush(const FlushOptions& flush_options, ColumnFamilyHandle* column_family) { auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family); @@ -3963,7 +3994,14 @@ void DBImpl::GetSnapshotContext( Status DBImpl::WaitForCompact( const WaitForCompactOptions& wait_for_compact_options) { InstrumentedMutexLock l(&mutex_); - TEST_SYNC_POINT("DBImpl::WaitForCompact:Start"); + if (wait_for_compact_options.flush) { + Status s = DBImpl::FlushAllColumnFamilies(FlushOptions(), + FlushReason::kManualFlush); + if (!s.ok()) { + return s; + } + } + TEST_SYNC_POINT("DBImpl::WaitForCompact:StartWaiting"); for (;;) { if (shutting_down_.load(std::memory_order_acquire)) { return Status::ShutdownInProgress(); diff --git a/db/flush_job.cc b/db/flush_job.cc index 8b152ee3e..aaa6c91c3 100644 
--- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -352,6 +352,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta, << (IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos); } + TEST_SYNC_POINT("FlushJob::End"); return s; } diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 5a82065de..ac9d19191 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -2087,6 +2087,9 @@ struct WaitForCompactOptions { // Otherwise, jobs that were queued, but not scheduled yet may never finish // and WaitForCompact() may wait indefinitely. bool abort_on_pause = false; + + // If true, flush all column families before starting to wait. + bool flush = false; }; } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/checkpoint/checkpoint_test.cc b/utilities/checkpoint/checkpoint_test.cc index 2bdab44fd..1cc062667 100644 --- a/utilities/checkpoint/checkpoint_test.cc +++ b/utilities/checkpoint/checkpoint_test.cc @@ -460,8 +460,8 @@ TEST_F(CheckpointTest, CheckpointCF) { Options options = CurrentOptions(); CreateAndReopenWithCF({"one", "two", "three", "four", "five"}, options); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( - {{"CheckpointTest::CheckpointCF:2", "DBImpl::GetLiveFiles:2"}, - {"DBImpl::GetLiveFiles:1", "CheckpointTest::CheckpointCF:1"}}); + {{"CheckpointTest::CheckpointCF:2", "DBImpl::FlushAllColumnFamilies:2"}, + {"DBImpl::FlushAllColumnFamilies:1", "CheckpointTest::CheckpointCF:1"}}); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); @@ -985,4 +985,3 @@ int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } -