Running manual compactions in parallel with other automatic or manual compactions in restricted cases

Summary:
This diff provides a framework for running manual
compactions in parallel with other compactions. We now keep a deque of manual compactions, and the manual compaction is passed as an argument from RunManualCompaction down to
BackgroundCompaction, so that RunManualCompaction can be reentrant.
Parallelism is controlled by the routine
ConflictingManualCompaction, which allows or disallows new parallel/manual
compactions based on the already existing ManualCompactions. In this diff, manual compactions by default still run exclusive of other compactions. However, by setting the compaction option exclusive_manual_compaction to false, it is possible to run other compactions in parallel with a manual compaction. We are still restricted to one manual compaction per column family at a time; all of these restrictions will be relaxed in future diffs.
I will be adding more tests later.
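
For illustration only, a minimal sketch (not part of this diff) of how a caller opts a manual compaction out of exclusive mode; the db pointer, the column family handle cfh, and the key range are hypothetical:

    #include "rocksdb/db.h"
    #include "rocksdb/options.h"
    #include "rocksdb/slice.h"

    // Allow automatic compactions (and other non-exclusive manual
    // compactions on other column families) to run alongside this one.
    // exclusive_manual_compaction defaults to true, preserving the old
    // behavior. `db` and `cfh` are assumed to have been opened already.
    rocksdb::CompactRangeOptions cro;
    cro.exclusive_manual_compaction = false;
    rocksdb::Slice begin("key000"), end("key999");
    rocksdb::Status s = db->CompactRange(cro, cfh, &begin, &end);
    if (!s.ok()) {
      // handle or log the error
    }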

Test Plan: Rocksdb regression + new tests + valgrind

Reviewers: igor, anthony, IslamAbdelRahman, kradhakrishnan, yhchiang, sdong

Reviewed By: sdong

Subscribers: yoshinorim, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D47973
Branch: main
Author: Venkatesh Radhakrishnan
Parent: d26a4ea621
Commit: 030215bf01
Changed files (lines changed):
  db/column_family.cc (9)
  db/column_family.h (10)
  db/column_family_test.cc (687)
  db/compaction_picker.cc (47)
  db/compaction_picker.h (16)
  db/db_compaction_test.cc (295)
  db/db_impl.cc (240)
  db/db_impl.h (39)
  db/db_impl_debug.cc (2)
  db/db_test.cc (52)
  db/db_universal_compaction_test.cc (25)
  hdfs/env_hdfs.h (3)
  include/rocksdb/env.h (7)
  include/rocksdb/options.h (3)
  port/win/env_win.cc (17)
  util/env_posix.cc (7)
  util/thread_posix.cc (11)
  util/thread_posix.h (4)
  utilities/flashcache/flashcache.cc (6)

@@ -546,13 +546,12 @@ const int ColumnFamilyData::kCompactAllLevels = -1;
 const int ColumnFamilyData::kCompactToBaseLevel = -2;
 Compaction* ColumnFamilyData::CompactRange(
-    const MutableCFOptions& mutable_cf_options,
-    int input_level, int output_level, uint32_t output_path_id,
-    const InternalKey* begin, const InternalKey* end,
-    InternalKey** compaction_end) {
+    const MutableCFOptions& mutable_cf_options, int input_level,
+    int output_level, uint32_t output_path_id, const InternalKey* begin,
+    const InternalKey* end, InternalKey** compaction_end, bool* conflict) {
   auto* result = compaction_picker_->CompactRange(
       GetName(), mutable_cf_options, current_->storage_info(), input_level,
-      output_level, output_path_id, begin, end, compaction_end);
+      output_level, output_path_id, begin, end, compaction_end, conflict);
   if (result != nullptr) {
     result->SetInputVersion(current_);
   }

@@ -249,11 +249,11 @@ class ColumnFamilyData {
   // A flag to tell a manual compaction's output is base level.
   static const int kCompactToBaseLevel;
   // REQUIRES: DB mutex held
-  Compaction* CompactRange(
-      const MutableCFOptions& mutable_cf_options,
-      int input_level, int output_level, uint32_t output_path_id,
-      const InternalKey* begin, const InternalKey* end,
-      InternalKey** compaction_end);
+  Compaction* CompactRange(const MutableCFOptions& mutable_cf_options,
+                           int input_level, int output_level,
+                           uint32_t output_path_id, const InternalKey* begin,
+                           const InternalKey* end, InternalKey** compaction_end,
+                           bool* manual_conflict);
   CompactionPicker* compaction_picker() { return compaction_picker_.get(); }
   // thread-safe

@@ -184,11 +184,17 @@ class ColumnFamilyTest : public testing::Test {
     }
   }
-  void PutRandomData(int cf, int num, int key_value_size) {
+  void PutRandomData(int cf, int num, int key_value_size, bool save = false) {
     for (int i = 0; i < num; ++i) {
       // 10 bytes for key, rest is value
-      ASSERT_OK(Put(cf, test::RandomKey(&rnd_, 10),
-                    RandomString(&rnd_, key_value_size - 10)));
+      if (!save) {
+        ASSERT_OK(Put(cf, test::RandomKey(&rnd_, 11),
+                      RandomString(&rnd_, key_value_size - 10)));
+      } else {
+        std::string key = test::RandomKey(&rnd_, 11);
+        keys_.insert(key);
+        ASSERT_OK(Put(cf, key, RandomString(&rnd_, key_value_size - 10)));
+      }
     }
   }
@@ -378,6 +384,7 @@ class ColumnFamilyTest : public testing::Test {
   std::vector<ColumnFamilyHandle*> handles_;
   std::vector<std::string> names_;
+  std::set<std::string> keys_;
   ColumnFamilyOptions column_family_options_;
   DBOptions db_options_;
   std::string dbname_;
@@ -921,6 +928,682 @@ TEST_F(ColumnFamilyTest, DifferentCompactionStyles) {
   Close();
 }
#ifndef ROCKSDB_LITE
// Sync points not supported in RocksDB Lite
TEST_F(ColumnFamilyTest, MultipleManualCompactions) {
Open();
CreateColumnFamilies({"one", "two"});
ColumnFamilyOptions default_cf, one, two;
db_options_.max_open_files = 20; // only 10 files in file cache
db_options_.disableDataSync = true;
db_options_.max_background_compactions = 3;
default_cf.compaction_style = kCompactionStyleLevel;
default_cf.num_levels = 3;
default_cf.write_buffer_size = 64 << 10; // 64KB
default_cf.target_file_size_base = 30 << 10;
default_cf.source_compaction_factor = 100;
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
one.compaction_style = kCompactionStyleUniversal;
one.num_levels = 1;
// trigger compaction if there are >= 4 files
one.level0_file_num_compaction_trigger = 4;
one.write_buffer_size = 120000;
two.compaction_style = kCompactionStyleLevel;
two.num_levels = 4;
two.level0_file_num_compaction_trigger = 3;
two.write_buffer_size = 100000;
Reopen({default_cf, one, two});
// SETUP column family "one" -- universal style
for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
PutRandomData(1, 10, 12000, true);
PutRandomData(1, 1, 10, true);
WaitForFlush(1);
AssertFilesPerLevel(ToString(i + 1), 1);
}
bool cf_1_1 = true;
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"ColumnFamilyTest::MultiManual:4", "ColumnFamilyTest::MultiManual:1"},
{"ColumnFamilyTest::MultiManual:2", "ColumnFamilyTest::MultiManual:5"},
{"ColumnFamilyTest::MultiManual:2", "ColumnFamilyTest::MultiManual:3"}});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
if (cf_1_1) {
TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:4");
cf_1_1 = false;
TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:3");
}
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::vector<std::thread> threads;
threads.emplace_back([&] {
CompactRangeOptions compact_options;
compact_options.exclusive_manual_compaction = false;
ASSERT_OK(
db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
});
// SETUP column family "two" -- level style with 4 levels
for (int i = 0; i < two.level0_file_num_compaction_trigger - 2; ++i) {
PutRandomData(2, 10, 12000);
PutRandomData(2, 1, 10);
WaitForFlush(2);
AssertFilesPerLevel(ToString(i + 1), 2);
}
threads.emplace_back([&] {
TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:1");
CompactRangeOptions compact_options;
compact_options.exclusive_manual_compaction = false;
ASSERT_OK(
db_->CompactRange(compact_options, handles_[2], nullptr, nullptr));
TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:2");
});
TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:5");
for (auto& t : threads) {
t.join();
}
// VERIFY compaction "one"
AssertFilesPerLevel("1", 1);
// VERIFY compaction "two"
AssertFilesPerLevel("0,1", 2);
CompactAll(2);
AssertFilesPerLevel("0,1", 2);
// Compare against saved keys
std::set<std::string>::iterator key_iter = keys_.begin();
while (key_iter != keys_.end()) {
ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
key_iter++;
}
Close();
}
TEST_F(ColumnFamilyTest, AutomaticAndManualCompactions) {
Open();
CreateColumnFamilies({"one", "two"});
ColumnFamilyOptions default_cf, one, two;
db_options_.max_open_files = 20; // only 10 files in file cache
db_options_.disableDataSync = true;
db_options_.max_background_compactions = 3;
default_cf.compaction_style = kCompactionStyleLevel;
default_cf.num_levels = 3;
default_cf.write_buffer_size = 64 << 10; // 64KB
default_cf.target_file_size_base = 30 << 10;
default_cf.source_compaction_factor = 100;
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
one.compaction_style = kCompactionStyleUniversal;
one.num_levels = 1;
// trigger compaction if there are >= 4 files
one.level0_file_num_compaction_trigger = 4;
one.write_buffer_size = 120000;
two.compaction_style = kCompactionStyleLevel;
two.num_levels = 4;
two.level0_file_num_compaction_trigger = 3;
two.write_buffer_size = 100000;
Reopen({default_cf, one, two});
bool cf_1_1 = true;
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:1"},
{"ColumnFamilyTest::AutoManual:2", "ColumnFamilyTest::AutoManual:5"},
{"ColumnFamilyTest::AutoManual:2", "ColumnFamilyTest::AutoManual:3"}});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
if (cf_1_1) {
cf_1_1 = false;
TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:4");
TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:3");
}
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// SETUP column family "one" -- universal style
for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
PutRandomData(1, 10, 12000, true);
PutRandomData(1, 1, 10, true);
WaitForFlush(1);
AssertFilesPerLevel(ToString(i + 1), 1);
}
TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:1");
// SETUP column family "two" -- level style with 4 levels
for (int i = 0; i < two.level0_file_num_compaction_trigger - 2; ++i) {
PutRandomData(2, 10, 12000);
PutRandomData(2, 1, 10);
WaitForFlush(2);
AssertFilesPerLevel(ToString(i + 1), 2);
}
std::thread threads([&] {
CompactRangeOptions compact_options;
compact_options.exclusive_manual_compaction = false;
ASSERT_OK(
db_->CompactRange(compact_options, handles_[2], nullptr, nullptr));
TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:2");
});
TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:5");
threads.join();
// WAIT for compactions
WaitForCompaction();
// VERIFY compaction "one"
AssertFilesPerLevel("1", 1);
// VERIFY compaction "two"
AssertFilesPerLevel("0,1", 2);
CompactAll(2);
AssertFilesPerLevel("0,1", 2);
// Compare against saved keys
std::set<std::string>::iterator key_iter = keys_.begin();
while (key_iter != keys_.end()) {
ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
key_iter++;
}
Close();
}
TEST_F(ColumnFamilyTest, ManualAndAutomaticCompactions) {
Open();
CreateColumnFamilies({"one", "two"});
ColumnFamilyOptions default_cf, one, two;
db_options_.max_open_files = 20; // only 10 files in file cache
db_options_.disableDataSync = true;
db_options_.max_background_compactions = 3;
default_cf.compaction_style = kCompactionStyleLevel;
default_cf.num_levels = 3;
default_cf.write_buffer_size = 64 << 10; // 64KB
default_cf.target_file_size_base = 30 << 10;
default_cf.source_compaction_factor = 100;
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
one.compaction_style = kCompactionStyleUniversal;
one.num_levels = 1;
// trigger compaction if there are >= 4 files
one.level0_file_num_compaction_trigger = 4;
one.write_buffer_size = 120000;
two.compaction_style = kCompactionStyleLevel;
two.num_levels = 4;
two.level0_file_num_compaction_trigger = 3;
two.write_buffer_size = 100000;
Reopen({default_cf, one, two});
// SETUP column family "one" -- universal style
for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
PutRandomData(1, 10, 12000, true);
PutRandomData(1, 1, 10, true);
WaitForFlush(1);
AssertFilesPerLevel(ToString(i + 1), 1);
}
bool cf_1_1 = true;
bool cf_1_2 = true;
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:1"},
{"ColumnFamilyTest::ManualAuto:5", "ColumnFamilyTest::ManualAuto:2"},
{"ColumnFamilyTest::ManualAuto:2", "ColumnFamilyTest::ManualAuto:3"}});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
if (cf_1_1) {
TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4");
cf_1_1 = false;
TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3");
} else if (cf_1_2) {
TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2");
cf_1_2 = false;
}
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread threads([&] {
CompactRangeOptions compact_options;
compact_options.exclusive_manual_compaction = false;
ASSERT_OK(
db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
});
TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1");
// SETUP column family "two" -- level style with 4 levels
for (int i = 0; i < two.level0_file_num_compaction_trigger; ++i) {
PutRandomData(2, 10, 12000);
PutRandomData(2, 1, 10);
WaitForFlush(2);
AssertFilesPerLevel(ToString(i + 1), 2);
}
TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5");
threads.join();
// WAIT for compactions
WaitForCompaction();
// VERIFY compaction "one"
AssertFilesPerLevel("1", 1);
// VERIFY compaction "two"
AssertFilesPerLevel("0,1", 2);
CompactAll(2);
AssertFilesPerLevel("0,1", 2);
// Compare against saved keys
std::set<std::string>::iterator key_iter = keys_.begin();
while (key_iter != keys_.end()) {
ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
key_iter++;
}
Close();
}
TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactions) {
Open();
CreateColumnFamilies({"one"});
ColumnFamilyOptions default_cf, one;
db_options_.max_open_files = 20; // only 10 files in file cache
db_options_.disableDataSync = true;
db_options_.max_background_compactions = 3;
default_cf.compaction_style = kCompactionStyleLevel;
default_cf.num_levels = 3;
default_cf.write_buffer_size = 64 << 10; // 64KB
default_cf.target_file_size_base = 30 << 10;
default_cf.source_compaction_factor = 100;
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
one.compaction_style = kCompactionStyleUniversal;
one.num_levels = 1;
// trigger compaction if there are >= 4 files
one.level0_file_num_compaction_trigger = 4;
one.write_buffer_size = 120000;
Reopen({default_cf, one});
// SETUP column family "one" -- universal style
for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
PutRandomData(1, 10, 12000, true);
PutRandomData(1, 1, 10, true);
WaitForFlush(1);
AssertFilesPerLevel(ToString(i + 1), 1);
}
bool cf_1_1 = true;
bool cf_1_2 = true;
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:2"},
{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:5"},
{"ColumnFamilyTest::ManualAuto:1", "ColumnFamilyTest::ManualAuto:2"},
{"ColumnFamilyTest::ManualAuto:1", "ColumnFamilyTest::ManualAuto:3"}});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
if (cf_1_1) {
TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4");
cf_1_1 = false;
TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3");
} else if (cf_1_2) {
TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2");
cf_1_2 = false;
}
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread threads([&] {
CompactRangeOptions compact_options;
compact_options.exclusive_manual_compaction = false;
ASSERT_OK(
db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
});
TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5");
WaitForFlush(1);
// Add more L0 files and force automatic compaction
for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
PutRandomData(1, 10, 12000, true);
PutRandomData(1, 1, 10, true);
WaitForFlush(1);
AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i),
1);
}
TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1");
threads.join();
WaitForCompaction();
// VERIFY compaction "one"
ASSERT_LE(NumTableFilesAtLevel(0, 1), 2);
// Compare against saved keys
std::set<std::string>::iterator key_iter = keys_.begin();
while (key_iter != keys_.end()) {
ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
key_iter++;
}
Close();
}
TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactionsLevel) {
Open();
CreateColumnFamilies({"one"});
ColumnFamilyOptions default_cf, one;
db_options_.max_open_files = 20; // only 10 files in file cache
db_options_.disableDataSync = true;
db_options_.max_background_compactions = 3;
default_cf.compaction_style = kCompactionStyleLevel;
default_cf.num_levels = 3;
default_cf.write_buffer_size = 64 << 10; // 64KB
default_cf.target_file_size_base = 30 << 10;
default_cf.source_compaction_factor = 100;
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
one.compaction_style = kCompactionStyleLevel;
one.num_levels = 1;
// trigger compaction if there are >= 4 files
one.level0_file_num_compaction_trigger = 4;
one.write_buffer_size = 120000;
Reopen({default_cf, one});
// SETUP column family "one" -- level style
for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
PutRandomData(1, 10, 12000, true);
PutRandomData(1, 1, 10, true);
WaitForFlush(1);
AssertFilesPerLevel(ToString(i + 1), 1);
}
bool cf_1_1 = true;
bool cf_1_2 = true;
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:2"},
{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:5"},
{"ColumnFamilyTest::ManualAuto:3", "ColumnFamilyTest::ManualAuto:2"},
{"LevelCompactionPicker::PickCompactionBySize:0",
"ColumnFamilyTest::ManualAuto:3"},
{"ColumnFamilyTest::ManualAuto:1", "ColumnFamilyTest::ManualAuto:3"}});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
if (cf_1_1) {
TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4");
cf_1_1 = false;
TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3");
} else if (cf_1_2) {
TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2");
cf_1_2 = false;
}
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread threads([&] {
CompactRangeOptions compact_options;
compact_options.exclusive_manual_compaction = false;
ASSERT_OK(
db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
});
TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5");
// Add more L0 files and force automatic compaction
for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
PutRandomData(1, 10, 12000, true);
PutRandomData(1, 1, 10, true);
WaitForFlush(1);
AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i),
1);
}
TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1");
threads.join();
WaitForCompaction();
// VERIFY compaction "one"
AssertFilesPerLevel("0,1", 1);
// Compare against saved keys
std::set<std::string>::iterator key_iter = keys_.begin();
while (key_iter != keys_.end()) {
ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
key_iter++;
}
Close();
}
// This test checks that an automatic compaction sees a conflict if there is
// a manual compaction which has not yet been scheduled.
// The manual compaction waits in NotScheduled.
// We generate more files and then trigger an automatic compaction,
// which will wait because there is an unscheduled manual compaction.
// Once the conflict is hit, the manual compaction starts and ends,
// then another automatic compaction will start and end.
TEST_F(ColumnFamilyTest, SameCFManualAutomaticConflict) {
Open();
CreateColumnFamilies({"one"});
ColumnFamilyOptions default_cf, one;
db_options_.max_open_files = 20; // only 10 files in file cache
db_options_.disableDataSync = true;
db_options_.max_background_compactions = 3;
default_cf.compaction_style = kCompactionStyleLevel;
default_cf.num_levels = 3;
default_cf.write_buffer_size = 64 << 10; // 64KB
default_cf.target_file_size_base = 30 << 10;
default_cf.source_compaction_factor = 100;
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
one.compaction_style = kCompactionStyleUniversal;
one.num_levels = 1;
// trigger compaction if there are >= 4 files
one.level0_file_num_compaction_trigger = 4;
one.write_buffer_size = 120000;
Reopen({default_cf, one});
// SETUP column family "one" -- universal style
for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
PutRandomData(1, 10, 12000, true);
PutRandomData(1, 1, 10, true);
WaitForFlush(1);
AssertFilesPerLevel(ToString(i + 1), 1);
}
bool cf_1_1 = true;
bool cf_1_2 = true;
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::BackgroundCompaction()::Conflict",
"ColumnFamilyTest::ManualAutoCon:7"},
{"ColumnFamilyTest::ManualAutoCon:9",
"ColumnFamilyTest::ManualAutoCon:8"},
{"ColumnFamilyTest::ManualAutoCon:2",
"ColumnFamilyTest::ManualAutoCon:6"},
{"ColumnFamilyTest::ManualAutoCon:4",
"ColumnFamilyTest::ManualAutoCon:5"},
{"ColumnFamilyTest::ManualAutoCon:1",
"ColumnFamilyTest::ManualAutoCon:2"},
{"ColumnFamilyTest::ManualAutoCon:1",
"ColumnFamilyTest::ManualAutoCon:3"}});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
if (cf_1_1) {
TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:4");
cf_1_1 = false;
TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:3");
} else if (cf_1_2) {
cf_1_2 = false;
TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:2");
}
});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::RunManualCompaction:NotScheduled", [&](void* arg) {
InstrumentedMutex* mutex = static_cast<InstrumentedMutex*>(arg);
mutex->Unlock();
TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:9");
TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:7");
mutex->Lock();
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread threads([&] {
CompactRangeOptions compact_options;
compact_options.exclusive_manual_compaction = false;
ASSERT_OK(
db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:6");
});
TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:8");
WaitForFlush(1);
// Add more L0 files and force automatic compaction
for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
PutRandomData(1, 10, 12000, true);
PutRandomData(1, 1, 10, true);
WaitForFlush(1);
AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i),
1);
}
TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:5");
// Add more L0 files and force automatic compaction
for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
PutRandomData(1, 10, 12000, true);
PutRandomData(1, 1, 10, true);
WaitForFlush(1);
}
TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:1");
threads.join();
WaitForCompaction();
// VERIFY compaction "one"
ASSERT_LE(NumTableFilesAtLevel(0, 1), 3);
// Compare against saved keys
std::set<std::string>::iterator key_iter = keys_.begin();
while (key_iter != keys_.end()) {
ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
key_iter++;
}
Close();
}
// In this test, we generate enough files to trigger an automatic compaction.
// The automatic compaction waits in NonTrivial:AfterRun.
// We generate more files and then issue a manual compaction, which will
// wait because the automatic compaction holds files it needs.
// Once the conflict is hit, the automatic compaction finishes,
// then the manual compaction runs and ends.
TEST_F(ColumnFamilyTest, SameCFAutomaticManualCompactions) {
Open();
CreateColumnFamilies({"one"});
ColumnFamilyOptions default_cf, one;
db_options_.max_open_files = 20; // only 10 files in file cache
db_options_.disableDataSync = true;
db_options_.max_background_compactions = 3;
default_cf.compaction_style = kCompactionStyleLevel;
default_cf.num_levels = 3;
default_cf.write_buffer_size = 64 << 10; // 64KB
default_cf.target_file_size_base = 30 << 10;
default_cf.source_compaction_factor = 100;
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
one.compaction_style = kCompactionStyleUniversal;
one.num_levels = 1;
// trigger compaction if there are >= 4 files
one.level0_file_num_compaction_trigger = 4;
one.write_buffer_size = 120000;
Reopen({default_cf, one});
bool cf_1_1 = true;
bool cf_1_2 = true;
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:2"},
{"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:5"},
{"CompactionPicker::CompactRange:Conflict",
"ColumnFamilyTest::AutoManual:3"}});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
if (cf_1_1) {
TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:4");
cf_1_1 = false;
TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:3");
} else if (cf_1_2) {
TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:2");
cf_1_2 = false;
}
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// SETUP column family "one" -- universal style
for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
PutRandomData(1, 10, 12000, true);
PutRandomData(1, 1, 10, true);
WaitForFlush(1);
AssertFilesPerLevel(ToString(i + 1), 1);
}
TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:5");
// Add another L0 file and force automatic compaction
for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
PutRandomData(1, 10, 12000, true);
PutRandomData(1, 1, 10, true);
WaitForFlush(1);
}
CompactRangeOptions compact_options;
compact_options.exclusive_manual_compaction = false;
ASSERT_OK(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:1");
WaitForCompaction();
// VERIFY compaction "one"
AssertFilesPerLevel("1", 1);
// Compare against saved keys
std::set<std::string>::iterator key_iter = keys_.begin();
while (key_iter != keys_.end()) {
ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
key_iter++;
}
Close();
}
#endif // !ROCKSDB_LITE
#ifndef ROCKSDB_LITE  // Tailing interator not supported
namespace {
std::string IterStatus(Iterator* iter) {

@@ -455,7 +455,7 @@ Compaction* CompactionPicker::CompactRange(
     const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
     VersionStorageInfo* vstorage, int input_level, int output_level,
     uint32_t output_path_id, const InternalKey* begin, const InternalKey* end,
-    InternalKey** compaction_end) {
+    InternalKey** compaction_end, bool* manual_conflict) {
   // CompactionPickerFIFO has its own implementation of compact range
   assert(ioptions_.compaction_style != kCompactionStyleFIFO);
@@ -481,6 +481,12 @@ Compaction* CompactionPicker::CompactRange(
       return nullptr;
     }
+    if ((start_level == 0) && (!level0_compactions_in_progress_.empty())) {
+      *manual_conflict = true;
+      // Only one level 0 compaction allowed
+      return nullptr;
+    }
     std::vector<CompactionInputFiles> inputs(vstorage->num_levels() -
                                              start_level);
     for (int level = start_level; level < vstorage->num_levels(); level++) {
@@ -489,13 +495,21 @@ Compaction* CompactionPicker::CompactRange(
       for (FileMetaData* f : vstorage->LevelFiles(level)) {
         files.push_back(f);
       }
+      if (FilesInCompaction(files)) {
+        *manual_conflict = true;
+        return nullptr;
+      }
     }
-    return new Compaction(
+    Compaction* c = new Compaction(
         vstorage, mutable_cf_options, std::move(inputs), output_level,
         mutable_cf_options.MaxFileSizeForLevel(output_level),
         /* max_grandparent_overlap_bytes */ LLONG_MAX, output_path_id,
         GetCompressionType(ioptions_, output_level, 1),
         /* grandparents */ {}, /* is manual */ true);
+    if (start_level == 0) {
+      level0_compactions_in_progress_.insert(c);
+    }
+    return c;
   }
   CompactionInputFiles inputs;
@@ -514,6 +528,13 @@ Compaction* CompactionPicker::CompactRange(
     return nullptr;
   }
+  if ((input_level == 0) && (!level0_compactions_in_progress_.empty())) {
+    // Only one level 0 compaction allowed
+    TEST_SYNC_POINT("CompactionPicker::CompactRange:Conflict");
+    *manual_conflict = true;
+    return nullptr;
+  }
   // Avoid compacting too much in one shot in case the range is large.
   // But we cannot do this for level-0 since level-0 files can overlap
   // and we must not pick one file and drop another older file if the
@@ -536,9 +557,10 @@ Compaction* CompactionPicker::CompactRange(
   assert(output_path_id < static_cast<uint32_t>(ioptions_.db_paths.size()));
   if (ExpandWhileOverlapping(cf_name, vstorage, &inputs) == false) {
-    // manual compaction is currently single-threaded, so it should never
-    // happen that ExpandWhileOverlapping fails
-    assert(false);
+    // manual compaction is now multi-threaded, so it can
+    // happen that ExpandWhileOverlapping fails
+    // we handle it higher in RunManualCompaction
+    *manual_conflict = true;
     return nullptr;
   }
@@ -557,9 +579,10 @@ Compaction* CompactionPicker::CompactRange(
     int parent_index = -1;
     if (!SetupOtherInputs(cf_name, mutable_cf_options, vstorage, &inputs,
                           &output_level_inputs, &parent_index, -1)) {
-      // manual compaction is currently single-threaded, so it should never
-      // happen that SetupOtherInputs fails
-      assert(false);
+      // manual compaction is now multi-threaded, so it can
+      // happen that SetupOtherInputs fails
+      // we handle it higher in RunManualCompaction
+      *manual_conflict = true;
       return nullptr;
     }
   }
@@ -568,6 +591,12 @@ Compaction* CompactionPicker::CompactRange(
   if (!output_level_inputs.empty()) {
     compaction_inputs.push_back(output_level_inputs);
   }
+  for (size_t i = 0; i < compaction_inputs.size(); i++) {
+    if (FilesInCompaction(compaction_inputs[i].files)) {
+      *manual_conflict = true;
+      return nullptr;
+    }
+  }
   std::vector<FileMetaData*> grandparents;
   GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents);
@@ -580,6 +609,9 @@ Compaction* CompactionPicker::CompactRange(
       std::move(grandparents), /* is manual compaction */ true);
   TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction);
+  if (input_level == 0) {
+    level0_compactions_in_progress_.insert(compaction);
+  }
   return compaction;
 }
@@ -1033,6 +1065,7 @@ bool LevelCompactionPicker::PickCompactionBySize(VersionStorageInfo* vstorage,
   // could be made better by looking at key-ranges that are
   // being compacted at level 0.
   if (level == 0 && !level0_compactions_in_progress_.empty()) {
+    TEST_SYNC_POINT("LevelCompactionPicker::PickCompactionBySize:0");
     return false;
   }
@@ -1751,7 +1784,7 @@ Compaction* FIFOCompactionPicker::CompactRange(
     const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
     VersionStorageInfo* vstorage, int input_level, int output_level,
     uint32_t output_path_id, const InternalKey* begin, const InternalKey* end,
-    InternalKey** compaction_end) {
+    InternalKey** compaction_end, bool* manual_conflict) {
   assert(input_level == 0);
   assert(output_level == 0);
   *compaction_end = nullptr;

@@ -60,7 +60,7 @@ class CompactionPicker {
       const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
       VersionStorageInfo* vstorage, int input_level, int output_level,
       uint32_t output_path_id, const InternalKey* begin, const InternalKey* end,
-      InternalKey** compaction_end);
+      InternalKey** compaction_end, bool* manual_conflict);
   // The maximum allowed output level. Default value is NumberLevels() - 1.
   virtual int MaxOutputLevel() const {
@@ -302,7 +302,7 @@ class FIFOCompactionPicker : public CompactionPicker {
       const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
       VersionStorageInfo* vstorage, int input_level, int output_level,
       uint32_t output_path_id, const InternalKey* begin, const InternalKey* end,
-      InternalKey** compaction_end) override;
+      InternalKey** compaction_end, bool* manual_conflict) override;
   // The maximum allowed output level. Always returns 0.
   virtual int MaxOutputLevel() const override {
@@ -329,11 +329,13 @@ class NullCompactionPicker : public CompactionPicker {
   }
   // Always return "nullptr"
-  Compaction* CompactRange(
-      const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
-      VersionStorageInfo* vstorage, int input_level, int output_level,
-      uint32_t output_path_id, const InternalKey* begin, const InternalKey* end,
-      InternalKey** compaction_end) override {
+  Compaction* CompactRange(const std::string& cf_name,
+                           const MutableCFOptions& mutable_cf_options,
+                           VersionStorageInfo* vstorage, int input_level,
+                           int output_level, uint32_t output_path_id,
+                           const InternalKey* begin, const InternalKey* end,
+                           InternalKey** compaction_end,
+                           bool* manual_conflict) override {
     return nullptr;
   }

@@ -21,11 +21,13 @@ class DBCompactionTest : public DBTestBase {
   DBCompactionTest() : DBTestBase("/db_compaction_test") {}
 };
-class DBCompactionTestWithParam : public DBTestBase,
-                                  public testing::WithParamInterface<uint32_t> {
+class DBCompactionTestWithParam
+    : public DBTestBase,
+      public testing::WithParamInterface<std::tuple<uint32_t, bool>> {
  public:
   DBCompactionTestWithParam() : DBTestBase("/db_compaction_test") {
-    max_subcompactions_ = GetParam();
+    max_subcompactions_ = std::get<0>(GetParam());
+    exclusive_manual_compaction_ = std::get<1>(GetParam());
   }
   // Required if inheriting from testing::WithParamInterface<>
@@ -33,6 +35,7 @@ class DBCompactionTestWithParam : public DBTestBase,
   static void TearDownTestCase() {}
   uint32_t max_subcompactions_;
+  bool exclusive_manual_compaction_;
 };
 namespace {
@@ -617,8 +620,11 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveOneFile) {
   ASSERT_EQ(metadata.size(), 1U);
   LiveFileMetaData level0_file = metadata[0];  // L0 file meta
+  CompactRangeOptions cro;
+  cro.exclusive_manual_compaction = exclusive_manual_compaction_;
   // Compaction will initiate a trivial move from L0 to L1
-  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+  dbfull()->CompactRange(cro, nullptr, nullptr);
   // File moved From L0 to L1
   ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);  // 0 files in L0
@@ -682,9 +688,12 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveNonOverlappingFiles) {
   ASSERT_EQ(level0_files, ranges.size());    // Multiple files in L0
   ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0);  // No files in L1
+  CompactRangeOptions cro;
+  cro.exclusive_manual_compaction = exclusive_manual_compaction_;
   // Since data is non-overlapping we expect compaction to initiate
   // a trivial move
-  db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+  db_->CompactRange(cro, nullptr, nullptr);
   // We expect that all the files were trivially moved from L0 to L1
   ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
   ASSERT_EQ(NumTableFilesAtLevel(1, 0) /* level1_files */, level0_files);
@@ -721,7 +730,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveNonOverlappingFiles) {
     ASSERT_OK(Flush());
   }
-  db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+  db_->CompactRange(cro, nullptr, nullptr);
   for (uint32_t i = 0; i < ranges.size(); i++) {
     for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
@@ -777,6 +786,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveTargetLevel) {
   CompactRangeOptions compact_options;
   compact_options.change_level = true;
   compact_options.target_level = 6;
+  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
   ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
   // 2 files in L6
   ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0));
@@ -792,6 +802,263 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveTargetLevel) {
   }
 }
TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) {
int32_t trivial_move = 0;
int32_t non_trivial_move = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:TrivialMove",
[&](void* arg) { trivial_move++; });
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial",
[&](void* arg) { non_trivial_move++; });
bool first = true;
bool second = true;
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBCompaction::ManualPartial:4", "DBCompaction::ManualPartial:1"},
{"DBCompaction::ManualPartial:2", "DBCompaction::ManualPartial:3"}});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
if (first) {
TEST_SYNC_POINT("DBCompaction::ManualPartial:4");
first = false;
TEST_SYNC_POINT("DBCompaction::ManualPartial:3");
} else if (second) {
TEST_SYNC_POINT("DBCompaction::ManualPartial:2");
}
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.write_buffer_size = 10 * 1024 * 1024;
options.num_levels = 7;
options.max_subcompactions = max_subcompactions_;
options.level0_file_num_compaction_trigger = 3;
options.max_background_compactions = 3;
options.target_file_size_base = 1 << 23; // 8 MB
DestroyAndReopen(options);
int32_t value_size = 10 * 1024; // 10 KB
// Add 2 non-overlapping files
Random rnd(301);
std::map<int32_t, std::string> values;
// file 1 [0 => 100]
for (int32_t i = 0; i < 100; i++) {
values[i] = RandomString(&rnd, value_size);
ASSERT_OK(Put(Key(i), values[i]));
}
ASSERT_OK(Flush());
// file 2 [100 => 300]
for (int32_t i = 100; i < 300; i++) {
values[i] = RandomString(&rnd, value_size);
ASSERT_OK(Put(Key(i), values[i]));
}
ASSERT_OK(Flush());
// 2 files in L0
ASSERT_EQ("2", FilesPerLevel(0));
CompactRangeOptions compact_options;
compact_options.change_level = true;
compact_options.target_level = 6;
compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
// Trivial move the two non-overlapping files to level 6
ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
// 2 files in L6
ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0));
ASSERT_EQ(trivial_move, 1);
ASSERT_EQ(non_trivial_move, 0);
// file 3 [ 0 => 200]
for (int32_t i = 0; i < 200; i++) {
values[i] = RandomString(&rnd, value_size);
ASSERT_OK(Put(Key(i), values[i]));
}
ASSERT_OK(Flush());
// 1 files in L0
ASSERT_EQ("1,0,0,0,0,0,2", FilesPerLevel(0));
ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, false));
ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, nullptr, false));
ASSERT_OK(dbfull()->TEST_CompactRange(2, nullptr, nullptr, nullptr, false));
ASSERT_OK(dbfull()->TEST_CompactRange(3, nullptr, nullptr, nullptr, false));
ASSERT_OK(dbfull()->TEST_CompactRange(4, nullptr, nullptr, nullptr, false));
// 2 files in L6, 1 file in L5
ASSERT_EQ("0,0,0,0,0,1,2", FilesPerLevel(0));
ASSERT_EQ(trivial_move, 6);
ASSERT_EQ(non_trivial_move, 0);
std::thread threads([&] {
compact_options.change_level = false;
compact_options.exclusive_manual_compaction = false;
std::string begin_string = Key(0);
std::string end_string = Key(199);
Slice begin(begin_string);
Slice end(end_string);
ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
});
TEST_SYNC_POINT("DBCompaction::ManualPartial:1");
// file 4 [300 => 400)
for (int32_t i = 300; i <= 400; i++) {
values[i] = RandomString(&rnd, value_size);
ASSERT_OK(Put(Key(i), values[i]));
}
ASSERT_OK(Flush());
// file 5 [400 => 500)
for (int32_t i = 400; i <= 500; i++) {
values[i] = RandomString(&rnd, value_size);
ASSERT_OK(Put(Key(i), values[i]));
}
ASSERT_OK(Flush());
// file 6 [500 => 600)
for (int32_t i = 500; i <= 600; i++) {
values[i] = RandomString(&rnd, value_size);
ASSERT_OK(Put(Key(i), values[i]));
}
ASSERT_OK(Flush());
// 3 files in L0
ASSERT_EQ("3,0,0,0,0,1,2", FilesPerLevel(0));
// 1 file in L6, 1 file in L1
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
ASSERT_EQ("0,1,0,0,0,0,1", FilesPerLevel(0));
threads.join();
for (int32_t i = 0; i < 600; i++) {
ASSERT_EQ(Get(Key(i)), values[i]);
}
}
TEST_F(DBCompactionTest, ManualPartialFill) {
int32_t trivial_move = 0;
int32_t non_trivial_move = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:TrivialMove",
[&](void* arg) { trivial_move++; });
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial",
[&](void* arg) { non_trivial_move++; });
bool first = true;
bool second = true;
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBCompaction::PartialFill:4", "DBCompaction::PartialFill:1"},
{"DBCompaction::PartialFill:2", "DBCompaction::PartialFill:3"}});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
if (first) {
TEST_SYNC_POINT("DBCompaction::PartialFill:4");
first = false;
TEST_SYNC_POINT("DBCompaction::PartialFill:3");
} else if (second) {
}
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.write_buffer_size = 10 * 1024 * 1024;
options.max_bytes_for_level_multiplier = 2;
options.num_levels = 4;
options.level0_file_num_compaction_trigger = 3;
options.max_background_compactions = 3;
DestroyAndReopen(options);
int32_t value_size = 10 * 1024; // 10 KB
// Add 2 non-overlapping files
Random rnd(301);
std::map<int32_t, std::string> values;
// file 1 [0 => 100]
for (int32_t i = 0; i < 100; i++) {
values[i] = RandomString(&rnd, value_size);
ASSERT_OK(Put(Key(i), values[i]));
}
ASSERT_OK(Flush());
// file 2 [100 => 300]
for (int32_t i = 100; i < 300; i++) {
values[i] = RandomString(&rnd, value_size);
ASSERT_OK(Put(Key(i), values[i]));
}
ASSERT_OK(Flush());
// 2 files in L0
ASSERT_EQ("2", FilesPerLevel(0));
CompactRangeOptions compact_options;
compact_options.change_level = true;
compact_options.target_level = 2;
ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
// 2 files in L2
ASSERT_EQ("0,0,2", FilesPerLevel(0));
ASSERT_EQ(trivial_move, 1);
ASSERT_EQ(non_trivial_move, 0);
// file 3 [ 0 => 200]
for (int32_t i = 0; i < 200; i++) {
values[i] = RandomString(&rnd, value_size);
ASSERT_OK(Put(Key(i), values[i]));
}
ASSERT_OK(Flush());
// 2 files in L2, 1 in L0
ASSERT_EQ("1,0,2", FilesPerLevel(0));
ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, false));
// 2 files in L2, 1 in L1
ASSERT_EQ("0,1,2", FilesPerLevel(0));
ASSERT_EQ(trivial_move, 2);
ASSERT_EQ(non_trivial_move, 0);
std::thread threads([&] {
compact_options.change_level = false;
compact_options.exclusive_manual_compaction = false;
std::string begin_string = Key(0);
std::string end_string = Key(199);
Slice begin(begin_string);
Slice end(end_string);
ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
});
TEST_SYNC_POINT("DBCompaction::PartialFill:1");
// Many files 4 [300 => 4300)
for (int32_t i = 0; i <= 5; i++) {
for (int32_t j = 300; j < 4300; j++) {
if (j == 2300) {
ASSERT_OK(Flush());
dbfull()->TEST_WaitForFlushMemTable();
}
values[j] = RandomString(&rnd, value_size);
ASSERT_OK(Put(Key(j), values[j]));
}
}
// Verify level sizes
uint64_t target_size = 4 * options.max_bytes_for_level_base;
for (int32_t i = 1; i < options.num_levels; i++) {
ASSERT_LE(SizeAtLevel(i), target_size);
target_size *= options.max_bytes_for_level_multiplier;
}
TEST_SYNC_POINT("DBCompaction::PartialFill:2");
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
threads.join();
for (int32_t i = 0; i < 4300; i++) {
ASSERT_EQ(Get(Key(i)), values[i]);
}
}
TEST_P(DBCompactionTestWithParam, TrivialMoveToLastLevelWithFiles) {
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
@@ -825,6 +1092,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveToLastLevelWithFiles) {
   CompactRangeOptions compact_options;
   compact_options.change_level = true;
   compact_options.target_level = 3;
+  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
   ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
   ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
   ASSERT_EQ(trivial_move, 1);
@@ -838,8 +1106,10 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveToLastLevelWithFiles) {
   ASSERT_OK(Flush());
   ASSERT_EQ("1,0,0,1", FilesPerLevel(0));
+  CompactRangeOptions cro;
+  cro.exclusive_manual_compaction = exclusive_manual_compaction_;
   // Compaction will do L0=>L1 L1=>L2 L2=>L3 (3 trivial moves)
-  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
   ASSERT_EQ("0,0,0,2", FilesPerLevel(0));
   ASSERT_EQ(trivial_move, 4);
   ASSERT_EQ(non_trivial_move, 0);
@@ -1143,6 +1413,7 @@ TEST_P(DBCompactionTestWithParam, ConvertCompactionStyle) {
   compact_options.target_level = 0;
   compact_options.bottommost_level_compaction =
       BottommostLevelCompaction::kForce;
+  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
   dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr);
   // Only 1 file in L0
@@ -1276,7 +1547,9 @@ TEST_P(DBCompactionTestWithParam, ManualCompaction) {
     uint64_t prev_block_cache_add =
         options.statistics->getTickerCount(BLOCK_CACHE_ADD);
-    db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
+    CompactRangeOptions cro;
+    cro.exclusive_manual_compaction = exclusive_manual_compaction_;
+    db_->CompactRange(cro, handles_[1], nullptr, nullptr);
     // Verify manual compaction doesn't fill block cache
     ASSERT_EQ(prev_block_cache_add,
               options.statistics->getTickerCount(BLOCK_CACHE_ADD));
@@ -1355,6 +1628,7 @@ TEST_P(DBCompactionTestWithParam, ManualLevelCompactionOutputPathId) {
   ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
   CompactRangeOptions compact_options;
   compact_options.target_path_id = 1;
+  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
   db_->CompactRange(compact_options, handles_[1], nullptr, nullptr);
   ASSERT_EQ("0,1", FilesPerLevel(1));
@@ -1867,7 +2141,10 @@ TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) {
 }
 INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam,
-                        ::testing::Values(1, 4));
+                        ::testing::Values(std::make_tuple(1, true),
+                                          std::make_tuple(1, false),
+                                          std::make_tuple(4, true),
+                                          std::make_tuple(4, false)));
 class CompactionPriTest : public DBTestBase,
                           public testing::WithParamInterface<uint32_t> {

@@ -253,10 +253,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
       unscheduled_compactions_(0),
       bg_compaction_scheduled_(0),
       num_running_compactions_(0),
-      bg_manual_only_(0),
       bg_flush_scheduled_(0),
       num_running_flushes_(0),
-      manual_compaction_(nullptr),
       disable_delete_obsolete_files_(0),
       delete_obsolete_files_next_run_(
           options.env->NowMicros() +
@@ -1561,6 +1559,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
   auto cfd = cfh->cfd();
+  bool exclusive = options.exclusive_manual_compaction;
   Status s = FlushMemTable(cfd, FlushOptions());
   if (!s.ok()) {
@@ -1586,7 +1585,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
     // Always compact all files together.
     s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
                             cfd->NumberLevels() - 1, options.target_path_id,
-                            begin, end);
+                            begin, end, exclusive);
     final_output_level = cfd->NumberLevels() - 1;
   } else {
     for (int level = 0; level <= max_level_with_files; level++) {
@@ -1621,7 +1620,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
         }
       }
       s = RunManualCompaction(cfd, level, output_level, options.target_path_id,
-                              begin, end);
+                              begin, end, exclusive);
       if (!s.ok()) {
         break;
       }
@@ -2205,12 +2204,15 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() const {
 Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
                                    int output_level, uint32_t output_path_id,
                                    const Slice* begin, const Slice* end,
-                                   bool disallow_trivial_move) {
+                                   bool exclusive, bool disallow_trivial_move) {
   assert(input_level == ColumnFamilyData::kCompactAllLevels ||
          input_level >= 0);
   InternalKey begin_storage, end_storage;
+  CompactionArg* ca;
+  bool scheduled = false;
+  bool manual_conflict = false;
   ManualCompaction manual;
   manual.cfd = cfd;
   manual.input_level = input_level;
@@ -2218,6 +2220,8 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
   manual.output_path_id = output_path_id;
   manual.done = false;
   manual.in_progress = false;
+  manual.incomplete = false;
+  manual.exclusive = exclusive;
   manual.disallow_trivial_move = disallow_trivial_move;
   // For universal compaction, we enforce every manual compaction to compact
   // all files.
@@ -2245,7 +2249,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
   // jobs drops to zero. This is needed to ensure that this manual compaction
   // can compact any range of keys/files.
   //
-  // bg_manual_only_ is non-zero when at least one thread is inside
+  // HasPendingManualCompaction() is true when at least one thread is inside
   // RunManualCompaction(), i.e. during that time no other compaction will
   // get scheduled (see MaybeScheduleFlushOrCompaction).
   //
@@ -2254,7 +2258,9 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
   // However, only one of them will actually schedule compaction, while
   // others will wait on a condition variable until it completes.
-  ++bg_manual_only_;
+  AddManualCompaction(&manual);
+  TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
+  if (exclusive) {
     while (bg_compaction_scheduled_ > 0) {
       Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
           "[%s] Manual compaction waiting for all other scheduled background "
@@ -2262,6 +2268,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
           cfd->GetName().c_str());
       bg_cv_.Wait();
     }
+  }
   Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
       "[%s] Manual compaction starting",
@@ -2271,20 +2278,47 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
   // the compaction will set manual.status to bg_error_ and set manual.done to
   // true.
   while (!manual.done) {
-    assert(bg_manual_only_ > 0);
-    if (manual_compaction_ != nullptr) {
+    assert(HasPendingManualCompaction());
+    manual_conflict = false;
+    if (ShouldRunManualCompaction(&manual) || (manual.in_progress == true) ||
+        scheduled ||
+        ((manual.manual_end = &manual.tmp_storage1)&&(
+             (manual.compaction = manual.cfd->CompactRange(
+                  *manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
+                  manual.output_level, manual.output_path_id, manual.begin,
+                  manual.end, &manual.manual_end, &manual_conflict)) ==
+             nullptr) &&
+         manual_conflict)) {
+      // exclusive manual compactions should not see a conflict during
+      // CompactRange
+      assert(!exclusive || !manual_conflict);
       // Running either this or some other manual compaction
       bg_cv_.Wait();
-    } else {
-      manual_compaction_ = &manual;
+      if (scheduled && manual.incomplete == true) {
+        assert(!manual.in_progress);
+        scheduled = false;
+        manual.incomplete = false;
+      }
+    } else if (!scheduled) {
+      if (manual.compaction == nullptr) {
+        manual.done = true;
+        bg_cv_.SignalAll();
+        continue;
+      }
+      ca = new CompactionArg;
+      ca->db = this;
+      ca->m = &manual;
+      manual.incomplete = false;
       bg_compaction_scheduled_++;
-      env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW, this);
+      env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
+                     &DBImpl::UnscheduleCallback);
+      scheduled = true;
     }
   }
   assert(!manual.in_progress);
-  assert(bg_manual_only_ > 0);
-  --bg_manual_only_;
+  assert(HasPendingManualCompaction());
+  RemoveManualCompaction(&manual);
   return manual.status;
 }
@@ -2406,7 +2440,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
     }
   }
-  if (bg_manual_only_) {
+  if (HasExclusiveManualCompaction()) {
     // only manual compactions are allowed to run. don't schedule automatic
     // compactions
     return;
@@ -2414,9 +2448,13 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
   while (bg_compaction_scheduled_ < db_options_.max_background_compactions &&
          unscheduled_compactions_ > 0) {
+    CompactionArg* ca = new CompactionArg;
+    ca->db = this;
+    ca->m = nullptr;
     bg_compaction_scheduled_++;
     unscheduled_compactions_--;
-    env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW, this);
+    env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
+                   &DBImpl::UnscheduleCallback);
   }
 }
@@ -2478,10 +2516,21 @@ void DBImpl::BGWorkFlush(void* db) {
TEST_SYNC_POINT("DBImpl::BGWorkFlush:done"); TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
} }
void DBImpl::BGWorkCompaction(void* db) { void DBImpl::BGWorkCompaction(void* arg) {
CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
delete reinterpret_cast<CompactionArg*>(arg);
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW); IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
TEST_SYNC_POINT("DBImpl::BGWorkCompaction"); TEST_SYNC_POINT("DBImpl::BGWorkCompaction");
reinterpret_cast<DBImpl*>(db)->BackgroundCallCompaction(); reinterpret_cast<DBImpl*>(ca.db)->BackgroundCallCompaction(ca.m);
}
void DBImpl::UnscheduleCallback(void* arg) {
CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
delete reinterpret_cast<CompactionArg*>(arg);
if ((ca.m != nullptr) && (ca.m->compaction != nullptr)) {
delete ca.m->compaction;
}
TEST_SYNC_POINT("DBImpl::UnscheduleCallback");
} }
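
The CompactionArg hand-off above follows a simple ownership rule: the scheduling path heap-allocates the argument, BGWorkCompaction copies and deletes it before doing any work, and UnscheduleCallback deletes it (plus any Compaction the pending manual compaction still owns) when the queued job is dropped without ever running. Below is a minimal standalone sketch of the same pattern; Job, RunJob, and UnscheduleJob are illustrative stand-ins, not RocksDB API.

#include <iostream>

struct Job {
  int id;
};

// Worker entry point: takes ownership of the heap-allocated envelope,
// copies what it needs, then frees it before doing the real work.
static void RunJob(void* arg) {
  Job job = *static_cast<Job*>(arg);
  delete static_cast<Job*>(arg);
  std::cout << "running job " << job.id << std::endl;
}

// Invoked instead of RunJob when the queued item is dropped unrun
// (e.g. on shutdown), so the envelope is not leaked.
static void UnscheduleJob(void* arg) {
  delete static_cast<Job*>(arg);
}

int main() {
  void* queued = new Job{7};
  bool dropped_before_running = true;  // pretend the pool was drained
  if (dropped_before_running) {
    UnscheduleJob(queued);
  } else {
    RunJob(queued);
  }
  return 0;
}
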
Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
@ -2602,8 +2651,9 @@ void DBImpl::BackgroundCallFlush() {
} }
} }
void DBImpl::BackgroundCallCompaction() { void DBImpl::BackgroundCallCompaction(void* arg) {
bool made_progress = false; bool made_progress = false;
ManualCompaction* m = reinterpret_cast<ManualCompaction*>(arg);
JobContext job_context(next_job_id_.fetch_add(1), true); JobContext job_context(next_job_id_.fetch_add(1), true);
TEST_SYNC_POINT("BackgroundCallCompaction:0"); TEST_SYNC_POINT("BackgroundCallCompaction:0");
MaybeDumpStats(); MaybeDumpStats();
@ -2616,7 +2666,8 @@ void DBImpl::BackgroundCallCompaction() {
CaptureCurrentFileNumberInPendingOutputs(); CaptureCurrentFileNumberInPendingOutputs();
assert(bg_compaction_scheduled_); assert(bg_compaction_scheduled_);
Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer); Status s =
BackgroundCompaction(&made_progress, &job_context, &log_buffer, m);
TEST_SYNC_POINT("BackgroundCallCompaction:1"); TEST_SYNC_POINT("BackgroundCallCompaction:1");
if (!s.ok() && !s.IsShutdownInProgress()) { if (!s.ok() && !s.IsShutdownInProgress()) {
// Wait a little bit before retrying background compaction in // Wait a little bit before retrying background compaction in
@ -2668,11 +2719,12 @@ void DBImpl::BackgroundCallCompaction() {
// See if there's more work to be done // See if there's more work to be done
MaybeScheduleFlushOrCompaction(); MaybeScheduleFlushOrCompaction();
if (made_progress || bg_compaction_scheduled_ == 0 || bg_manual_only_ > 0) { if (made_progress || bg_compaction_scheduled_ == 0 ||
HasPendingManualCompaction()) {
// signal if // signal if
// * made_progress -- need to wakeup DelayWrite // * made_progress -- need to wakeup DelayWrite
// * bg_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl // * bg_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
// * bg_manual_only_ > 0 -- need to wakeup RunManualCompaction // * HasPendingManualCompaction -- need to wakeup RunManualCompaction
// If none of this is true, there is no need to signal since nobody is // If none of this is true, there is no need to signal since nobody is
// waiting for it // waiting for it
bg_cv_.SignalAll(); bg_cv_.SignalAll();
@ -2686,14 +2738,17 @@ void DBImpl::BackgroundCallCompaction() {
Status DBImpl::BackgroundCompaction(bool* made_progress, Status DBImpl::BackgroundCompaction(bool* made_progress,
JobContext* job_context, JobContext* job_context,
LogBuffer* log_buffer) { LogBuffer* log_buffer, void* arg) {
ManualCompaction* manual_compaction =
reinterpret_cast<ManualCompaction*>(arg);
*made_progress = false; *made_progress = false;
mutex_.AssertHeld(); mutex_.AssertHeld();
bool is_manual = (manual_compaction_ != nullptr) && bool is_manual = (manual_compaction != nullptr);
(manual_compaction_->in_progress == false);
bool trivial_move_disallowed = is_manual && // (manual_compaction->in_progress == false);
manual_compaction_->disallow_trivial_move; bool trivial_move_disallowed =
is_manual && manual_compaction->disallow_trivial_move;
CompactionJobStats compaction_job_stats; CompactionJobStats compaction_job_stats;
Status status = bg_error_; Status status = bg_error_;
@ -2703,34 +2758,30 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
if (!status.ok()) { if (!status.ok()) {
if (is_manual) { if (is_manual) {
manual_compaction_->status = status; manual_compaction->status = status;
manual_compaction_->done = true; manual_compaction->done = true;
manual_compaction_->in_progress = false; manual_compaction->in_progress = false;
manual_compaction_ = nullptr; delete manual_compaction->compaction;
manual_compaction = nullptr;
} }
return status; return status;
} }
if (is_manual) { if (is_manual) {
// another thread cannot pick up the same work // another thread cannot pick up the same work
manual_compaction_->in_progress = true; manual_compaction->in_progress = true;
} else if (manual_compaction_ != nullptr) {
// there should be no automatic compactions running when manual compaction
// is running
return Status::OK();
} }
unique_ptr<Compaction> c; unique_ptr<Compaction> c;
InternalKey manual_end_storage; // InternalKey manual_end_storage;
InternalKey* manual_end = &manual_end_storage; // InternalKey* manual_end = &manual_end_storage;
if (is_manual) { if (is_manual) {
ManualCompaction* m = manual_compaction_; ManualCompaction* m = manual_compaction;
assert(m->in_progress); assert(m->in_progress);
c.reset(m->cfd->CompactRange( c.reset(std::move(m->compaction));
*m->cfd->GetLatestMutableCFOptions(), m->input_level, m->output_level,
m->output_path_id, m->begin, m->end, &manual_end));
if (!c) { if (!c) {
m->done = true; m->done = true;
m->manual_end = nullptr;
LogToBuffer(log_buffer, LogToBuffer(log_buffer,
"[%s] Manual compaction from level-%d from %s .. " "[%s] Manual compaction from level-%d from %s .. "
"%s; nothing to do\n", "%s; nothing to do\n",
@ -2744,9 +2795,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
m->cfd->GetName().c_str(), m->input_level, c->output_level(), m->cfd->GetName().c_str(), m->input_level, c->output_level(),
(m->begin ? m->begin->DebugString().c_str() : "(begin)"), (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
(m->end ? m->end->DebugString().c_str() : "(end)"), (m->end ? m->end->DebugString().c_str() : "(end)"),
((m->done || manual_end == nullptr) ((m->done || m->manual_end == nullptr)
? "(end)" ? "(end)"
: manual_end->DebugString().c_str())); : m->manual_end->DebugString().c_str()));
} }
} else if (!compaction_queue_.empty()) { } else if (!compaction_queue_.empty()) {
// cfd is referenced here // cfd is referenced here
@ -2763,6 +2814,12 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
return Status::OK(); return Status::OK();
} }
if (HaveManualCompaction(cfd)) {
// Can't compact right now, but try again later
TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");
return Status::OK();
}
// Pick up latest mutable CF Options and use it throughout the // Pick up latest mutable CF Options and use it throughout the
// compaction job // compaction job
// Compaction makes a copy of the latest MutableCFOptions. It should be used // Compaction makes a copy of the latest MutableCFOptions. It should be used
@ -2940,7 +2997,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
} }
if (is_manual) { if (is_manual) {
ManualCompaction* m = manual_compaction_; ManualCompaction* m = manual_compaction;
if (!status.ok()) { if (!status.ok()) {
m->status = status; m->status = status;
m->done = true; m->done = true;
@ -2958,7 +3015,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
// Stop the compaction if manual_end points to nullptr -- this means // Stop the compaction if manual_end points to nullptr -- this means
// that we compacted the whole range. manual_end should always point // that we compacted the whole range. manual_end should always point
// to nullptr in case of universal compaction // to nullptr in case of universal compaction
if (manual_end == nullptr) { if (m->manual_end == nullptr) {
m->done = true; m->done = true;
} }
if (!m->done) { if (!m->done) {
@ -2969,15 +3026,102 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
kCompactionStyleUniversal || kCompactionStyleUniversal ||
m->cfd->ioptions()->num_levels > 1); m->cfd->ioptions()->num_levels > 1);
assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO); assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO);
m->tmp_storage = *manual_end; m->tmp_storage = *m->manual_end;
m->begin = &m->tmp_storage; m->begin = &m->tmp_storage;
m->incomplete = true;
} }
m->in_progress = false; // not being processed anymore m->in_progress = false; // not being processed anymore
manual_compaction_ = nullptr;
} }
return status; return status;
} }
bool DBImpl::HasPendingManualCompaction() {
return (!manual_compaction_dequeue_.empty());
}
void DBImpl::AddManualCompaction(DBImpl::ManualCompaction* m) {
manual_compaction_dequeue_.push_back(m);
}
void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) {
// Remove from queue
std::deque<ManualCompaction*>::iterator it =
manual_compaction_dequeue_.begin();
while (it != manual_compaction_dequeue_.end()) {
if (m == (*it)) {
it = manual_compaction_dequeue_.erase(it);
return;
}
it++;
}
assert(false);
return;
}
bool DBImpl::ShouldRunManualCompaction(ManualCompaction* m) {
if ((m->exclusive) && (bg_compaction_scheduled_ > 0)) {
return true;
}
std::deque<ManualCompaction*>::iterator it =
manual_compaction_dequeue_.begin();
bool seen = false;
while (it != manual_compaction_dequeue_.end()) {
if (m == (*it)) {
it++;
seen = true;
continue;
} else if (MCOverlap(m, (*it)) && (!seen && !(*it)->in_progress)) {
// The other manual compaction *it conflicts with m if it overlaps with m,
// is ahead of m in the queue, and has not yet started
return true;
}
it++;
}
return false;
}
bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
// Check pending manual compactions for one that blocks automatic
// compaction on this column family
std::deque<ManualCompaction*>::iterator it =
manual_compaction_dequeue_.begin();
while (it != manual_compaction_dequeue_.end()) {
if ((*it)->exclusive) {
return true;
}
if ((cfd == (*it)->cfd) && (!((*it)->in_progress || (*it)->done))) {
// Allow automatic compaction only if the manual compaction for this
// column family is already in progress (or done)
return true;
}
it++;
}
return false;
}
bool DBImpl::HasExclusiveManualCompaction() {
// Check whether any pending manual compaction is exclusive
std::deque<ManualCompaction*>::iterator it =
manual_compaction_dequeue_.begin();
while (it != manual_compaction_dequeue_.end()) {
if ((*it)->exclusive) {
return true;
}
it++;
}
return false;
}
bool DBImpl::MCOverlap(ManualCompaction* m, ManualCompaction* m1) {
if ((m->exclusive) || (m1->exclusive)) {
return true;
}
if (m->cfd != m1->cfd) {
return false;
}
return true;
}
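
Taken together, these helpers encode the conflict rules: an exclusive manual compaction conflicts with everything, two manual compactions conflict when they target the same column family, and a manual compaction must wait on any conflicting request queued ahead of it that has not yet started. A standalone sketch of those rules follows, using simplified stand-ins (ManualReq, Overlap, and MustWait are illustrative, not DBImpl members).

#include <deque>
#include <iostream>

struct ManualReq {
  int cf;           // column family id
  bool exclusive;   // exclusive_manual_compaction == true
  bool in_progress;
};

// Two manual compactions "overlap" if either is exclusive or they target the
// same column family -- the same shape as DBImpl::MCOverlap.
static bool Overlap(const ManualReq& a, const ManualReq& b) {
  return a.exclusive || b.exclusive || a.cf == b.cf;
}

// A request must wait if an overlapping request ahead of it in the deque has
// not started yet -- the same shape as DBImpl::ShouldRunManualCompaction.
static bool MustWait(const std::deque<ManualReq*>& q, const ManualReq* m) {
  for (const ManualReq* other : q) {
    if (other == m) break;  // only entries queued ahead of m matter
    if (Overlap(*m, *other) && !other->in_progress) return true;
  }
  return false;
}

int main() {
  ManualReq a{0, false, false}, b{1, false, false}, c{0, false, false};
  std::deque<ManualReq*> q{&a, &b, &c};
  std::cout << MustWait(q, &b) << "\n";  // 0: different column family
  std::cout << MustWait(q, &c) << "\n";  // 1: same column family as a
  return 0;
}
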
namespace { namespace {
struct IterState { struct IterState {
IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version) IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version)

@ -275,6 +275,7 @@ class DBImpl : public DB {
Status RunManualCompaction(ColumnFamilyData* cfd, int input_level, Status RunManualCompaction(ColumnFamilyData* cfd, int input_level,
int output_level, uint32_t output_path_id, int output_level, uint32_t output_path_id,
const Slice* begin, const Slice* end, const Slice* begin, const Slice* end,
bool exclusive,
bool disallow_trivial_move = false); bool disallow_trivial_move = false);
// Return an internal iterator over the current state of the database. // Return an internal iterator over the current state of the database.
@ -554,12 +555,13 @@ class DBImpl : public DB {
void MaybeScheduleFlushOrCompaction(); void MaybeScheduleFlushOrCompaction();
void SchedulePendingFlush(ColumnFamilyData* cfd); void SchedulePendingFlush(ColumnFamilyData* cfd);
void SchedulePendingCompaction(ColumnFamilyData* cfd); void SchedulePendingCompaction(ColumnFamilyData* cfd);
static void BGWorkCompaction(void* db); static void BGWorkCompaction(void* arg);
static void BGWorkFlush(void* db); static void BGWorkFlush(void* db);
void BackgroundCallCompaction(); static void UnscheduleCallback(void* arg);
void BackgroundCallCompaction(void* arg);
void BackgroundCallFlush(); void BackgroundCallFlush();
Status BackgroundCompaction(bool* madeProgress, JobContext* job_context, Status BackgroundCompaction(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer); LogBuffer* log_buffer, void* m = 0);
Status BackgroundFlush(bool* madeProgress, JobContext* job_context, Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer); LogBuffer* log_buffer);
@ -605,7 +607,7 @@ class DBImpl : public DB {
std::atomic<bool> shutting_down_; std::atomic<bool> shutting_down_;
// This condition variable is signaled on these conditions: // This condition variable is signaled on these conditions:
// * whenever bg_compaction_scheduled_ goes down to 0 // * whenever bg_compaction_scheduled_ goes down to 0
// * if bg_manual_only_ > 0, whenever a compaction finishes, even if it hasn't // * if HasPendingManualCompaction(), whenever a compaction finishes, even if it hasn't
// made any progress // made any progress
// * whenever a compaction made any progress // * whenever a compaction made any progress
// * whenever bg_flush_scheduled_ value decreases (i.e. whenever a flush is // * whenever bg_flush_scheduled_ value decreases (i.e. whenever a flush is
@ -763,11 +765,6 @@ class DBImpl : public DB {
// stores the number of compactions are currently running // stores the number of compactions are currently running
int num_running_compactions_; int num_running_compactions_;
// If non-zero, MaybeScheduleFlushOrCompaction() will only schedule manual
// compactions (if manual_compaction_ is not null). This mechanism enables
// manual compactions to wait until all other compactions are finished.
int bg_manual_only_;
// number of background memtable flush jobs, submitted to the HIGH pool // number of background memtable flush jobs, submitted to the HIGH pool
int bg_flush_scheduled_; int bg_flush_scheduled_;
@ -780,15 +777,25 @@ class DBImpl : public DB {
int input_level; int input_level;
int output_level; int output_level;
uint32_t output_path_id; uint32_t output_path_id;
bool done;
Status status; Status status;
bool done;
bool in_progress; // compaction request being processed? bool in_progress; // compaction request being processed?
bool incomplete; // only part of requested range compacted
bool exclusive; // current behavior of only one manual
bool disallow_trivial_move; // Force actual compaction to run
const InternalKey* begin; // nullptr means beginning of key range const InternalKey* begin; // nullptr means beginning of key range
const InternalKey* end; // nullptr means end of key range const InternalKey* end; // nullptr means end of key range
InternalKey* manual_end; // how far we are compacting
InternalKey tmp_storage; // Used to keep track of compaction progress InternalKey tmp_storage; // Used to keep track of compaction progress
bool disallow_trivial_move; // Force actual compaction to run InternalKey tmp_storage1; // Used to keep track of compaction progress
Compaction* compaction;
};
std::deque<ManualCompaction*> manual_compaction_dequeue_;
struct CompactionArg {
DBImpl* db;
ManualCompaction* m;
}; };
ManualCompaction* manual_compaction_;
// Have we encountered a background error in paranoid mode? // Have we encountered a background error in paranoid mode?
Status bg_error_; Status bg_error_;
@ -885,6 +892,14 @@ class DBImpl : public DB {
DBPropertyType property_type, DBPropertyType property_type,
bool need_out_of_mutex, bool is_locked, bool need_out_of_mutex, bool is_locked,
uint64_t* value); uint64_t* value);
bool HasPendingManualCompaction();
bool HasExclusiveManualCompaction();
void AddManualCompaction(ManualCompaction* m);
void RemoveManualCompaction(ManualCompaction* m);
bool ShouldRunManualCompaction(ManualCompaction* m);
bool HaveManualCompaction(ColumnFamilyData* cfd);
bool MCOverlap(ManualCompaction* m, ManualCompaction* m1);
}; };
// Sanitize db options. The caller should delete result.info_log if // Sanitize db options. The caller should delete result.info_log if

@ -70,7 +70,7 @@ Status DBImpl::TEST_CompactRange(int level, const Slice* begin,
cfd->ioptions()->compaction_style == kCompactionStyleFIFO) cfd->ioptions()->compaction_style == kCompactionStyleFIFO)
? level ? level
: level + 1; : level + 1;
return RunManualCompaction(cfd, level, output_level, 0, begin, end, return RunManualCompaction(cfd, level, output_level, 0, begin, end, true,
disallow_trivial_move); disallow_trivial_move);
} }

@ -102,16 +102,21 @@ class DBTest : public DBTestBase {
DBTest() : DBTestBase("/db_test") {} DBTest() : DBTestBase("/db_test") {}
}; };
class DBTestWithParam : public DBTest, class DBTestWithParam
public testing::WithParamInterface<uint32_t> { : public DBTest,
public testing::WithParamInterface<std::tuple<uint32_t, bool>> {
public: public:
DBTestWithParam() { max_subcompactions_ = GetParam(); } DBTestWithParam() {
max_subcompactions_ = std::get<0>(GetParam());
exclusive_manual_compaction_ = std::get<1>(GetParam());
}
// Required if inheriting from testing::WithParamInterface<> // Required if inheriting from testing::WithParamInterface<>
static void SetUpTestCase() {} static void SetUpTestCase() {}
static void TearDownTestCase() {} static void TearDownTestCase() {}
uint32_t max_subcompactions_; uint32_t max_subcompactions_;
bool exclusive_manual_compaction_;
}; };
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
@ -6389,7 +6394,9 @@ TEST_P(DBTestWithParam, FIFOCompactionTest) {
if (iter == 0) { if (iter == 0) {
ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_OK(dbfull()->TEST_WaitForCompact());
} else { } else {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); CompactRangeOptions cro;
cro.exclusive_manual_compaction = exclusive_manual_compaction_;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
} }
// only 5 files should survive // only 5 files should survive
ASSERT_EQ(NumTableFilesAtLevel(0), 5); ASSERT_EQ(NumTableFilesAtLevel(0), 5);
@ -8397,7 +8404,9 @@ TEST_P(DBTestWithParam, FilterCompactionTimeTest) {
Flush(); Flush();
} }
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); CompactRangeOptions cro;
cro.exclusive_manual_compaction = exclusive_manual_compaction_;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
ASSERT_EQ(0U, CountLiveFiles()); ASSERT_EQ(0U, CountLiveFiles());
Reopen(options); Reopen(options);
@ -8802,6 +8811,36 @@ TEST_F(DBTest, HugeNumberOfLevels) {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
} }
TEST_F(DBTest, AutomaticConflictsWithManualCompaction) {
Options options = CurrentOptions();
options.write_buffer_size = 2 * 1024 * 1024; // 2MB
options.max_bytes_for_level_base = 2 * 1024 * 1024; // 2MB
options.num_levels = 12;
options.max_background_compactions = 10;
options.max_bytes_for_level_multiplier = 2;
options.level_compaction_dynamic_level_bytes = true;
DestroyAndReopen(options);
Random rnd(301);
for (int i = 0; i < 300000; ++i) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
}
std::atomic<int> callback_count(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction()::Conflict",
[&](void* arg) { callback_count.fetch_add(1); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
CompactRangeOptions croptions;
croptions.exclusive_manual_compaction = false;
ASSERT_OK(db_->CompactRange(croptions, nullptr, nullptr));
ASSERT_GE(callback_count.load(), 1);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
for (int i = 0; i < 300000; ++i) {
ASSERT_NE("NOT_FOUND", Get(Key(i)));
}
}
// Github issue #595 // Github issue #595
// Large write batch with column families // Large write batch with column families
TEST_F(DBTest, LargeBatchWithColumnFamilies) { TEST_F(DBTest, LargeBatchWithColumnFamilies) {
@ -10105,7 +10144,8 @@ TEST_F(DBTest, SSTsWithLdbSuffixHandling) {
} }
INSTANTIATE_TEST_CASE_P(DBTestWithParam, DBTestWithParam, INSTANTIATE_TEST_CASE_P(DBTestWithParam, DBTestWithParam,
::testing::Values(1, 4)); ::testing::Combine(::testing::Values(1, 4),
::testing::Bool()));
TEST_F(DBTest, PauseBackgroundWorkTest) { TEST_F(DBTest, PauseBackgroundWorkTest) {
Options options; Options options;

@ -22,12 +22,16 @@ static std::string CompressibleString(Random* rnd, int len) {
class DBTestUniversalCompactionBase class DBTestUniversalCompactionBase
: public DBTestBase, : public DBTestBase,
public ::testing::WithParamInterface<int> { public ::testing::WithParamInterface<std::tuple<int, bool>> {
public: public:
explicit DBTestUniversalCompactionBase( explicit DBTestUniversalCompactionBase(
const std::string& path) : DBTestBase(path) {} const std::string& path) : DBTestBase(path) {}
virtual void SetUp() override { num_levels_ = GetParam(); } virtual void SetUp() override {
num_levels_ = std::get<0>(GetParam());
exclusive_manual_compaction_ = std::get<1>(GetParam());
}
int num_levels_; int num_levels_;
bool exclusive_manual_compaction_;
}; };
class DBTestUniversalCompaction : public DBTestUniversalCompactionBase { class DBTestUniversalCompaction : public DBTestUniversalCompactionBase {
@ -406,6 +410,7 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionTargetLevel) {
CompactRangeOptions compact_options; CompactRangeOptions compact_options;
compact_options.change_level = true; compact_options.change_level = true;
compact_options.target_level = 4; compact_options.target_level = 4;
compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
db_->CompactRange(compact_options, nullptr, nullptr); db_->CompactRange(compact_options, nullptr, nullptr);
ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0)); ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0));
} }
@ -498,7 +503,8 @@ TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionTrivialMove) {
INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionMultiLevels, INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionMultiLevels,
DBTestUniversalCompactionMultiLevels, DBTestUniversalCompactionMultiLevels,
::testing::Values(3, 20)); ::testing::Combine(::testing::Values(3, 20),
::testing::Bool()));
class DBTestUniversalCompactionParallel : class DBTestUniversalCompactionParallel :
public DBTestUniversalCompactionBase { public DBTestUniversalCompactionBase {
@ -571,7 +577,8 @@ TEST_P(DBTestUniversalCompactionParallel, UniversalCompactionParallel) {
INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionParallel, INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionParallel,
DBTestUniversalCompactionParallel, DBTestUniversalCompactionParallel,
::testing::Values(1, 10)); ::testing::Combine(::testing::Values(1, 10),
::testing::Bool()));
TEST_P(DBTestUniversalCompaction, UniversalCompactionOptions) { TEST_P(DBTestUniversalCompaction, UniversalCompactionOptions) {
Options options; Options options;
@ -1063,6 +1070,7 @@ TEST_P(DBTestUniversalCompaction, IncreaseUniversalCompactionNumLevels) {
CompactRangeOptions compact_options; CompactRangeOptions compact_options;
compact_options.change_level = true; compact_options.change_level = true;
compact_options.target_level = 0; compact_options.target_level = 0;
compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr); dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr);
// Need to restart it once to remove higher level records in manifest. // Need to restart it once to remove higher level records in manifest.
ReopenWithColumnFamilies({"default", "pikachu"}, options); ReopenWithColumnFamilies({"default", "pikachu"}, options);
@ -1186,7 +1194,8 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionSecondPathRatio) {
} }
INSTANTIATE_TEST_CASE_P(UniversalCompactionNumLevels, DBTestUniversalCompaction, INSTANTIATE_TEST_CASE_P(UniversalCompactionNumLevels, DBTestUniversalCompaction,
::testing::Values(1, 3, 5)); ::testing::Combine(::testing::Values(1, 3, 5),
::testing::Bool()));
class DBTestUniversalManualCompactionOutputPathId class DBTestUniversalManualCompactionOutputPathId
: public DBTestUniversalCompactionBase { : public DBTestUniversalCompactionBase {
@ -1218,6 +1227,7 @@ TEST_P(DBTestUniversalManualCompactionOutputPathId,
// Full compaction to DB path 0 // Full compaction to DB path 0
CompactRangeOptions compact_options; CompactRangeOptions compact_options;
compact_options.target_path_id = 1; compact_options.target_path_id = 1;
compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
db_->CompactRange(compact_options, handles_[1], nullptr, nullptr); db_->CompactRange(compact_options, handles_[1], nullptr, nullptr);
ASSERT_EQ(1, TotalLiveFiles(1)); ASSERT_EQ(1, TotalLiveFiles(1));
ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path)); ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
@ -1240,6 +1250,7 @@ TEST_P(DBTestUniversalManualCompactionOutputPathId,
// Full compaction to DB path 0 // Full compaction to DB path 0
compact_options.target_path_id = 0; compact_options.target_path_id = 0;
compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
db_->CompactRange(compact_options, handles_[1], nullptr, nullptr); db_->CompactRange(compact_options, handles_[1], nullptr, nullptr);
ASSERT_EQ(1, TotalLiveFiles(1)); ASSERT_EQ(1, TotalLiveFiles(1));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path)); ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
@ -1247,13 +1258,15 @@ TEST_P(DBTestUniversalManualCompactionOutputPathId,
// Fail when compacting to an invalid path ID // Fail when compacting to an invalid path ID
compact_options.target_path_id = 2; compact_options.target_path_id = 2;
compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
ASSERT_TRUE(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr) ASSERT_TRUE(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)
.IsInvalidArgument()); .IsInvalidArgument());
} }
INSTANTIATE_TEST_CASE_P(DBTestUniversalManualCompactionOutputPathId, INSTANTIATE_TEST_CASE_P(DBTestUniversalManualCompactionOutputPathId,
DBTestUniversalManualCompactionOutputPathId, DBTestUniversalManualCompactionOutputPathId,
::testing::Values(1, 8)); ::testing::Combine(::testing::Values(1, 8),
::testing::Bool()));
} // namespace rocksdb } // namespace rocksdb

@ -322,7 +322,8 @@ class HdfsEnv : public Env {
} }
virtual void Schedule(void (*function)(void* arg), void* arg, virtual void Schedule(void (*function)(void* arg), void* arg,
Priority pri = LOW, void* tag = nullptr) override {} Priority pri = LOW, void* tag = nullptr,
void (*unschedFunction)(void* arg) = 0) override {}
virtual int UnSchedule(void* tag, Priority pri) override { return 0; } virtual int UnSchedule(void* tag, Priority pri) override { return 0; }

@ -245,7 +245,8 @@ class Env {
// I.e., the caller may not assume that background work items are // I.e., the caller may not assume that background work items are
// serialized. // serialized.
virtual void Schedule(void (*function)(void* arg), void* arg, virtual void Schedule(void (*function)(void* arg), void* arg,
Priority pri = LOW, void* tag = nullptr) = 0; Priority pri = LOW, void* tag = nullptr,
void (*unschedFunction)(void* arg) = 0) = 0;
// Arrange to remove jobs for given arg from the queue_ if they are not // Arrange to remove jobs for given arg from the queue_ if they are not
// already scheduled. Caller is expected to have exclusive lock on arg. // already scheduled. Caller is expected to have exclusive lock on arg.
@ -822,8 +823,8 @@ class EnvWrapper : public Env {
Status UnlockFile(FileLock* l) override { return target_->UnlockFile(l); } Status UnlockFile(FileLock* l) override { return target_->UnlockFile(l); }
void Schedule(void (*f)(void* arg), void* a, Priority pri, void Schedule(void (*f)(void* arg), void* a, Priority pri,
void* tag = nullptr) override { void* tag = nullptr, void (*u)(void* arg) = 0) override {
return target_->Schedule(f, a, pri, tag); return target_->Schedule(f, a, pri, tag, u);
} }
int UnSchedule(void* tag, Priority pri) override { int UnSchedule(void* tag, Priority pri) override {
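
Any Env subclass outside the tree has to pick up the widened Schedule() signature as well. The sketch below shows a hypothetical wrapper (CountingEnv is not a RocksDB class) that accepts the new unschedFunction parameter and forwards it, the same way EnvWrapper does above.

#include <atomic>
#include "rocksdb/env.h"

class CountingEnv : public rocksdb::EnvWrapper {
 public:
  explicit CountingEnv(rocksdb::Env* base) : rocksdb::EnvWrapper(base) {}

  void Schedule(void (*f)(void* arg), void* a, Priority pri,
                void* tag = nullptr, void (*u)(void* arg) = 0) override {
    scheduled_.fetch_add(1, std::memory_order_relaxed);
    // Forward the unschedule callback so dropped jobs still get cleaned up.
    rocksdb::EnvWrapper::Schedule(f, a, pri, tag, u);
  }

  int scheduled() const { return scheduled_.load(std::memory_order_relaxed); }

 private:
  std::atomic<int> scheduled_{0};
};
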

@ -1422,6 +1422,9 @@ enum class BottommostLevelCompaction {
// CompactRangeOptions is used by CompactRange() call. // CompactRangeOptions is used by CompactRange() call.
struct CompactRangeOptions { struct CompactRangeOptions {
// If true, no other compaction will run at the same time as this
// manual compaction
bool exclusive_manual_compaction = true;
// If true, compacted files will be moved to the minimum level capable // If true, compacted files will be moved to the minimum level capable
// of holding the data or given level (specified non-negative target_level). // of holding the data or given level (specified non-negative target_level).
bool change_level = false; bool change_level = false;
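
As a usage sketch (assuming db is an open rocksdb::DB* and cf is one of its ColumnFamilyHandle* values; both are the caller's own objects), a manual compaction can opt out of exclusive mode like this, letting automatic compactions and manual compactions on other column families proceed in parallel while it runs:

#include "rocksdb/db.h"
#include "rocksdb/options.h"

rocksdb::Status CompactWholeRangeNonExclusive(rocksdb::DB* db,
                                              rocksdb::ColumnFamilyHandle* cf) {
  rocksdb::CompactRangeOptions cro;
  cro.exclusive_manual_compaction = false;  // default is true (old behavior)
  // nullptr begin/end means the whole key range of this column family.
  return db->CompactRange(cro, cf, nullptr, nullptr);
}
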

@ -1549,7 +1549,8 @@ class WinEnv : public Env {
} }
virtual void Schedule(void (*function)(void*), void* arg, Priority pri = LOW, virtual void Schedule(void (*function)(void*), void* arg, Priority pri = LOW,
void* tag = nullptr) override; void* tag = nullptr,
void (*unschedFunction)(void* arg) = 0) override;
virtual int UnSchedule(void* arg, Priority pri) override; virtual int UnSchedule(void* arg, Priority pri) override;
@ -1978,7 +1979,8 @@ class WinEnv : public Env {
} }
} }
void Schedule(void (*function)(void* arg1), void* arg, void* tag) { void Schedule(void (*function)(void* arg1), void* arg, void* tag,
void (*unschedFunction)(void* arg)) {
std::lock_guard<std::mutex> lg(mu_); std::lock_guard<std::mutex> lg(mu_);
if (exit_all_threads_) { if (exit_all_threads_) {
@ -1992,6 +1994,7 @@ class WinEnv : public Env {
queue_.back().function = function; queue_.back().function = function;
queue_.back().arg = arg; queue_.back().arg = arg;
queue_.back().tag = tag; queue_.back().tag = tag;
queue_.back().unschedFunction = unschedFunction;
queue_len_.store(queue_.size(), std::memory_order_relaxed); queue_len_.store(queue_.size(), std::memory_order_relaxed);
if (!HasExcessiveThread()) { if (!HasExcessiveThread()) {
@ -2013,6 +2016,11 @@ class WinEnv : public Env {
BGQueue::iterator it = queue_.begin(); BGQueue::iterator it = queue_.begin();
while (it != queue_.end()) { while (it != queue_.end()) {
if (arg == (*it).tag) { if (arg == (*it).tag) {
void (*unschedFunction)(void*) = (*it).unschedFunction;
void* arg1 = (*it).arg;
if (unschedFunction != nullptr) {
(*unschedFunction)(arg1);
}
it = queue_.erase(it); it = queue_.erase(it);
count++; count++;
} else { } else {
@ -2036,6 +2044,7 @@ class WinEnv : public Env {
void* arg; void* arg;
void (*function)(void*); void (*function)(void*);
void* tag; void* tag;
void (*unschedFunction)(void*);
}; };
typedef std::deque<BGItem> BGQueue; typedef std::deque<BGItem> BGQueue;
@ -2094,9 +2103,9 @@ WinEnv::WinEnv()
} }
void WinEnv::Schedule(void (*function)(void*), void* arg, Priority pri, void WinEnv::Schedule(void (*function)(void*), void* arg, Priority pri,
void* tag) { void* tag, void (*unschedFunction)(void* arg)) {
assert(pri >= Priority::LOW && pri <= Priority::HIGH); assert(pri >= Priority::LOW && pri <= Priority::HIGH);
thread_pools_[pri].Schedule(function, arg, tag); thread_pools_[pri].Schedule(function, arg, tag, unschedFunction);
} }
int WinEnv::UnSchedule(void* arg, Priority pri) { int WinEnv::UnSchedule(void* arg, Priority pri) {

@ -443,7 +443,8 @@ class PosixEnv : public Env {
} }
virtual void Schedule(void (*function)(void* arg1), void* arg, virtual void Schedule(void (*function)(void* arg1), void* arg,
Priority pri = LOW, void* tag = nullptr) override; Priority pri = LOW, void* tag = nullptr,
void (*unschedFunction)(void* arg) = 0) override;
virtual int UnSchedule(void* arg, Priority pri) override; virtual int UnSchedule(void* arg, Priority pri) override;
@ -689,9 +690,9 @@ PosixEnv::PosixEnv()
} }
void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri, void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
void* tag) { void* tag, void (*unschedFunction)(void* arg)) {
assert(pri >= Priority::LOW && pri <= Priority::HIGH); assert(pri >= Priority::LOW && pri <= Priority::HIGH);
thread_pools_[pri].Schedule(function, arg, tag); thread_pools_[pri].Schedule(function, arg, tag, unschedFunction);
} }
int PosixEnv::UnSchedule(void* arg, Priority pri) { int PosixEnv::UnSchedule(void* arg, Priority pri) {

@ -7,8 +7,8 @@
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <atomic>
#include "util/thread_posix.h" #include "util/thread_posix.h"
#include <atomic>
#include <unistd.h> #include <unistd.h>
#ifdef OS_LINUX #ifdef OS_LINUX
#include <sys/syscall.h> #include <sys/syscall.h>
@ -197,7 +197,8 @@ void ThreadPool::StartBGThreads() {
} }
} }
void ThreadPool::Schedule(void (*function)(void* arg1), void* arg, void* tag) { void ThreadPool::Schedule(void (*function)(void* arg1), void* arg, void* tag,
void (*unschedFunction)(void* arg)) {
PthreadCall("lock", pthread_mutex_lock(&mu_)); PthreadCall("lock", pthread_mutex_lock(&mu_));
if (exit_all_threads_) { if (exit_all_threads_) {
@ -212,6 +213,7 @@ void ThreadPool::Schedule(void (*function)(void* arg1), void* arg, void* tag) {
queue_.back().function = function; queue_.back().function = function;
queue_.back().arg = arg; queue_.back().arg = arg;
queue_.back().tag = tag; queue_.back().tag = tag;
queue_.back().unschedFunction = unschedFunction;
queue_len_.store(static_cast<unsigned int>(queue_.size()), queue_len_.store(static_cast<unsigned int>(queue_.size()),
std::memory_order_relaxed); std::memory_order_relaxed);
@ -235,6 +237,11 @@ int ThreadPool::UnSchedule(void* arg) {
BGQueue::iterator it = queue_.begin(); BGQueue::iterator it = queue_.begin();
while (it != queue_.end()) { while (it != queue_.end()) {
if (arg == (*it).tag) { if (arg == (*it).tag) {
void (*unschedFunction)(void*) = (*it).unschedFunction;
void* arg1 = (*it).arg;
if (unschedFunction != nullptr) {
(*unschedFunction)(arg1);
}
it = queue_.erase(it); it = queue_.erase(it);
count++; count++;
} else { } else {

@ -24,7 +24,8 @@ class ThreadPool {
void IncBackgroundThreadsIfNeeded(int num); void IncBackgroundThreadsIfNeeded(int num);
void SetBackgroundThreads(int num); void SetBackgroundThreads(int num);
void StartBGThreads(); void StartBGThreads();
void Schedule(void (*function)(void* arg1), void* arg, void* tag); void Schedule(void (*function)(void* arg1), void* arg, void* tag,
void (*unschedFunction)(void* arg));
int UnSchedule(void* arg); int UnSchedule(void* arg);
unsigned int GetQueueLen() const { unsigned int GetQueueLen() const {
@ -66,6 +67,7 @@ class ThreadPool {
void* arg; void* arg;
void (*function)(void*); void (*function)(void*);
void* tag; void* tag;
void (*unschedFunction)(void*);
}; };
typedef std::deque<BGItem> BGQueue; typedef std::deque<BGItem> BGQueue;
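
The queue-side contract these thread-pool changes add is: every queued item carries an optional cleanup callback, and UnSchedule invokes that callback on the item's argument before erasing it, so work removed from the pool without running can still release whatever its argument owns. A standalone sketch of that contract follows; MiniQueue and Item are illustrative types, not the RocksDB ThreadPool.

#include <deque>

struct Item {
  void* arg;
  void (*function)(void*);
  void* tag;
  void (*unschedFunction)(void*);
};

class MiniQueue {
 public:
  void Schedule(void (*fn)(void*), void* arg, void* tag,
                void (*unsched)(void*)) {
    queue_.push_back(Item{arg, fn, tag, unsched});
  }

  // Remove all items with a matching tag; run each item's cleanup callback
  // so its argument is not leaked.
  int UnSchedule(void* tag) {
    int count = 0;
    for (auto it = queue_.begin(); it != queue_.end();) {
      if (it->tag == tag) {
        if (it->unschedFunction != nullptr) {
          it->unschedFunction(it->arg);
        }
        it = queue_.erase(it);
        ++count;
      } else {
        ++it;
      }
    }
    return count;
  }

 private:
  std::deque<Item> queue_;
};

int main() {
  MiniQueue q;
  int payload = 42;
  q.Schedule([](void*) {}, &payload, /*tag=*/&payload,
             [](void* a) { *static_cast<int*>(a) = 0; });
  return q.UnSchedule(&payload);  // runs the cleanup callback, returns 1
}

Here UnSchedule(&payload) zeroes the payload through the cleanup callback before erasing the item, mirroring what UnscheduleCallback does for a compaction job that is dropped before it runs.
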

@ -3,10 +3,10 @@
// LICENSE file in the root directory of this source tree. An additional grant // LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory. // of patent rights can be found in the PATENTS file in the same directory.
#include "rocksdb/utilities/flashcache.h"
#include "utilities/flashcache/flashcache.h" #include "utilities/flashcache/flashcache.h"
#include "rocksdb/utilities/flashcache.h"
#ifdef OS_LINUX #ifdef OS_LINUX
#include <fcntl.h> #include <fcntl.h>
#include <sys/ioctl.h> #include <sys/ioctl.h>
@ -91,7 +91,7 @@ class FlashcacheAwareEnv : public EnvWrapper {
} }
void Schedule(void (*f)(void* arg), void* a, Priority pri, void Schedule(void (*f)(void* arg), void* a, Priority pri,
void* tag = nullptr) override { void* tag = nullptr, void (*u)(void* arg) = 0) override {
EnvWrapper::Schedule(&BgThreadWrapper, new Arg(f, a, cachedev_fd_), pri, EnvWrapper::Schedule(&BgThreadWrapper, new Arg(f, a, cachedev_fd_), pri,
tag); tag);
} }
