Flush option in WaitForCompact() (#11483)

Summary:
Context:

As mentioned in https://github.com/facebook/rocksdb/issues/11436, introducing `flush` option in `WaitForCompactOptions` to flush before waiting for compactions to finish. Must be set to true to ensure no immediate compactions (except perhaps periodic compactions) after closing and re-opening the DB.
1. `bool flush = false` added to `WaitForCompactOptions`
2. `DBImpl::FlushAllColumnFamilies()` is introduced and `DBImpl::FlushForGetLiveFiles()` is refactored to call it.
3. `DBImpl::FlushAllColumnFamilies()` gets called before waiting in `WaitForCompact()` if `flush` option is `true`
4. Some previous WaitForCompact tests were parameterized to include both cases for `abort_on_pause_` being true/false as well as `flush_` being true/false

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11483

Test Plan:
- `DBCompactionTest::WaitForCompactWithOptionToFlush` added
- Changed existing DBCompactionTest::WaitForCompact tests to `DBCompactionWaitForCompactTest` to include params

Reviewed By: pdillinger

Differential Revision: D46289770

Pulled By: jaykorean

fbshipit-source-id: 70d3f461d96a6e06390be60170dd7c4d0d38f8b0
oxigraph-main
Jay Huh 2 years ago committed by Facebook GitHub Bot
parent 56ca9e3106
commit 87bc929db3
  1. 257
      db/db_compaction_test.cc
  2. 31
      db/db_filesnapshot.cc
  3. 3
      db/db_impl/db_impl.h
  4. 40
      db/db_impl/db_impl_compaction_flush.cc
  5. 1
      db/flush_job.cc
  6. 3
      include/rocksdb/options.h
  7. 5
      utilities/checkpoint/checkpoint_test.cc

@ -153,6 +153,48 @@ class DBCompactionDirectIOTest : public DBCompactionTest,
DBCompactionDirectIOTest() : DBCompactionTest() {}
};
class DBCompactionWaitForCompactTest
: public DBTestBase,
public testing::WithParamInterface<std::tuple<bool, bool>> {
public:
DBCompactionWaitForCompactTest()
: DBTestBase("db_compaction_test", /*env_do_fsync=*/true) {
abort_on_pause_ = std::get<0>(GetParam());
flush_ = std::get<1>(GetParam());
}
bool abort_on_pause_;
bool flush_;
Options options_;
WaitForCompactOptions wait_for_compact_options_;
void SetUp() override {
// This test sets up a scenario that one more L0 file will trigger a
// compaction
const int kNumKeysPerFile = 4;
const int kNumFiles = 2;
options_ = CurrentOptions();
options_.level0_file_num_compaction_trigger = kNumFiles + 1;
wait_for_compact_options_ = WaitForCompactOptions();
wait_for_compact_options_.abort_on_pause = abort_on_pause_;
wait_for_compact_options_.flush = flush_;
DestroyAndReopen(options_);
Random rnd(301);
for (int i = 0; i < kNumFiles; ++i) {
for (int j = 0; j < kNumKeysPerFile; ++j) {
ASSERT_OK(
Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(100 /* len */)));
}
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("2", FilesPerLevel());
}
};
// Param = true : target level is non-empty
// Param = false: level between target level and source level
// is not empty.
@ -3289,31 +3331,18 @@ TEST_F(DBCompactionTest, SuggestCompactRangeNoTwoLevel0Compactions) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBCompactionTest, WaitForCompactWaitsOnCompactionToFinish) {
// This test creates a scenario to trigger compaction by number of L0 files
// Before the compaction finishes, it closes the DB
// Upon reopen, wait for the compaction to finish and checks for the number of
// compaction finished
const int kNumKeysPerFile = 4;
const int kNumFiles = 2;
INSTANTIATE_TEST_CASE_P(DBCompactionWaitForCompactTest,
DBCompactionWaitForCompactTest,
::testing::Values(std::make_tuple(false, false),
std::make_tuple(false, true),
std::make_tuple(true, false),
std::make_tuple(true, true)));
Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = kNumFiles + 1;
DestroyAndReopen(options);
// create the scenario where one more L0 file will trigger compaction
Random rnd(301);
for (int i = 0; i < kNumFiles; ++i) {
for (int j = 0; j < kNumKeysPerFile; ++j) {
ASSERT_OK(
Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(100 /* len */)));
}
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->WaitForCompact(WaitForCompactOptions()));
ASSERT_EQ("2", FilesPerLevel());
TEST_P(DBCompactionWaitForCompactTest,
WaitForCompactWaitsOnCompactionToFinish) {
// Triggers a compaction. Before the compaction finishes, test
// closes the DB Upon reopen, wait for the compaction to finish and checks for
// the number of compaction finished
int compaction_finished = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
@ -3337,6 +3366,7 @@ TEST_F(DBCompactionTest, WaitForCompactWaitsOnCompactionToFinish) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// create compaction debt by adding one more L0 file then closing
Random rnd(123);
GenerateNewRandomFile(&rnd, /* nowait */ true);
ASSERT_EQ(0, compaction_finished);
Close();
@ -3344,124 +3374,53 @@ TEST_F(DBCompactionTest, WaitForCompactWaitsOnCompactionToFinish) {
ASSERT_EQ(0, compaction_finished);
// Reopen the db and we expect the compaction to be triggered.
Reopen(options);
Reopen(options_);
// Wait for compaction to finish
ASSERT_OK(dbfull()->WaitForCompact(WaitForCompactOptions()));
ASSERT_OK(dbfull()->WaitForCompact(wait_for_compact_options_));
ASSERT_GT(compaction_finished, 0);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_F(DBCompactionTest, WaitForCompactAbortOnPauseAborted) {
// This test creates a scenario to trigger compaction by number of L0 files
// Before the compaction finishes, it pauses the compaction
// Calling WaitForCompact with option abort_on_pause=true should return
// Status::Aborted
const int kNumKeysPerFile = 4;
const int kNumFiles = 2;
Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = kNumFiles + 1;
DestroyAndReopen(options);
// create the scenario where one more L0 file will trigger compaction
Random rnd(301);
for (int i = 0; i < kNumFiles; ++i) {
for (int j = 0; j < kNumKeysPerFile; ++j) {
ASSERT_OK(
Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(100 /* len */)));
}
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->WaitForCompact(WaitForCompactOptions()));
TEST_P(DBCompactionWaitForCompactTest, WaitForCompactAbortOnPause) {
// Triggers a compaction. Before the compaction finishes, test
// pauses the compaction. Calling WaitForCompact() with option
// abort_on_pause=true should return Status::Aborted Or
// ContinueBackgroundWork() must be called
// Now trigger L0 compaction by adding a file
Random rnd(123);
GenerateNewRandomFile(&rnd, /* nowait */ true);
ASSERT_OK(Flush());
// Pause the background jobs.
ASSERT_OK(dbfull()->PauseBackgroundWork());
WaitForCompactOptions waitForCompactOptions = WaitForCompactOptions();
waitForCompactOptions.abort_on_pause = true;
Status s = dbfull()->WaitForCompact(waitForCompactOptions);
ASSERT_NOK(s);
ASSERT_TRUE(s.IsAborted());
}
TEST_F(DBCompactionTest, WaitForCompactContinueAfterPauseNotAborted) {
// This test creates a scenario to trigger compaction by number of L0 files
// Before the compaction finishes, it pauses the compaction
// Calling WaitForCompact with option abort_on_pause=false will not wait
// indefinitely upon ContinueBackgroundWork()
const int kNumKeysPerFile = 4;
const int kNumFiles = 2;
Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = kNumFiles + 1;
DestroyAndReopen(options);
// create the scenario where one more L0 file will trigger compaction
Random rnd(301);
for (int i = 0; i < kNumFiles; ++i) {
for (int j = 0; j < kNumKeysPerFile; ++j) {
ASSERT_OK(
Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(100 /* len */)));
}
ASSERT_OK(Flush());
// If not abort_on_pause_ continue the background jobs.
if (!abort_on_pause_) {
ASSERT_OK(dbfull()->ContinueBackgroundWork());
}
ASSERT_OK(dbfull()->WaitForCompact(WaitForCompactOptions()));
// Now trigger L0 compaction by adding a file
GenerateNewRandomFile(&rnd, /* nowait */ true);
ASSERT_OK(Flush());
// Pause the background jobs.
ASSERT_OK(dbfull()->PauseBackgroundWork());
// Continue the background jobs.
ASSERT_OK(dbfull()->ContinueBackgroundWork());
WaitForCompactOptions waitForCompactOptions = WaitForCompactOptions();
waitForCompactOptions.abort_on_pause = true;
ASSERT_OK(dbfull()->WaitForCompact(waitForCompactOptions));
Status s = dbfull()->WaitForCompact(wait_for_compact_options_);
if (abort_on_pause_) {
ASSERT_NOK(s);
ASSERT_TRUE(s.IsAborted());
} else {
ASSERT_OK(s);
}
}
TEST_F(DBCompactionTest, WaitForCompactShutdownWhileWaiting) {
// This test creates a scenario to trigger compaction by number of L0 files
// Before the compaction finishes, db shuts down (by calling
// CancelAllBackgroundWork()) Calling WaitForCompact should return
// Status::IsShutdownInProgress()
const int kNumKeysPerFile = 4;
const int kNumFiles = 2;
Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = kNumFiles + 1;
DestroyAndReopen(options);
// create the scenario where one more L0 file will trigger compaction
Random rnd(301);
for (int i = 0; i < kNumFiles; ++i) {
for (int j = 0; j < kNumKeysPerFile; ++j) {
ASSERT_OK(
Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(100 /* len */)));
}
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->WaitForCompact(WaitForCompactOptions()));
TEST_P(DBCompactionWaitForCompactTest, WaitForCompactShutdownWhileWaiting) {
// Triggers a compaction. Before the compaction finishes, db
// shuts down (by calling CancelAllBackgroundWork()). Calling WaitForCompact()
// should return Status::IsShutdownInProgress()
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
{"CompactionJob::Run():Start",
"DBCompactionTest::WaitForCompactShutdownWhileWaiting:0"},
{"DBImpl::WaitForCompact:Start",
{"DBImpl::WaitForCompact:StartWaiting",
"DBCompactionTest::WaitForCompactShutdownWhileWaiting:1"},
{"DBImpl::~DBImpl:WaitJob", "CompactionJob::Run():End"},
});
@ -3469,6 +3428,7 @@ TEST_F(DBCompactionTest, WaitForCompactShutdownWhileWaiting) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// Now trigger L0 compaction by adding a file
Random rnd(123);
GenerateNewRandomFile(&rnd, /* nowait */ true);
ASSERT_OK(Flush());
// Wait for compaction to start
@ -3476,7 +3436,7 @@ TEST_F(DBCompactionTest, WaitForCompactShutdownWhileWaiting) {
// Wait for Compaction in another thread
auto waiting_for_compaction_thread = port::Thread([this]() {
Status s = dbfull()->WaitForCompact(WaitForCompactOptions());
Status s = dbfull()->WaitForCompact(wait_for_compact_options_);
ASSERT_NOK(s);
ASSERT_TRUE(s.IsShutdownInProgress());
});
@ -3490,6 +3450,65 @@ TEST_F(DBCompactionTest, WaitForCompactShutdownWhileWaiting) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_P(DBCompactionWaitForCompactTest, WaitForCompactWithOptionToFlush) {
// After creating enough L0 files that one more file will trigger the
// compaction, write some data in memtable. Calls WaitForCompact with option
// to flush. This will flush the memtable to a new L0 file which will trigger
// compaction. Lastly check for expected number of files, closing + reopening
// DB won't trigger any flush or compaction
int compaction_finished = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:AfterCompaction",
[&](void*) { compaction_finished++; });
int flush_finished = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"FlushJob::End", [&](void*) { flush_finished++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// write to memtable (overlapping key with first L0 file), but no flush is
// needed at this point.
ASSERT_OK(Put(Key(0), "some random string"));
ASSERT_EQ(0, compaction_finished);
ASSERT_EQ(0, flush_finished);
ASSERT_EQ("2", FilesPerLevel());
ASSERT_OK(dbfull()->WaitForCompact(wait_for_compact_options_));
if (flush_) {
ASSERT_EQ("1,2", FilesPerLevel());
ASSERT_EQ(1, compaction_finished);
ASSERT_EQ(1, flush_finished);
} else {
ASSERT_EQ(0, compaction_finished);
ASSERT_EQ(0, flush_finished);
ASSERT_EQ("2", FilesPerLevel());
}
compaction_finished = 0;
flush_finished = 0;
Close();
Reopen(options_);
ASSERT_EQ(0, flush_finished);
if (flush_) {
// if flushed already prior to close and reopen, expect there's no
// additional compaction needed
ASSERT_EQ(0, compaction_finished);
} else {
// if not flushed prior to close and reopen, expect L0 file creation from
// WAL when reopening which will trigger the compaction.
ASSERT_OK(dbfull()->WaitForCompact(wait_for_compact_options_));
ASSERT_EQ(1, compaction_finished);
}
ASSERT_EQ("1,2", FilesPerLevel());
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
static std::string ShortKey(int i) {
assert(i < 10000);
char buf[100];

@ -29,35 +29,8 @@
namespace ROCKSDB_NAMESPACE {
Status DBImpl::FlushForGetLiveFiles() {
mutex_.AssertHeld();
// flush all dirty data to disk.
Status status;
if (immutable_db_options_.atomic_flush) {
mutex_.Unlock();
status = AtomicFlushMemTables(FlushOptions(), FlushReason::kGetLiveFiles);
if (status.IsColumnFamilyDropped()) {
status = Status::OK();
}
mutex_.Lock();
} else {
for (auto cfd : versions_->GetRefedColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
mutex_.Unlock();
status = FlushMemTable(cfd, FlushOptions(), FlushReason::kGetLiveFiles);
TEST_SYNC_POINT("DBImpl::GetLiveFiles:1");
TEST_SYNC_POINT("DBImpl::GetLiveFiles:2");
mutex_.Lock();
if (!status.ok() && !status.IsColumnFamilyDropped()) {
break;
} else if (status.IsColumnFamilyDropped()) {
status = Status::OK();
}
}
}
return status;
return DBImpl::FlushAllColumnFamilies(FlushOptions(),
FlushReason::kGetLiveFiles);
}
Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,

@ -1410,6 +1410,9 @@ class DBImpl : public DB {
void NotifyOnExternalFileIngested(
ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job);
Status FlushAllColumnFamilies(const FlushOptions& flush_options,
FlushReason flush_reason);
virtual Status FlushForGetLiveFiles();
void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const;

@ -1808,6 +1808,37 @@ int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
->mutable_cf_options.level0_stop_writes_trigger;
}
Status DBImpl::FlushAllColumnFamilies(const FlushOptions& flush_options,
FlushReason flush_reason) {
mutex_.AssertHeld();
Status status;
if (immutable_db_options_.atomic_flush) {
mutex_.Unlock();
status = AtomicFlushMemTables(flush_options, flush_reason);
if (status.IsColumnFamilyDropped()) {
status = Status::OK();
}
mutex_.Lock();
} else {
for (auto cfd : versions_->GetRefedColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
mutex_.Unlock();
status = FlushMemTable(cfd, flush_options, flush_reason);
TEST_SYNC_POINT("DBImpl::FlushAllColumnFamilies:1");
TEST_SYNC_POINT("DBImpl::FlushAllColumnFamilies:2");
mutex_.Lock();
if (!status.ok() && !status.IsColumnFamilyDropped()) {
break;
} else if (status.IsColumnFamilyDropped()) {
status = Status::OK();
}
}
}
return status;
}
Status DBImpl::Flush(const FlushOptions& flush_options,
ColumnFamilyHandle* column_family) {
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
@ -3963,7 +3994,14 @@ void DBImpl::GetSnapshotContext(
Status DBImpl::WaitForCompact(
const WaitForCompactOptions& wait_for_compact_options) {
InstrumentedMutexLock l(&mutex_);
TEST_SYNC_POINT("DBImpl::WaitForCompact:Start");
if (wait_for_compact_options.flush) {
Status s = DBImpl::FlushAllColumnFamilies(FlushOptions(),
FlushReason::kManualFlush);
if (!s.ok()) {
return s;
}
}
TEST_SYNC_POINT("DBImpl::WaitForCompact:StartWaiting");
for (;;) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status::ShutdownInProgress();

@ -352,6 +352,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
<< (IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos);
}
TEST_SYNC_POINT("FlushJob::End");
return s;
}

@ -2087,6 +2087,9 @@ struct WaitForCompactOptions {
// Otherwise, jobs that were queued, but not scheduled yet may never finish
// and WaitForCompact() may wait indefinitely.
bool abort_on_pause = false;
// A boolean to flush all column families before starting to wait.
bool flush = false;
};
} // namespace ROCKSDB_NAMESPACE

@ -460,8 +460,8 @@ TEST_F(CheckpointTest, CheckpointCF) {
Options options = CurrentOptions();
CreateAndReopenWithCF({"one", "two", "three", "four", "five"}, options);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"CheckpointTest::CheckpointCF:2", "DBImpl::GetLiveFiles:2"},
{"DBImpl::GetLiveFiles:1", "CheckpointTest::CheckpointCF:1"}});
{{"CheckpointTest::CheckpointCF:2", "DBImpl::FlushAllColumnFamilies:2"},
{"DBImpl::FlushAllColumnFamilies:1", "CheckpointTest::CheckpointCF:1"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
@ -985,4 +985,3 @@ int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

Loading…
Cancel
Save