Fix conflict between AddFile() and CompactRange()

Summary:
Fix the conflict bug between AddFile() and CompactRange() by
- Make sure that no AddFile calls are running when asking CompactionPicker to pick compaction for manual compaction
- If AddFile() run after we pick the compaction for the manual compaction it will be aware of it since we will add the manual compaction to running_compactions_ after picking it

This will solve these 2 scenarios
- If AddFile() is running, we will wait for it to finish before we pick a compaction for the manual compaction
- If we already picked a manual compaction and then AddFile() started ... we ensure that it never ingest a file in a level that will overlap with the manual compaction

Test Plan: unit tests

Reviewers: sdong

Reviewed By: sdong

Subscribers: andrewkr, yoshinorim, jkedgar, dhruba

Differential Revision: https://reviews.facebook.net/D64449
main
Islam AbdelRahman 8 years ago
parent eb44ed655a
commit 87dfc1d23e
  1. 13
      db/db_impl.cc
  2. 16
      db/db_impl.h
  3. 4
      db/db_impl_add_file.cc
  4. 67
      db/external_sst_file_test.cc

@ -346,7 +346,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
has_unpersisted_data_(false), has_unpersisted_data_(false),
env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)), env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
num_running_addfile_(0), num_running_addfile_(0),
addfile_cv_(&mutex_),
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
wal_manager_(immutable_db_options_, env_options_), wal_manager_(immutable_db_options_, env_options_),
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
@ -2033,7 +2032,6 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
int max_level_with_files = 0; int max_level_with_files = 0;
{ {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
WaitForAddFile();
Version* base = cfd->current(); Version* base = cfd->current();
for (int level = 1; level < base->storage_info()->num_non_empty_levels(); for (int level = 1; level < base->storage_info()->num_non_empty_levels();
level++) { level++) {
@ -2746,6 +2744,8 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
manual.end = &end_storage; manual.end = &end_storage;
} }
TEST_SYNC_POINT("DBImpl::RunManualCompaction:0");
TEST_SYNC_POINT("DBImpl::RunManualCompaction:1");
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
// When a manual compaction arrives, temporarily disable scheduling of // When a manual compaction arrives, temporarily disable scheduling of
@ -2813,6 +2813,10 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
ca->m = &manual; ca->m = &manual;
manual.incomplete = false; manual.incomplete = false;
bg_compaction_scheduled_++; bg_compaction_scheduled_++;
// manual.compaction will be added to running_compactions_ and erased
// inside BackgroundCompaction() but we need to put it now since we
// will unlock the mutex.
running_compactions_.insert(manual.compaction);
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
&DBImpl::UnscheduleCallback); &DBImpl::UnscheduleCallback);
scheduled = true; scheduled = true;
@ -3653,6 +3657,11 @@ void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) {
} }
bool DBImpl::ShouldntRunManualCompaction(ManualCompaction* m) { bool DBImpl::ShouldntRunManualCompaction(ManualCompaction* m) {
if (num_running_addfile_ > 0) {
// We need to wait for other AddFile() calls to finish
// before running a manual compaction.
return true;
}
if (m->exclusive) { if (m->exclusive) {
return (bg_compaction_scheduled_ > 0); return (bg_compaction_scheduled_ > 0);
} }

@ -655,11 +655,12 @@ class DBImpl : public DB {
// REQUIRES: mutex_ held // REQUIRES: mutex_ held
void WaitForAddFile(); void WaitForAddFile();
Status CompactFilesImpl( Status CompactFilesImpl(const CompactionOptions& compact_options,
const CompactionOptions& compact_options, ColumnFamilyData* cfd, ColumnFamilyData* cfd, Version* version,
Version* version, const std::vector<std::string>& input_file_names, const std::vector<std::string>& input_file_names,
const int output_level, int output_path_id, JobContext* job_context, const int output_level, int output_path_id,
LogBuffer* log_buffer); JobContext* job_context, LogBuffer* log_buffer);
Status ReadExternalSstFileInfo(ColumnFamilyHandle* column_family, Status ReadExternalSstFileInfo(ColumnFamilyHandle* column_family,
const std::string& file_path, const std::string& file_path,
ExternalSstFileInfo* file_info); ExternalSstFileInfo* file_info);
@ -737,6 +738,7 @@ class DBImpl : public DB {
// * whenever bg_flush_scheduled_ or bg_purge_scheduled_ value decreases // * whenever bg_flush_scheduled_ or bg_purge_scheduled_ value decreases
// (i.e. whenever a flush is done, even if it didn't make any progress) // (i.e. whenever a flush is done, even if it didn't make any progress)
// * whenever there is an error in background purge, flush or compaction // * whenever there is an error in background purge, flush or compaction
// * whenever num_running_addfile_ goes to 0.
InstrumentedCondVar bg_cv_; InstrumentedCondVar bg_cv_;
uint64_t logfile_number_; uint64_t logfile_number_;
std::deque<uint64_t> std::deque<uint64_t>
@ -986,10 +988,6 @@ class DBImpl : public DB {
// REQUIRES: mutex held // REQUIRES: mutex held
int num_running_addfile_; int num_running_addfile_;
// A condition variable that will be signaled whenever
// num_running_addfile_ goes to 0.
InstrumentedCondVar addfile_cv_;
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
WalManager wal_manager_; WalManager wal_manager_;
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -340,7 +340,7 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
num_running_addfile_--; num_running_addfile_--;
if (num_running_addfile_ == 0) { if (num_running_addfile_ == 0) {
addfile_cv_.SignalAll(); bg_cv_.SignalAll();
} }
TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock"); TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock");
} // mutex_ is unlocked here; } // mutex_ is unlocked here;
@ -426,7 +426,7 @@ int DBImpl::PickLevelForIngestedFile(ColumnFamilyData* cfd,
void DBImpl::WaitForAddFile() { void DBImpl::WaitForAddFile() {
mutex_.AssertHeld(); mutex_.AssertHeld();
while (num_running_addfile_ > 0) { while (num_running_addfile_ > 0) {
addfile_cv_.Wait(); bg_cv_.Wait();
} }
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -1011,10 +1011,13 @@ TEST_F(ExternalSSTFileTest, PickedLevelBug) {
// We have 2 overlapping files in L0 // We have 2 overlapping files in L0
EXPECT_EQ(FilesPerLevel(), "2"); EXPECT_EQ(FilesPerLevel(), "2");
rocksdb::SyncPoint::GetInstance()->LoadDependency({ rocksdb::SyncPoint::GetInstance()->LoadDependency(
{"DBImpl::AddFile:MutexLock", "ExternalSSTFileTest::PickedLevelBug:0"}, {{"DBImpl::AddFile:MutexLock", "ExternalSSTFileTest::PickedLevelBug:0"},
{"ExternalSSTFileTest::PickedLevelBug:1", "DBImpl::AddFile:MutexUnlock"}, {"ExternalSSTFileTest::PickedLevelBug:1", "DBImpl::AddFile:MutexUnlock"},
}); {"ExternalSSTFileTest::PickedLevelBug:2",
"DBImpl::RunManualCompaction:0"},
{"ExternalSSTFileTest::PickedLevelBug:3",
"DBImpl::RunManualCompaction:1"}});
std::atomic<bool> bg_compact_started(false); std::atomic<bool> bg_compact_started(false);
rocksdb::SyncPoint::GetInstance()->SetCallBack( rocksdb::SyncPoint::GetInstance()->SetCallBack(
@ -1023,6 +1026,12 @@ TEST_F(ExternalSSTFileTest, PickedLevelBug) {
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// While writing the MANIFEST start a thread that will ask for compaction
std::thread bg_compact([&]() {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
});
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:2");
// Start a thread that will ingest a new file // Start a thread that will ingest a new file
std::thread bg_addfile([&]() { std::thread bg_addfile([&]() {
file_keys = {1, 2, 3}; file_keys = {1, 2, 3};
@ -1032,10 +1041,7 @@ TEST_F(ExternalSSTFileTest, PickedLevelBug) {
// Wait for AddFile to start picking levels and writing MANIFEST // Wait for AddFile to start picking levels and writing MANIFEST
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:0"); TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:0");
// While writing the MANIFEST start a thread that will ask for compaction TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:3");
std::thread bg_compact([&]() {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
});
// We need to verify that no compactions can run while AddFile is // We need to verify that no compactions can run while AddFile is
// ingesting the files into the levels it find suitable. So we will // ingesting the files into the levels it find suitable. So we will
@ -1065,6 +1071,51 @@ TEST_F(ExternalSSTFileTest, PickedLevelBug) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }
TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) {
Options options = CurrentOptions();
options.disable_auto_compactions = false;
options.level0_file_num_compaction_trigger = 2;
options.num_levels = 2;
options.env = env_;
DestroyAndReopen(options);
std::function<void()> bg_compact = [&]() {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
};
int range_id = 0;
std::vector<int> file_keys;
std::function<void()> bg_addfile = [&]() {
ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, range_id));
};
std::vector<std::thread> threads;
while (range_id < 5000) {
int range_start = (range_id * 20);
int range_end = range_start + 10;
file_keys.clear();
for (int k = range_start + 1; k < range_end; k++) {
file_keys.push_back(k);
}
ASSERT_OK(Put(Key(range_start), Key(range_start)));
ASSERT_OK(Put(Key(range_end), Key(range_end)));
ASSERT_OK(Flush());
if (range_id % 10 == 0) {
threads.emplace_back(bg_compact);
}
threads.emplace_back(bg_addfile);
for (auto& t : threads) {
t.join();
}
threads.clear();
range_id++;
}
}
TEST_F(ExternalSSTFileTest, PickedLevelDynamic) { TEST_F(ExternalSSTFileTest, PickedLevelDynamic) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.disable_auto_compactions = false; options.disable_auto_compactions = false;

Loading…
Cancel
Save