diff --git a/db/column_family.cc b/db/column_family.cc index a6fae98f3..b2d0a1b0e 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -546,13 +546,12 @@ const int ColumnFamilyData::kCompactAllLevels = -1; const int ColumnFamilyData::kCompactToBaseLevel = -2; Compaction* ColumnFamilyData::CompactRange( - const MutableCFOptions& mutable_cf_options, - int input_level, int output_level, uint32_t output_path_id, - const InternalKey* begin, const InternalKey* end, - InternalKey** compaction_end) { + const MutableCFOptions& mutable_cf_options, int input_level, + int output_level, uint32_t output_path_id, const InternalKey* begin, + const InternalKey* end, InternalKey** compaction_end, bool* conflict) { auto* result = compaction_picker_->CompactRange( GetName(), mutable_cf_options, current_->storage_info(), input_level, - output_level, output_path_id, begin, end, compaction_end); + output_level, output_path_id, begin, end, compaction_end, conflict); if (result != nullptr) { result->SetInputVersion(current_); } diff --git a/db/column_family.h b/db/column_family.h index e44873c7a..40a6d6910 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -249,11 +249,11 @@ class ColumnFamilyData { // A flag to tell a manual compaction's output is base level. static const int kCompactToBaseLevel; // REQUIRES: DB mutex held - Compaction* CompactRange( - const MutableCFOptions& mutable_cf_options, - int input_level, int output_level, uint32_t output_path_id, - const InternalKey* begin, const InternalKey* end, - InternalKey** compaction_end); + Compaction* CompactRange(const MutableCFOptions& mutable_cf_options, + int input_level, int output_level, + uint32_t output_path_id, const InternalKey* begin, + const InternalKey* end, InternalKey** compaction_end, + bool* manual_conflict); CompactionPicker* compaction_picker() { return compaction_picker_.get(); } // thread-safe diff --git a/db/column_family_test.cc b/db/column_family_test.cc index b877ea8a1..f36366a85 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -184,11 +184,17 @@ class ColumnFamilyTest : public testing::Test { } } - void PutRandomData(int cf, int num, int key_value_size) { + void PutRandomData(int cf, int num, int key_value_size, bool save = false) { for (int i = 0; i < num; ++i) { // 10 bytes for key, rest is value - ASSERT_OK(Put(cf, test::RandomKey(&rnd_, 10), - RandomString(&rnd_, key_value_size - 10))); + if (!save) { + ASSERT_OK(Put(cf, test::RandomKey(&rnd_, 11), + RandomString(&rnd_, key_value_size - 10))); + } else { + std::string key = test::RandomKey(&rnd_, 11); + keys_.insert(key); + ASSERT_OK(Put(cf, key, RandomString(&rnd_, key_value_size - 10))); + } } } @@ -378,6 +384,7 @@ class ColumnFamilyTest : public testing::Test { std::vector handles_; std::vector names_; + std::set keys_; ColumnFamilyOptions column_family_options_; DBOptions db_options_; std::string dbname_; @@ -921,6 +928,682 @@ TEST_F(ColumnFamilyTest, DifferentCompactionStyles) { Close(); } +#ifndef ROCKSDB_LITE +// Sync points not supported in RocksDB Lite + +TEST_F(ColumnFamilyTest, MultipleManualCompactions) { + Open(); + CreateColumnFamilies({"one", "two"}); + ColumnFamilyOptions default_cf, one, two; + db_options_.max_open_files = 20; // only 10 files in file cache + db_options_.disableDataSync = true; + db_options_.max_background_compactions = 3; + + default_cf.compaction_style = kCompactionStyleLevel; + default_cf.num_levels = 3; + default_cf.write_buffer_size = 64 << 10; // 64KB + default_cf.target_file_size_base = 30 << 10; + default_cf.source_compaction_factor = 100; + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + one.compaction_style = kCompactionStyleUniversal; + + one.num_levels = 1; + // trigger compaction if there are >= 4 files + one.level0_file_num_compaction_trigger = 4; + one.write_buffer_size = 120000; + + two.compaction_style = kCompactionStyleLevel; + two.num_levels = 4; + two.level0_file_num_compaction_trigger = 3; + two.write_buffer_size = 100000; + + Reopen({default_cf, one, two}); + + // SETUP column family "one" -- universal style + for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + AssertFilesPerLevel(ToString(i + 1), 1); + } + bool cf_1_1 = true; + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"ColumnFamilyTest::MultiManual:4", "ColumnFamilyTest::MultiManual:1"}, + {"ColumnFamilyTest::MultiManual:2", "ColumnFamilyTest::MultiManual:5"}, + {"ColumnFamilyTest::MultiManual:2", "ColumnFamilyTest::MultiManual:3"}}); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { + if (cf_1_1) { + TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:4"); + cf_1_1 = false; + TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:3"); + } + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + std::vector threads; + threads.emplace_back([&] { + CompactRangeOptions compact_options; + compact_options.exclusive_manual_compaction = false; + ASSERT_OK( + db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); + }); + + // SETUP column family "two" -- level style with 4 levels + for (int i = 0; i < two.level0_file_num_compaction_trigger - 2; ++i) { + PutRandomData(2, 10, 12000); + PutRandomData(2, 1, 10); + WaitForFlush(2); + AssertFilesPerLevel(ToString(i + 1), 2); + } + threads.emplace_back([&] { + TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:1"); + CompactRangeOptions compact_options; + compact_options.exclusive_manual_compaction = false; + ASSERT_OK( + db_->CompactRange(compact_options, handles_[2], nullptr, nullptr)); + TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:2"); + }); + + TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:5"); + for (auto& t : threads) { + t.join(); + } + + // VERIFY compaction "one" + AssertFilesPerLevel("1", 1); + + // VERIFY compaction "two" + AssertFilesPerLevel("0,1", 2); + CompactAll(2); + AssertFilesPerLevel("0,1", 2); + // Compare against saved keys + std::set::iterator key_iter = keys_.begin(); + while (key_iter != keys_.end()) { + ASSERT_NE("NOT_FOUND", Get(1, *key_iter)); + key_iter++; + } + Close(); +} + +TEST_F(ColumnFamilyTest, AutomaticAndManualCompactions) { + Open(); + CreateColumnFamilies({"one", "two"}); + ColumnFamilyOptions default_cf, one, two; + db_options_.max_open_files = 20; // only 10 files in file cache + db_options_.disableDataSync = true; + db_options_.max_background_compactions = 3; + + default_cf.compaction_style = kCompactionStyleLevel; + default_cf.num_levels = 3; + default_cf.write_buffer_size = 64 << 10; // 64KB + default_cf.target_file_size_base = 30 << 10; + default_cf.source_compaction_factor = 100; + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + one.compaction_style = kCompactionStyleUniversal; + + one.num_levels = 1; + // trigger compaction if there are >= 4 files + one.level0_file_num_compaction_trigger = 4; + one.write_buffer_size = 120000; + + two.compaction_style = kCompactionStyleLevel; + two.num_levels = 4; + two.level0_file_num_compaction_trigger = 3; + two.write_buffer_size = 100000; + + Reopen({default_cf, one, two}); + + bool cf_1_1 = true; + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:1"}, + {"ColumnFamilyTest::AutoManual:2", "ColumnFamilyTest::AutoManual:5"}, + {"ColumnFamilyTest::AutoManual:2", "ColumnFamilyTest::AutoManual:3"}}); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { + if (cf_1_1) { + cf_1_1 = false; + TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:4"); + TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:3"); + } + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + // SETUP column family "one" -- universal style + for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + AssertFilesPerLevel(ToString(i + 1), 1); + } + + TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:1"); + + // SETUP column family "two" -- level style with 4 levels + for (int i = 0; i < two.level0_file_num_compaction_trigger - 2; ++i) { + PutRandomData(2, 10, 12000); + PutRandomData(2, 1, 10); + WaitForFlush(2); + AssertFilesPerLevel(ToString(i + 1), 2); + } + std::thread threads([&] { + CompactRangeOptions compact_options; + compact_options.exclusive_manual_compaction = false; + ASSERT_OK( + db_->CompactRange(compact_options, handles_[2], nullptr, nullptr)); + TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:2"); + }); + + TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:5"); + threads.join(); + + // WAIT for compactions + WaitForCompaction(); + + // VERIFY compaction "one" + AssertFilesPerLevel("1", 1); + + // VERIFY compaction "two" + AssertFilesPerLevel("0,1", 2); + CompactAll(2); + AssertFilesPerLevel("0,1", 2); + // Compare against saved keys + std::set::iterator key_iter = keys_.begin(); + while (key_iter != keys_.end()) { + ASSERT_NE("NOT_FOUND", Get(1, *key_iter)); + key_iter++; + } + Close(); +} + +TEST_F(ColumnFamilyTest, ManualAndAutomaticCompactions) { + Open(); + CreateColumnFamilies({"one", "two"}); + ColumnFamilyOptions default_cf, one, two; + db_options_.max_open_files = 20; // only 10 files in file cache + db_options_.disableDataSync = true; + db_options_.max_background_compactions = 3; + + default_cf.compaction_style = kCompactionStyleLevel; + default_cf.num_levels = 3; + default_cf.write_buffer_size = 64 << 10; // 64KB + default_cf.target_file_size_base = 30 << 10; + default_cf.source_compaction_factor = 100; + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + one.compaction_style = kCompactionStyleUniversal; + + one.num_levels = 1; + // trigger compaction if there are >= 4 files + one.level0_file_num_compaction_trigger = 4; + one.write_buffer_size = 120000; + + two.compaction_style = kCompactionStyleLevel; + two.num_levels = 4; + two.level0_file_num_compaction_trigger = 3; + two.write_buffer_size = 100000; + + Reopen({default_cf, one, two}); + + // SETUP column family "one" -- universal style + for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + AssertFilesPerLevel(ToString(i + 1), 1); + } + bool cf_1_1 = true; + bool cf_1_2 = true; + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:1"}, + {"ColumnFamilyTest::ManualAuto:5", "ColumnFamilyTest::ManualAuto:2"}, + {"ColumnFamilyTest::ManualAuto:2", "ColumnFamilyTest::ManualAuto:3"}}); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { + if (cf_1_1) { + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4"); + cf_1_1 = false; + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3"); + } else if (cf_1_2) { + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2"); + cf_1_2 = false; + } + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + std::thread threads([&] { + CompactRangeOptions compact_options; + compact_options.exclusive_manual_compaction = false; + ASSERT_OK( + db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); + }); + + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1"); + + // SETUP column family "two" -- level style with 4 levels + for (int i = 0; i < two.level0_file_num_compaction_trigger; ++i) { + PutRandomData(2, 10, 12000); + PutRandomData(2, 1, 10); + WaitForFlush(2); + AssertFilesPerLevel(ToString(i + 1), 2); + } + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5"); + threads.join(); + + // WAIT for compactions + WaitForCompaction(); + + // VERIFY compaction "one" + AssertFilesPerLevel("1", 1); + + // VERIFY compaction "two" + AssertFilesPerLevel("0,1", 2); + CompactAll(2); + AssertFilesPerLevel("0,1", 2); + // Compare against saved keys + std::set::iterator key_iter = keys_.begin(); + while (key_iter != keys_.end()) { + ASSERT_NE("NOT_FOUND", Get(1, *key_iter)); + key_iter++; + } + Close(); +} + +TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactions) { + Open(); + CreateColumnFamilies({"one"}); + ColumnFamilyOptions default_cf, one; + db_options_.max_open_files = 20; // only 10 files in file cache + db_options_.disableDataSync = true; + db_options_.max_background_compactions = 3; + + default_cf.compaction_style = kCompactionStyleLevel; + default_cf.num_levels = 3; + default_cf.write_buffer_size = 64 << 10; // 64KB + default_cf.target_file_size_base = 30 << 10; + default_cf.source_compaction_factor = 100; + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + one.compaction_style = kCompactionStyleUniversal; + + one.num_levels = 1; + // trigger compaction if there are >= 4 files + one.level0_file_num_compaction_trigger = 4; + one.write_buffer_size = 120000; + + Reopen({default_cf, one}); + + // SETUP column family "one" -- universal style + for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + AssertFilesPerLevel(ToString(i + 1), 1); + } + bool cf_1_1 = true; + bool cf_1_2 = true; + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:2"}, + {"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:5"}, + {"ColumnFamilyTest::ManualAuto:1", "ColumnFamilyTest::ManualAuto:2"}, + {"ColumnFamilyTest::ManualAuto:1", "ColumnFamilyTest::ManualAuto:3"}}); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { + if (cf_1_1) { + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4"); + cf_1_1 = false; + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3"); + } else if (cf_1_2) { + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2"); + cf_1_2 = false; + } + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + std::thread threads([&] { + CompactRangeOptions compact_options; + compact_options.exclusive_manual_compaction = false; + ASSERT_OK( + db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); + }); + + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5"); + + WaitForFlush(1); + + // Add more L0 files and force automatic compaction + for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i), + 1); + } + + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1"); + + threads.join(); + WaitForCompaction(); + // VERIFY compaction "one" + ASSERT_LE(NumTableFilesAtLevel(0, 1), 2); + + // Compare against saved keys + std::set::iterator key_iter = keys_.begin(); + while (key_iter != keys_.end()) { + ASSERT_NE("NOT_FOUND", Get(1, *key_iter)); + key_iter++; + } + Close(); +} + +TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactionsLevel) { + Open(); + CreateColumnFamilies({"one"}); + ColumnFamilyOptions default_cf, one; + db_options_.max_open_files = 20; // only 10 files in file cache + db_options_.disableDataSync = true; + db_options_.max_background_compactions = 3; + + default_cf.compaction_style = kCompactionStyleLevel; + default_cf.num_levels = 3; + default_cf.write_buffer_size = 64 << 10; // 64KB + default_cf.target_file_size_base = 30 << 10; + default_cf.source_compaction_factor = 100; + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + one.compaction_style = kCompactionStyleLevel; + + one.num_levels = 1; + // trigger compaction if there are >= 4 files + one.level0_file_num_compaction_trigger = 4; + one.write_buffer_size = 120000; + + Reopen({default_cf, one}); + + // SETUP column family "one" -- level style + for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + AssertFilesPerLevel(ToString(i + 1), 1); + } + bool cf_1_1 = true; + bool cf_1_2 = true; + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:2"}, + {"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:5"}, + {"ColumnFamilyTest::ManualAuto:3", "ColumnFamilyTest::ManualAuto:2"}, + {"LevelCompactionPicker::PickCompactionBySize:0", + "ColumnFamilyTest::ManualAuto:3"}, + {"ColumnFamilyTest::ManualAuto:1", "ColumnFamilyTest::ManualAuto:3"}}); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { + if (cf_1_1) { + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4"); + cf_1_1 = false; + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3"); + } else if (cf_1_2) { + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2"); + cf_1_2 = false; + } + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + std::thread threads([&] { + CompactRangeOptions compact_options; + compact_options.exclusive_manual_compaction = false; + ASSERT_OK( + db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); + }); + + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5"); + + // Add more L0 files and force automatic compaction + for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i), + 1); + } + + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1"); + + threads.join(); + WaitForCompaction(); + // VERIFY compaction "one" + AssertFilesPerLevel("0,1", 1); + + // Compare against saved keys + std::set::iterator key_iter = keys_.begin(); + while (key_iter != keys_.end()) { + ASSERT_NE("NOT_FOUND", Get(1, *key_iter)); + key_iter++; + } + Close(); +} + +// This test checks for automatic getting a conflict if there is a +// manual which has not yet been scheduled. +// The manual compaction waits in NotScheduled +// We generate more files and then trigger an automatic compaction +// This will wait because there is an unscheduled manual compaction. +// Once the conflict is hit, the manual compaction starts and ends +// Then another automatic will start and end. +TEST_F(ColumnFamilyTest, SameCFManualAutomaticConflict) { + Open(); + CreateColumnFamilies({"one"}); + ColumnFamilyOptions default_cf, one; + db_options_.max_open_files = 20; // only 10 files in file cache + db_options_.disableDataSync = true; + db_options_.max_background_compactions = 3; + + default_cf.compaction_style = kCompactionStyleLevel; + default_cf.num_levels = 3; + default_cf.write_buffer_size = 64 << 10; // 64KB + default_cf.target_file_size_base = 30 << 10; + default_cf.source_compaction_factor = 100; + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + one.compaction_style = kCompactionStyleUniversal; + + one.num_levels = 1; + // trigger compaction if there are >= 4 files + one.level0_file_num_compaction_trigger = 4; + one.write_buffer_size = 120000; + + Reopen({default_cf, one}); + + // SETUP column family "one" -- universal style + for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + AssertFilesPerLevel(ToString(i + 1), 1); + } + bool cf_1_1 = true; + bool cf_1_2 = true; + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BackgroundCompaction()::Conflict", + "ColumnFamilyTest::ManualAutoCon:7"}, + {"ColumnFamilyTest::ManualAutoCon:9", + "ColumnFamilyTest::ManualAutoCon:8"}, + {"ColumnFamilyTest::ManualAutoCon:2", + "ColumnFamilyTest::ManualAutoCon:6"}, + {"ColumnFamilyTest::ManualAutoCon:4", + "ColumnFamilyTest::ManualAutoCon:5"}, + {"ColumnFamilyTest::ManualAutoCon:1", + "ColumnFamilyTest::ManualAutoCon:2"}, + {"ColumnFamilyTest::ManualAutoCon:1", + "ColumnFamilyTest::ManualAutoCon:3"}}); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { + if (cf_1_1) { + TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:4"); + cf_1_1 = false; + TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:3"); + } else if (cf_1_2) { + cf_1_2 = false; + TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:2"); + } + }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::RunManualCompaction:NotScheduled", [&](void* arg) { + InstrumentedMutex* mutex = static_cast(arg); + mutex->Unlock(); + TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:9"); + TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:7"); + mutex->Lock(); + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + std::thread threads([&] { + CompactRangeOptions compact_options; + compact_options.exclusive_manual_compaction = false; + ASSERT_OK( + db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); + TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:6"); + }); + + TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:8"); + WaitForFlush(1); + + // Add more L0 files and force automatic compaction + for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i), + 1); + } + + TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:5"); + // Add more L0 files and force automatic compaction + for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + } + TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:1"); + + threads.join(); + WaitForCompaction(); + // VERIFY compaction "one" + ASSERT_LE(NumTableFilesAtLevel(0, 1), 3); + + // Compare against saved keys + std::set::iterator key_iter = keys_.begin(); + while (key_iter != keys_.end()) { + ASSERT_NE("NOT_FOUND", Get(1, *key_iter)); + key_iter++; + } + + Close(); +} + +// In this test, we generate enough files to trigger automatic compactions. +// The automatic compaction waits in NonTrivial:AfterRun +// We generate more files and then trigger an automatic compaction +// This will wait because the automatic compaction has files it needs. +// Once the conflict is hit, the automatic compaction starts and ends +// Then the manual will run and end. +TEST_F(ColumnFamilyTest, SameCFAutomaticManualCompactions) { + Open(); + CreateColumnFamilies({"one"}); + ColumnFamilyOptions default_cf, one; + db_options_.max_open_files = 20; // only 10 files in file cache + db_options_.disableDataSync = true; + db_options_.max_background_compactions = 3; + + default_cf.compaction_style = kCompactionStyleLevel; + default_cf.num_levels = 3; + default_cf.write_buffer_size = 64 << 10; // 64KB + default_cf.target_file_size_base = 30 << 10; + default_cf.source_compaction_factor = 100; + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + one.compaction_style = kCompactionStyleUniversal; + + one.num_levels = 1; + // trigger compaction if there are >= 4 files + one.level0_file_num_compaction_trigger = 4; + one.write_buffer_size = 120000; + + Reopen({default_cf, one}); + + bool cf_1_1 = true; + bool cf_1_2 = true; + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:2"}, + {"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:5"}, + {"CompactionPicker::CompactRange:Conflict", + "ColumnFamilyTest::AutoManual:3"}}); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { + if (cf_1_1) { + TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:4"); + cf_1_1 = false; + TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:3"); + } else if (cf_1_2) { + TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:2"); + cf_1_2 = false; + } + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // SETUP column family "one" -- universal style + for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + AssertFilesPerLevel(ToString(i + 1), 1); + } + + TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:5"); + + // Add another L0 file and force automatic compaction + for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + } + + CompactRangeOptions compact_options; + compact_options.exclusive_manual_compaction = false; + ASSERT_OK(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); + + TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:1"); + + WaitForCompaction(); + // VERIFY compaction "one" + AssertFilesPerLevel("1", 1); + // Compare against saved keys + std::set::iterator key_iter = keys_.begin(); + while (key_iter != keys_.end()) { + ASSERT_NE("NOT_FOUND", Get(1, *key_iter)); + key_iter++; + } + + Close(); +} +#endif // !ROCKSDB_LITE + #ifndef ROCKSDB_LITE // Tailing interator not supported namespace { std::string IterStatus(Iterator* iter) { diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index d27bd3cb5..00fb9096e 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -455,7 +455,7 @@ Compaction* CompactionPicker::CompactRange( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, int input_level, int output_level, uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, - InternalKey** compaction_end) { + InternalKey** compaction_end, bool* manual_conflict) { // CompactionPickerFIFO has its own implementation of compact range assert(ioptions_.compaction_style != kCompactionStyleFIFO); @@ -481,6 +481,12 @@ Compaction* CompactionPicker::CompactRange( return nullptr; } + if ((start_level == 0) && (!level0_compactions_in_progress_.empty())) { + *manual_conflict = true; + // Only one level 0 compaction allowed + return nullptr; + } + std::vector inputs(vstorage->num_levels() - start_level); for (int level = start_level; level < vstorage->num_levels(); level++) { @@ -489,13 +495,21 @@ Compaction* CompactionPicker::CompactRange( for (FileMetaData* f : vstorage->LevelFiles(level)) { files.push_back(f); } + if (FilesInCompaction(files)) { + *manual_conflict = true; + return nullptr; + } } - return new Compaction( + Compaction* c = new Compaction( vstorage, mutable_cf_options, std::move(inputs), output_level, mutable_cf_options.MaxFileSizeForLevel(output_level), /* max_grandparent_overlap_bytes */ LLONG_MAX, output_path_id, GetCompressionType(ioptions_, output_level, 1), /* grandparents */ {}, /* is manual */ true); + if (start_level == 0) { + level0_compactions_in_progress_.insert(c); + } + return c; } CompactionInputFiles inputs; @@ -514,6 +528,13 @@ Compaction* CompactionPicker::CompactRange( return nullptr; } + if ((input_level == 0) && (!level0_compactions_in_progress_.empty())) { + // Only one level 0 compaction allowed + TEST_SYNC_POINT("CompactionPicker::CompactRange:Conflict"); + *manual_conflict = true; + return nullptr; + } + // Avoid compacting too much in one shot in case the range is large. // But we cannot do this for level-0 since level-0 files can overlap // and we must not pick one file and drop another older file if the @@ -536,9 +557,10 @@ Compaction* CompactionPicker::CompactRange( assert(output_path_id < static_cast(ioptions_.db_paths.size())); if (ExpandWhileOverlapping(cf_name, vstorage, &inputs) == false) { - // manual compaction is currently single-threaded, so it should never + // manual compaction is now multi-threaded, so it can // happen that ExpandWhileOverlapping fails - assert(false); + // we handle it higher in RunManualCompaction + *manual_conflict = true; return nullptr; } @@ -557,9 +579,10 @@ Compaction* CompactionPicker::CompactRange( int parent_index = -1; if (!SetupOtherInputs(cf_name, mutable_cf_options, vstorage, &inputs, &output_level_inputs, &parent_index, -1)) { - // manual compaction is currently single-threaded, so it should never + // manual compaction is now multi-threaded, so it can // happen that SetupOtherInputs fails - assert(false); + // we handle it higher in RunManualCompaction + *manual_conflict = true; return nullptr; } } @@ -568,6 +591,12 @@ Compaction* CompactionPicker::CompactRange( if (!output_level_inputs.empty()) { compaction_inputs.push_back(output_level_inputs); } + for (size_t i = 0; i < compaction_inputs.size(); i++) { + if (FilesInCompaction(compaction_inputs[i].files)) { + *manual_conflict = true; + return nullptr; + } + } std::vector grandparents; GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents); @@ -580,6 +609,9 @@ Compaction* CompactionPicker::CompactRange( std::move(grandparents), /* is manual compaction */ true); TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction); + if (input_level == 0) { + level0_compactions_in_progress_.insert(compaction); + } return compaction; } @@ -1033,6 +1065,7 @@ bool LevelCompactionPicker::PickCompactionBySize(VersionStorageInfo* vstorage, // could be made better by looking at key-ranges that are // being compacted at level 0. if (level == 0 && !level0_compactions_in_progress_.empty()) { + TEST_SYNC_POINT("LevelCompactionPicker::PickCompactionBySize:0"); return false; } @@ -1751,7 +1784,7 @@ Compaction* FIFOCompactionPicker::CompactRange( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, int input_level, int output_level, uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, - InternalKey** compaction_end) { + InternalKey** compaction_end, bool* manual_conflict) { assert(input_level == 0); assert(output_level == 0); *compaction_end = nullptr; diff --git a/db/compaction_picker.h b/db/compaction_picker.h index 062fa06da..8798235a1 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -60,7 +60,7 @@ class CompactionPicker { const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, int input_level, int output_level, uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, - InternalKey** compaction_end); + InternalKey** compaction_end, bool* manual_conflict); // The maximum allowed output level. Default value is NumberLevels() - 1. virtual int MaxOutputLevel() const { @@ -302,7 +302,7 @@ class FIFOCompactionPicker : public CompactionPicker { const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, int input_level, int output_level, uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, - InternalKey** compaction_end) override; + InternalKey** compaction_end, bool* manual_conflict) override; // The maximum allowed output level. Always returns 0. virtual int MaxOutputLevel() const override { @@ -329,11 +329,13 @@ class NullCompactionPicker : public CompactionPicker { } // Always return "nullptr" - Compaction* CompactRange( - const std::string& cf_name, const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* vstorage, int input_level, int output_level, - uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, - InternalKey** compaction_end) override { + Compaction* CompactRange(const std::string& cf_name, + const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, int input_level, + int output_level, uint32_t output_path_id, + const InternalKey* begin, const InternalKey* end, + InternalKey** compaction_end, + bool* manual_conflict) override { return nullptr; } diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 72b2b707d..80f2b3900 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -21,11 +21,13 @@ class DBCompactionTest : public DBTestBase { DBCompactionTest() : DBTestBase("/db_compaction_test") {} }; -class DBCompactionTestWithParam : public DBTestBase, - public testing::WithParamInterface { +class DBCompactionTestWithParam + : public DBTestBase, + public testing::WithParamInterface> { public: DBCompactionTestWithParam() : DBTestBase("/db_compaction_test") { - max_subcompactions_ = GetParam(); + max_subcompactions_ = std::get<0>(GetParam()); + exclusive_manual_compaction_ = std::get<1>(GetParam()); } // Required if inheriting from testing::WithParamInterface<> @@ -33,6 +35,7 @@ class DBCompactionTestWithParam : public DBTestBase, static void TearDownTestCase() {} uint32_t max_subcompactions_; + bool exclusive_manual_compaction_; }; namespace { @@ -617,8 +620,11 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveOneFile) { ASSERT_EQ(metadata.size(), 1U); LiveFileMetaData level0_file = metadata[0]; // L0 file meta + CompactRangeOptions cro; + cro.exclusive_manual_compaction = exclusive_manual_compaction_; + // Compaction will initiate a trivial move from L0 to L1 - dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); + dbfull()->CompactRange(cro, nullptr, nullptr); // File moved From L0 to L1 ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0); // 0 files in L0 @@ -682,9 +688,12 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveNonOverlappingFiles) { ASSERT_EQ(level0_files, ranges.size()); // Multiple files in L0 ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0); // No files in L1 + CompactRangeOptions cro; + cro.exclusive_manual_compaction = exclusive_manual_compaction_; + // Since data is non-overlapping we expect compaction to initiate // a trivial move - db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + db_->CompactRange(cro, nullptr, nullptr); // We expect that all the files were trivially moved from L0 to L1 ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0); ASSERT_EQ(NumTableFilesAtLevel(1, 0) /* level1_files */, level0_files); @@ -721,7 +730,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveNonOverlappingFiles) { ASSERT_OK(Flush()); } - db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + db_->CompactRange(cro, nullptr, nullptr); for (uint32_t i = 0; i < ranges.size(); i++) { for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) { @@ -777,6 +786,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveTargetLevel) { CompactRangeOptions compact_options; compact_options.change_level = true; compact_options.target_level = 6; + compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); // 2 files in L6 ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0)); @@ -792,6 +802,263 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveTargetLevel) { } } +TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) { + int32_t trivial_move = 0; + int32_t non_trivial_move = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:TrivialMove", + [&](void* arg) { trivial_move++; }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial", + [&](void* arg) { non_trivial_move++; }); + bool first = true; + bool second = true; + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"DBCompaction::ManualPartial:4", "DBCompaction::ManualPartial:1"}, + {"DBCompaction::ManualPartial:2", "DBCompaction::ManualPartial:3"}}); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { + if (first) { + TEST_SYNC_POINT("DBCompaction::ManualPartial:4"); + first = false; + TEST_SYNC_POINT("DBCompaction::ManualPartial:3"); + } else if (second) { + TEST_SYNC_POINT("DBCompaction::ManualPartial:2"); + } + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + Options options = CurrentOptions(); + options.write_buffer_size = 10 * 1024 * 1024; + options.num_levels = 7; + options.max_subcompactions = max_subcompactions_; + options.level0_file_num_compaction_trigger = 3; + options.max_background_compactions = 3; + options.target_file_size_base = 1 << 23; // 8 MB + + DestroyAndReopen(options); + int32_t value_size = 10 * 1024; // 10 KB + + // Add 2 non-overlapping files + Random rnd(301); + std::map values; + + // file 1 [0 => 100] + for (int32_t i = 0; i < 100; i++) { + values[i] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(i), values[i])); + } + ASSERT_OK(Flush()); + + // file 2 [100 => 300] + for (int32_t i = 100; i < 300; i++) { + values[i] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(i), values[i])); + } + ASSERT_OK(Flush()); + + // 2 files in L0 + ASSERT_EQ("2", FilesPerLevel(0)); + CompactRangeOptions compact_options; + compact_options.change_level = true; + compact_options.target_level = 6; + compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; + // Trivial move the two non-overlapping files to level 6 + ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); + // 2 files in L6 + ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0)); + + ASSERT_EQ(trivial_move, 1); + ASSERT_EQ(non_trivial_move, 0); + + // file 3 [ 0 => 200] + for (int32_t i = 0; i < 200; i++) { + values[i] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(i), values[i])); + } + ASSERT_OK(Flush()); + + // 1 files in L0 + ASSERT_EQ("1,0,0,0,0,0,2", FilesPerLevel(0)); + ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, false)); + ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, nullptr, false)); + ASSERT_OK(dbfull()->TEST_CompactRange(2, nullptr, nullptr, nullptr, false)); + ASSERT_OK(dbfull()->TEST_CompactRange(3, nullptr, nullptr, nullptr, false)); + ASSERT_OK(dbfull()->TEST_CompactRange(4, nullptr, nullptr, nullptr, false)); + // 2 files in L6, 1 file in L5 + ASSERT_EQ("0,0,0,0,0,1,2", FilesPerLevel(0)); + + ASSERT_EQ(trivial_move, 6); + ASSERT_EQ(non_trivial_move, 0); + + std::thread threads([&] { + compact_options.change_level = false; + compact_options.exclusive_manual_compaction = false; + std::string begin_string = Key(0); + std::string end_string = Key(199); + Slice begin(begin_string); + Slice end(end_string); + ASSERT_OK(db_->CompactRange(compact_options, &begin, &end)); + }); + + TEST_SYNC_POINT("DBCompaction::ManualPartial:1"); + // file 4 [300 => 400) + for (int32_t i = 300; i <= 400; i++) { + values[i] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(i), values[i])); + } + ASSERT_OK(Flush()); + + // file 5 [400 => 500) + for (int32_t i = 400; i <= 500; i++) { + values[i] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(i), values[i])); + } + ASSERT_OK(Flush()); + + // file 6 [500 => 600) + for (int32_t i = 500; i <= 600; i++) { + values[i] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(i), values[i])); + } + ASSERT_OK(Flush()); + + // 3 files in L0 + ASSERT_EQ("3,0,0,0,0,1,2", FilesPerLevel(0)); + // 1 file in L6, 1 file in L1 + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + ASSERT_EQ("0,1,0,0,0,0,1", FilesPerLevel(0)); + threads.join(); + + for (int32_t i = 0; i < 600; i++) { + ASSERT_EQ(Get(Key(i)), values[i]); + } +} + +TEST_F(DBCompactionTest, ManualPartialFill) { + int32_t trivial_move = 0; + int32_t non_trivial_move = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:TrivialMove", + [&](void* arg) { trivial_move++; }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial", + [&](void* arg) { non_trivial_move++; }); + bool first = true; + bool second = true; + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"DBCompaction::PartialFill:4", "DBCompaction::PartialFill:1"}, + {"DBCompaction::PartialFill:2", "DBCompaction::PartialFill:3"}}); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { + if (first) { + TEST_SYNC_POINT("DBCompaction::PartialFill:4"); + first = false; + TEST_SYNC_POINT("DBCompaction::PartialFill:3"); + } else if (second) { + } + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + Options options = CurrentOptions(); + options.write_buffer_size = 10 * 1024 * 1024; + options.max_bytes_for_level_multiplier = 2; + options.num_levels = 4; + options.level0_file_num_compaction_trigger = 3; + options.max_background_compactions = 3; + + DestroyAndReopen(options); + int32_t value_size = 10 * 1024; // 10 KB + + // Add 2 non-overlapping files + Random rnd(301); + std::map values; + + // file 1 [0 => 100] + for (int32_t i = 0; i < 100; i++) { + values[i] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(i), values[i])); + } + ASSERT_OK(Flush()); + + // file 2 [100 => 300] + for (int32_t i = 100; i < 300; i++) { + values[i] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(i), values[i])); + } + ASSERT_OK(Flush()); + + // 2 files in L0 + ASSERT_EQ("2", FilesPerLevel(0)); + CompactRangeOptions compact_options; + compact_options.change_level = true; + compact_options.target_level = 2; + ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); + // 2 files in L2 + ASSERT_EQ("0,0,2", FilesPerLevel(0)); + + ASSERT_EQ(trivial_move, 1); + ASSERT_EQ(non_trivial_move, 0); + + // file 3 [ 0 => 200] + for (int32_t i = 0; i < 200; i++) { + values[i] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(i), values[i])); + } + ASSERT_OK(Flush()); + + // 2 files in L2, 1 in L0 + ASSERT_EQ("1,0,2", FilesPerLevel(0)); + ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, false)); + // 2 files in L2, 1 in L1 + ASSERT_EQ("0,1,2", FilesPerLevel(0)); + + ASSERT_EQ(trivial_move, 2); + ASSERT_EQ(non_trivial_move, 0); + + std::thread threads([&] { + compact_options.change_level = false; + compact_options.exclusive_manual_compaction = false; + std::string begin_string = Key(0); + std::string end_string = Key(199); + Slice begin(begin_string); + Slice end(end_string); + ASSERT_OK(db_->CompactRange(compact_options, &begin, &end)); + }); + + TEST_SYNC_POINT("DBCompaction::PartialFill:1"); + // Many files 4 [300 => 4300) + for (int32_t i = 0; i <= 5; i++) { + for (int32_t j = 300; j < 4300; j++) { + if (j == 2300) { + ASSERT_OK(Flush()); + dbfull()->TEST_WaitForFlushMemTable(); + } + values[j] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(j), values[j])); + } + } + + // Verify level sizes + uint64_t target_size = 4 * options.max_bytes_for_level_base; + for (int32_t i = 1; i < options.num_levels; i++) { + ASSERT_LE(SizeAtLevel(i), target_size); + target_size *= options.max_bytes_for_level_multiplier; + } + + TEST_SYNC_POINT("DBCompaction::PartialFill:2"); + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + threads.join(); + + for (int32_t i = 0; i < 4300; i++) { + ASSERT_EQ(Get(Key(i)), values[i]); + } +} + TEST_P(DBCompactionTestWithParam, TrivialMoveToLastLevelWithFiles) { int32_t trivial_move = 0; int32_t non_trivial_move = 0; @@ -825,6 +1092,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveToLastLevelWithFiles) { CompactRangeOptions compact_options; compact_options.change_level = true; compact_options.target_level = 3; + compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); ASSERT_EQ("0,0,0,1", FilesPerLevel(0)); ASSERT_EQ(trivial_move, 1); @@ -838,8 +1106,10 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveToLastLevelWithFiles) { ASSERT_OK(Flush()); ASSERT_EQ("1,0,0,1", FilesPerLevel(0)); + CompactRangeOptions cro; + cro.exclusive_manual_compaction = exclusive_manual_compaction_; // Compaction will do L0=>L1 L1=>L2 L2=>L3 (3 trivial moves) - ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); ASSERT_EQ("0,0,0,2", FilesPerLevel(0)); ASSERT_EQ(trivial_move, 4); ASSERT_EQ(non_trivial_move, 0); @@ -1143,6 +1413,7 @@ TEST_P(DBCompactionTestWithParam, ConvertCompactionStyle) { compact_options.target_level = 0; compact_options.bottommost_level_compaction = BottommostLevelCompaction::kForce; + compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr); // Only 1 file in L0 @@ -1276,7 +1547,9 @@ TEST_P(DBCompactionTestWithParam, ManualCompaction) { uint64_t prev_block_cache_add = options.statistics->getTickerCount(BLOCK_CACHE_ADD); - db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr); + CompactRangeOptions cro; + cro.exclusive_manual_compaction = exclusive_manual_compaction_; + db_->CompactRange(cro, handles_[1], nullptr, nullptr); // Verify manual compaction doesn't fill block cache ASSERT_EQ(prev_block_cache_add, options.statistics->getTickerCount(BLOCK_CACHE_ADD)); @@ -1355,6 +1628,7 @@ TEST_P(DBCompactionTestWithParam, ManualLevelCompactionOutputPathId) { ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path)); CompactRangeOptions compact_options; compact_options.target_path_id = 1; + compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; db_->CompactRange(compact_options, handles_[1], nullptr, nullptr); ASSERT_EQ("0,1", FilesPerLevel(1)); @@ -1867,7 +2141,10 @@ TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) { } INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam, - ::testing::Values(1, 4)); + ::testing::Values(std::make_tuple(1, true), + std::make_tuple(1, false), + std::make_tuple(4, true), + std::make_tuple(4, false))); class CompactionPriTest : public DBTestBase, public testing::WithParamInterface { diff --git a/db/db_impl.cc b/db/db_impl.cc index 144210412..f6451f39d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -253,10 +253,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) unscheduled_compactions_(0), bg_compaction_scheduled_(0), num_running_compactions_(0), - bg_manual_only_(0), bg_flush_scheduled_(0), num_running_flushes_(0), - manual_compaction_(nullptr), disable_delete_obsolete_files_(0), delete_obsolete_files_next_run_( options.env->NowMicros() + @@ -1561,6 +1559,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, auto cfh = reinterpret_cast(column_family); auto cfd = cfh->cfd(); + bool exclusive = options.exclusive_manual_compaction; Status s = FlushMemTable(cfd, FlushOptions()); if (!s.ok()) { @@ -1586,7 +1585,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, // Always compact all files together. s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels, cfd->NumberLevels() - 1, options.target_path_id, - begin, end); + begin, end, exclusive); final_output_level = cfd->NumberLevels() - 1; } else { for (int level = 0; level <= max_level_with_files; level++) { @@ -1621,7 +1620,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, } } s = RunManualCompaction(cfd, level, output_level, options.target_path_id, - begin, end); + begin, end, exclusive); if (!s.ok()) { break; } @@ -2205,12 +2204,15 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() const { Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, int output_level, uint32_t output_path_id, const Slice* begin, const Slice* end, - bool disallow_trivial_move) { + bool exclusive, bool disallow_trivial_move) { assert(input_level == ColumnFamilyData::kCompactAllLevels || input_level >= 0); InternalKey begin_storage, end_storage; + CompactionArg* ca; + bool scheduled = false; + bool manual_conflict = false; ManualCompaction manual; manual.cfd = cfd; manual.input_level = input_level; @@ -2218,6 +2220,8 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, manual.output_path_id = output_path_id; manual.done = false; manual.in_progress = false; + manual.incomplete = false; + manual.exclusive = exclusive; manual.disallow_trivial_move = disallow_trivial_move; // For universal compaction, we enforce every manual compaction to compact // all files. @@ -2245,7 +2249,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, // jobs drops to zero. This is needed to ensure that this manual compaction // can compact any range of keys/files. // - // bg_manual_only_ is non-zero when at least one thread is inside + // HasPendingManualCompaction() is true when at least one thread is inside // RunManualCompaction(), i.e. during that time no other compaction will // get scheduled (see MaybeScheduleFlushOrCompaction). // @@ -2254,13 +2258,16 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, // However, only one of them will actually schedule compaction, while // others will wait on a condition variable until it completes. - ++bg_manual_only_; - while (bg_compaction_scheduled_ > 0) { - Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, - "[%s] Manual compaction waiting for all other scheduled background " - "compactions to finish", - cfd->GetName().c_str()); - bg_cv_.Wait(); + AddManualCompaction(&manual); + TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_); + if (exclusive) { + while (bg_compaction_scheduled_ > 0) { + Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, + "[%s] Manual compaction waiting for all other scheduled background " + "compactions to finish", + cfd->GetName().c_str()); + bg_cv_.Wait(); + } } Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, @@ -2271,20 +2278,47 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, // the compaction will set manual.status to bg_error_ and set manual.done to // true. while (!manual.done) { - assert(bg_manual_only_ > 0); - if (manual_compaction_ != nullptr) { + assert(HasPendingManualCompaction()); + manual_conflict = false; + if (ShouldRunManualCompaction(&manual) || (manual.in_progress == true) || + scheduled || + ((manual.manual_end = &manual.tmp_storage1)&&( + (manual.compaction = manual.cfd->CompactRange( + *manual.cfd->GetLatestMutableCFOptions(), manual.input_level, + manual.output_level, manual.output_path_id, manual.begin, + manual.end, &manual.manual_end, &manual_conflict)) == + nullptr) && + manual_conflict)) { + // exclusive manual compactions should not see a conflict during + // CompactRange + assert(!exclusive || !manual_conflict); // Running either this or some other manual compaction bg_cv_.Wait(); - } else { - manual_compaction_ = &manual; + if (scheduled && manual.incomplete == true) { + assert(!manual.in_progress); + scheduled = false; + manual.incomplete = false; + } + } else if (!scheduled) { + if (manual.compaction == nullptr) { + manual.done = true; + bg_cv_.SignalAll(); + continue; + } + ca = new CompactionArg; + ca->db = this; + ca->m = &manual; + manual.incomplete = false; bg_compaction_scheduled_++; - env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW, this); + env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, + &DBImpl::UnscheduleCallback); + scheduled = true; } } assert(!manual.in_progress); - assert(bg_manual_only_ > 0); - --bg_manual_only_; + assert(HasPendingManualCompaction()); + RemoveManualCompaction(&manual); return manual.status; } @@ -2406,7 +2440,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { } } - if (bg_manual_only_) { + if (HasExclusiveManualCompaction()) { // only manual compactions are allowed to run. don't schedule automatic // compactions return; @@ -2414,9 +2448,13 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { while (bg_compaction_scheduled_ < db_options_.max_background_compactions && unscheduled_compactions_ > 0) { + CompactionArg* ca = new CompactionArg; + ca->db = this; + ca->m = nullptr; bg_compaction_scheduled_++; unscheduled_compactions_--; - env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW, this); + env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, + &DBImpl::UnscheduleCallback); } } @@ -2478,10 +2516,21 @@ void DBImpl::BGWorkFlush(void* db) { TEST_SYNC_POINT("DBImpl::BGWorkFlush:done"); } -void DBImpl::BGWorkCompaction(void* db) { +void DBImpl::BGWorkCompaction(void* arg) { + CompactionArg ca = *(reinterpret_cast(arg)); + delete reinterpret_cast(arg); IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW); TEST_SYNC_POINT("DBImpl::BGWorkCompaction"); - reinterpret_cast(db)->BackgroundCallCompaction(); + reinterpret_cast(ca.db)->BackgroundCallCompaction(ca.m); +} + +void DBImpl::UnscheduleCallback(void* arg) { + CompactionArg ca = *(reinterpret_cast(arg)); + delete reinterpret_cast(arg); + if ((ca.m != nullptr) && (ca.m->compaction != nullptr)) { + delete ca.m->compaction; + } + TEST_SYNC_POINT("DBImpl::UnscheduleCallback"); } Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, @@ -2602,8 +2651,9 @@ void DBImpl::BackgroundCallFlush() { } } -void DBImpl::BackgroundCallCompaction() { +void DBImpl::BackgroundCallCompaction(void* arg) { bool made_progress = false; + ManualCompaction* m = reinterpret_cast(arg); JobContext job_context(next_job_id_.fetch_add(1), true); TEST_SYNC_POINT("BackgroundCallCompaction:0"); MaybeDumpStats(); @@ -2616,7 +2666,8 @@ void DBImpl::BackgroundCallCompaction() { CaptureCurrentFileNumberInPendingOutputs(); assert(bg_compaction_scheduled_); - Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer); + Status s = + BackgroundCompaction(&made_progress, &job_context, &log_buffer, m); TEST_SYNC_POINT("BackgroundCallCompaction:1"); if (!s.ok() && !s.IsShutdownInProgress()) { // Wait a little bit before retrying background compaction in @@ -2668,11 +2719,12 @@ void DBImpl::BackgroundCallCompaction() { // See if there's more work to be done MaybeScheduleFlushOrCompaction(); - if (made_progress || bg_compaction_scheduled_ == 0 || bg_manual_only_ > 0) { + if (made_progress || bg_compaction_scheduled_ == 0 || + HasPendingManualCompaction()) { // signal if // * made_progress -- need to wakeup DelayWrite // * bg_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl - // * bg_manual_only_ > 0 -- need to wakeup RunManualCompaction + // * HasPendingManualCompaction -- need to wakeup RunManualCompaction // If none of this is true, there is no need to signal since nobody is // waiting for it bg_cv_.SignalAll(); @@ -2686,14 +2738,17 @@ void DBImpl::BackgroundCallCompaction() { Status DBImpl::BackgroundCompaction(bool* made_progress, JobContext* job_context, - LogBuffer* log_buffer) { + LogBuffer* log_buffer, void* arg) { + ManualCompaction* manual_compaction = + reinterpret_cast(arg); *made_progress = false; mutex_.AssertHeld(); - bool is_manual = (manual_compaction_ != nullptr) && - (manual_compaction_->in_progress == false); - bool trivial_move_disallowed = is_manual && - manual_compaction_->disallow_trivial_move; + bool is_manual = (manual_compaction != nullptr); + + // (manual_compaction->in_progress == false); + bool trivial_move_disallowed = + is_manual && manual_compaction->disallow_trivial_move; CompactionJobStats compaction_job_stats; Status status = bg_error_; @@ -2703,34 +2758,30 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, if (!status.ok()) { if (is_manual) { - manual_compaction_->status = status; - manual_compaction_->done = true; - manual_compaction_->in_progress = false; - manual_compaction_ = nullptr; + manual_compaction->status = status; + manual_compaction->done = true; + manual_compaction->in_progress = false; + delete manual_compaction->compaction; + manual_compaction = nullptr; } return status; } if (is_manual) { // another thread cannot pick up the same work - manual_compaction_->in_progress = true; - } else if (manual_compaction_ != nullptr) { - // there should be no automatic compactions running when manual compaction - // is running - return Status::OK(); + manual_compaction->in_progress = true; } unique_ptr c; - InternalKey manual_end_storage; - InternalKey* manual_end = &manual_end_storage; + // InternalKey manual_end_storage; + // InternalKey* manual_end = &manual_end_storage; if (is_manual) { - ManualCompaction* m = manual_compaction_; + ManualCompaction* m = manual_compaction; assert(m->in_progress); - c.reset(m->cfd->CompactRange( - *m->cfd->GetLatestMutableCFOptions(), m->input_level, m->output_level, - m->output_path_id, m->begin, m->end, &manual_end)); + c.reset(std::move(m->compaction)); if (!c) { m->done = true; + m->manual_end = nullptr; LogToBuffer(log_buffer, "[%s] Manual compaction from level-%d from %s .. " "%s; nothing to do\n", @@ -2744,9 +2795,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, m->cfd->GetName().c_str(), m->input_level, c->output_level(), (m->begin ? m->begin->DebugString().c_str() : "(begin)"), (m->end ? m->end->DebugString().c_str() : "(end)"), - ((m->done || manual_end == nullptr) + ((m->done || m->manual_end == nullptr) ? "(end)" - : manual_end->DebugString().c_str())); + : m->manual_end->DebugString().c_str())); } } else if (!compaction_queue_.empty()) { // cfd is referenced here @@ -2763,6 +2814,12 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, return Status::OK(); } + if (HaveManualCompaction(cfd)) { + // Can't compact right now, but try again later + TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict"); + return Status::OK(); + } + // Pick up latest mutable CF Options and use it throughout the // compaction job // Compaction makes a copy of the latest MutableCFOptions. It should be used @@ -2940,7 +2997,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } if (is_manual) { - ManualCompaction* m = manual_compaction_; + ManualCompaction* m = manual_compaction; if (!status.ok()) { m->status = status; m->done = true; @@ -2958,7 +3015,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, // Stop the compaction if manual_end points to nullptr -- this means // that we compacted the whole range. manual_end should always point // to nullptr in case of universal compaction - if (manual_end == nullptr) { + if (m->manual_end == nullptr) { m->done = true; } if (!m->done) { @@ -2969,15 +3026,102 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, kCompactionStyleUniversal || m->cfd->ioptions()->num_levels > 1); assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO); - m->tmp_storage = *manual_end; + m->tmp_storage = *m->manual_end; m->begin = &m->tmp_storage; + m->incomplete = true; } m->in_progress = false; // not being processed anymore - manual_compaction_ = nullptr; } return status; } +bool DBImpl::HasPendingManualCompaction() { + return (!manual_compaction_dequeue_.empty()); +} + +void DBImpl::AddManualCompaction(DBImpl::ManualCompaction* m) { + manual_compaction_dequeue_.push_back(m); +} + +void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) { + // Remove from queue + std::deque::iterator it = + manual_compaction_dequeue_.begin(); + while (it != manual_compaction_dequeue_.end()) { + if (m == (*it)) { + it = manual_compaction_dequeue_.erase(it); + return; + } + it++; + } + assert(false); + return; +} + +bool DBImpl::ShouldRunManualCompaction(ManualCompaction* m) { + if ((m->exclusive) && (bg_compaction_scheduled_ > 0)) { + return true; + } + std::deque::iterator it = + manual_compaction_dequeue_.begin(); + bool seen = false; + while (it != manual_compaction_dequeue_.end()) { + if (m == (*it)) { + it++; + seen = true; + continue; + } else if (MCOverlap(m, (*it)) && (!seen && !(*it)->in_progress)) { + // Consider the other manual compaction *it, conflicts if: + // overlaps with m + // and (*it) is ahead in the queue and is not yet in progress + return true; + } + it++; + } + return false; +} + +bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) { + // Remove from priority queue + std::deque::iterator it = + manual_compaction_dequeue_.begin(); + while (it != manual_compaction_dequeue_.end()) { + if ((*it)->exclusive) { + return true; + } + if ((cfd == (*it)->cfd) && (!((*it)->in_progress || (*it)->done))) { + // Allow automatic compaction if manual compaction is + // is in progress + return true; + } + it++; + } + return false; +} + +bool DBImpl::HasExclusiveManualCompaction() { + // Remove from priority queue + std::deque::iterator it = + manual_compaction_dequeue_.begin(); + while (it != manual_compaction_dequeue_.end()) { + if ((*it)->exclusive) { + return true; + } + it++; + } + return false; +} + +bool DBImpl::MCOverlap(ManualCompaction* m, ManualCompaction* m1) { + if ((m->exclusive) || (m1->exclusive)) { + return true; + } + if (m->cfd != m1->cfd) { + return false; + } + return true; +} + namespace { struct IterState { IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version) diff --git a/db/db_impl.h b/db/db_impl.h index 0f44b378a..ff6d015c8 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -275,6 +275,7 @@ class DBImpl : public DB { Status RunManualCompaction(ColumnFamilyData* cfd, int input_level, int output_level, uint32_t output_path_id, const Slice* begin, const Slice* end, + bool exclusive, bool disallow_trivial_move = false); // Return an internal iterator over the current state of the database. @@ -554,12 +555,13 @@ class DBImpl : public DB { void MaybeScheduleFlushOrCompaction(); void SchedulePendingFlush(ColumnFamilyData* cfd); void SchedulePendingCompaction(ColumnFamilyData* cfd); - static void BGWorkCompaction(void* db); + static void BGWorkCompaction(void* arg); static void BGWorkFlush(void* db); - void BackgroundCallCompaction(); + static void UnscheduleCallback(void* arg); + void BackgroundCallCompaction(void* arg); void BackgroundCallFlush(); Status BackgroundCompaction(bool* madeProgress, JobContext* job_context, - LogBuffer* log_buffer); + LogBuffer* log_buffer, void* m = 0); Status BackgroundFlush(bool* madeProgress, JobContext* job_context, LogBuffer* log_buffer); @@ -605,7 +607,7 @@ class DBImpl : public DB { std::atomic shutting_down_; // This condition variable is signaled on these conditions: // * whenever bg_compaction_scheduled_ goes down to 0 - // * if bg_manual_only_ > 0, whenever a compaction finishes, even if it hasn't + // * if AnyManualCompaction, whenever a compaction finishes, even if it hasn't // made any progress // * whenever a compaction made any progress // * whenever bg_flush_scheduled_ value decreases (i.e. whenever a flush is @@ -763,11 +765,6 @@ class DBImpl : public DB { // stores the number of compactions are currently running int num_running_compactions_; - // If non-zero, MaybeScheduleFlushOrCompaction() will only schedule manual - // compactions (if manual_compaction_ is not null). This mechanism enables - // manual compactions to wait until all other compactions are finished. - int bg_manual_only_; - // number of background memtable flush jobs, submitted to the HIGH pool int bg_flush_scheduled_; @@ -780,15 +777,25 @@ class DBImpl : public DB { int input_level; int output_level; uint32_t output_path_id; - bool done; Status status; + bool done; bool in_progress; // compaction request being processed? + bool incomplete; // only part of requested range compacted + bool exclusive; // current behavior of only one manual + bool disallow_trivial_move; // Force actual compaction to run const InternalKey* begin; // nullptr means beginning of key range const InternalKey* end; // nullptr means end of key range + InternalKey* manual_end; // how far we are compacting InternalKey tmp_storage; // Used to keep track of compaction progress - bool disallow_trivial_move; // Force actual compaction to run + InternalKey tmp_storage1; // Used to keep track of compaction progress + Compaction* compaction; + }; + std::deque manual_compaction_dequeue_; + + struct CompactionArg { + DBImpl* db; + ManualCompaction* m; }; - ManualCompaction* manual_compaction_; // Have we encountered a background error in paranoid mode? Status bg_error_; @@ -885,6 +892,14 @@ class DBImpl : public DB { DBPropertyType property_type, bool need_out_of_mutex, bool is_locked, uint64_t* value); + + bool HasPendingManualCompaction(); + bool HasExclusiveManualCompaction(); + void AddManualCompaction(ManualCompaction* m); + void RemoveManualCompaction(ManualCompaction* m); + bool ShouldRunManualCompaction(ManualCompaction* m); + bool HaveManualCompaction(ColumnFamilyData* cfd); + bool MCOverlap(ManualCompaction* m, ManualCompaction* m1); }; // Sanitize db options. The caller should delete result.info_log if diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index ef8a12d10..e494c4ee5 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -70,7 +70,7 @@ Status DBImpl::TEST_CompactRange(int level, const Slice* begin, cfd->ioptions()->compaction_style == kCompactionStyleFIFO) ? level : level + 1; - return RunManualCompaction(cfd, level, output_level, 0, begin, end, + return RunManualCompaction(cfd, level, output_level, 0, begin, end, true, disallow_trivial_move); } diff --git a/db/db_test.cc b/db/db_test.cc index 8364f64cb..9c8c4fbef 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -102,16 +102,21 @@ class DBTest : public DBTestBase { DBTest() : DBTestBase("/db_test") {} }; -class DBTestWithParam : public DBTest, - public testing::WithParamInterface { +class DBTestWithParam + : public DBTest, + public testing::WithParamInterface> { public: - DBTestWithParam() { max_subcompactions_ = GetParam(); } + DBTestWithParam() { + max_subcompactions_ = std::get<0>(GetParam()); + exclusive_manual_compaction_ = std::get<1>(GetParam()); + } // Required if inheriting from testing::WithParamInterface<> static void SetUpTestCase() {} static void TearDownTestCase() {} uint32_t max_subcompactions_; + bool exclusive_manual_compaction_; }; #ifndef ROCKSDB_LITE @@ -6389,7 +6394,9 @@ TEST_P(DBTestWithParam, FIFOCompactionTest) { if (iter == 0) { ASSERT_OK(dbfull()->TEST_WaitForCompact()); } else { - ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + CompactRangeOptions cro; + cro.exclusive_manual_compaction = exclusive_manual_compaction_; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); } // only 5 files should survive ASSERT_EQ(NumTableFilesAtLevel(0), 5); @@ -8397,7 +8404,9 @@ TEST_P(DBTestWithParam, FilterCompactionTimeTest) { Flush(); } - ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + CompactRangeOptions cro; + cro.exclusive_manual_compaction = exclusive_manual_compaction_; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); ASSERT_EQ(0U, CountLiveFiles()); Reopen(options); @@ -8802,6 +8811,36 @@ TEST_F(DBTest, HugeNumberOfLevels) { ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); } +TEST_F(DBTest, AutomaticConflictsWithManualCompaction) { + Options options = CurrentOptions(); + options.write_buffer_size = 2 * 1024 * 1024; // 2MB + options.max_bytes_for_level_base = 2 * 1024 * 1024; // 2MB + options.num_levels = 12; + options.max_background_compactions = 10; + options.max_bytes_for_level_multiplier = 2; + options.level_compaction_dynamic_level_bytes = true; + DestroyAndReopen(options); + + Random rnd(301); + for (int i = 0; i < 300000; ++i) { + ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024))); + } + + std::atomic callback_count(0); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction()::Conflict", + [&](void* arg) { callback_count.fetch_add(1); }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + CompactRangeOptions croptions; + croptions.exclusive_manual_compaction = false; + ASSERT_OK(db_->CompactRange(croptions, nullptr, nullptr)); + ASSERT_GE(callback_count.load(), 1); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + for (int i = 0; i < 300000; ++i) { + ASSERT_NE("NOT_FOUND", Get(Key(i))); + } +} + // Github issue #595 // Large write batch with column families TEST_F(DBTest, LargeBatchWithColumnFamilies) { @@ -10105,7 +10144,8 @@ TEST_F(DBTest, SSTsWithLdbSuffixHandling) { } INSTANTIATE_TEST_CASE_P(DBTestWithParam, DBTestWithParam, - ::testing::Values(1, 4)); + ::testing::Combine(::testing::Values(1, 4), + ::testing::Bool())); TEST_F(DBTest, PauseBackgroundWorkTest) { Options options; diff --git a/db/db_universal_compaction_test.cc b/db/db_universal_compaction_test.cc index 15f871b14..9efcf4ae5 100644 --- a/db/db_universal_compaction_test.cc +++ b/db/db_universal_compaction_test.cc @@ -22,12 +22,16 @@ static std::string CompressibleString(Random* rnd, int len) { class DBTestUniversalCompactionBase : public DBTestBase, - public ::testing::WithParamInterface { + public ::testing::WithParamInterface> { public: explicit DBTestUniversalCompactionBase( const std::string& path) : DBTestBase(path) {} - virtual void SetUp() override { num_levels_ = GetParam(); } + virtual void SetUp() override { + num_levels_ = std::get<0>(GetParam()); + exclusive_manual_compaction_ = std::get<1>(GetParam()); + } int num_levels_; + bool exclusive_manual_compaction_; }; class DBTestUniversalCompaction : public DBTestUniversalCompactionBase { @@ -406,6 +410,7 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionTargetLevel) { CompactRangeOptions compact_options; compact_options.change_level = true; compact_options.target_level = 4; + compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; db_->CompactRange(compact_options, nullptr, nullptr); ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0)); } @@ -498,7 +503,8 @@ TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionTrivialMove) { INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionMultiLevels, DBTestUniversalCompactionMultiLevels, - ::testing::Values(3, 20)); + ::testing::Combine(::testing::Values(3, 20), + ::testing::Bool())); class DBTestUniversalCompactionParallel : public DBTestUniversalCompactionBase { @@ -571,7 +577,8 @@ TEST_P(DBTestUniversalCompactionParallel, UniversalCompactionParallel) { INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionParallel, DBTestUniversalCompactionParallel, - ::testing::Values(1, 10)); + ::testing::Combine(::testing::Values(1, 10), + ::testing::Bool())); TEST_P(DBTestUniversalCompaction, UniversalCompactionOptions) { Options options; @@ -1063,6 +1070,7 @@ TEST_P(DBTestUniversalCompaction, IncreaseUniversalCompactionNumLevels) { CompactRangeOptions compact_options; compact_options.change_level = true; compact_options.target_level = 0; + compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr); // Need to restart it once to remove higher level records in manifest. ReopenWithColumnFamilies({"default", "pikachu"}, options); @@ -1186,7 +1194,8 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionSecondPathRatio) { } INSTANTIATE_TEST_CASE_P(UniversalCompactionNumLevels, DBTestUniversalCompaction, - ::testing::Values(1, 3, 5)); + ::testing::Combine(::testing::Values(1, 3, 5), + ::testing::Bool())); class DBTestUniversalManualCompactionOutputPathId : public DBTestUniversalCompactionBase { @@ -1218,6 +1227,7 @@ TEST_P(DBTestUniversalManualCompactionOutputPathId, // Full compaction to DB path 0 CompactRangeOptions compact_options; compact_options.target_path_id = 1; + compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; db_->CompactRange(compact_options, handles_[1], nullptr, nullptr); ASSERT_EQ(1, TotalLiveFiles(1)); ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path)); @@ -1240,6 +1250,7 @@ TEST_P(DBTestUniversalManualCompactionOutputPathId, // Full compaction to DB path 0 compact_options.target_path_id = 0; + compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; db_->CompactRange(compact_options, handles_[1], nullptr, nullptr); ASSERT_EQ(1, TotalLiveFiles(1)); ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path)); @@ -1247,13 +1258,15 @@ TEST_P(DBTestUniversalManualCompactionOutputPathId, // Fail when compacting to an invalid path ID compact_options.target_path_id = 2; + compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; ASSERT_TRUE(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr) .IsInvalidArgument()); } INSTANTIATE_TEST_CASE_P(DBTestUniversalManualCompactionOutputPathId, DBTestUniversalManualCompactionOutputPathId, - ::testing::Values(1, 8)); + ::testing::Combine(::testing::Values(1, 8), + ::testing::Bool())); } // namespace rocksdb diff --git a/hdfs/env_hdfs.h b/hdfs/env_hdfs.h index e1e943093..6303cb7af 100644 --- a/hdfs/env_hdfs.h +++ b/hdfs/env_hdfs.h @@ -322,7 +322,8 @@ class HdfsEnv : public Env { } virtual void Schedule(void (*function)(void* arg), void* arg, - Priority pri = LOW, void* tag = nullptr) override {} + Priority pri = LOW, void* tag = nullptr, + void (*unschedFunction)(void* arg) = 0) override {} virtual int UnSchedule(void* tag, Priority pri) override { return 0; } diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index ebaeee6cc..74e2dca92 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -245,7 +245,8 @@ class Env { // I.e., the caller may not assume that background work items are // serialized. virtual void Schedule(void (*function)(void* arg), void* arg, - Priority pri = LOW, void* tag = nullptr) = 0; + Priority pri = LOW, void* tag = nullptr, + void (*unschedFunction)(void* arg) = 0) = 0; // Arrange to remove jobs for given arg from the queue_ if they are not // already scheduled. Caller is expected to have exclusive lock on arg. @@ -822,8 +823,8 @@ class EnvWrapper : public Env { Status UnlockFile(FileLock* l) override { return target_->UnlockFile(l); } void Schedule(void (*f)(void* arg), void* a, Priority pri, - void* tag = nullptr) override { - return target_->Schedule(f, a, pri, tag); + void* tag = nullptr, void (*u)(void* arg) = 0) override { + return target_->Schedule(f, a, pri, tag, u); } int UnSchedule(void* tag, Priority pri) override { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 04f9d7d8f..97457925d 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1422,6 +1422,9 @@ enum class BottommostLevelCompaction { // CompactRangeOptions is used by CompactRange() call. struct CompactRangeOptions { + // If true, no other compaction will run at the same time as this + // manual compaction + bool exclusive_manual_compaction = true; // If true, compacted files will be moved to the minimum level capable // of holding the data or given level (specified non-negative target_level). bool change_level = false; diff --git a/port/win/env_win.cc b/port/win/env_win.cc index 86f0e085e..54ad388d8 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -1549,7 +1549,8 @@ class WinEnv : public Env { } virtual void Schedule(void (*function)(void*), void* arg, Priority pri = LOW, - void* tag = nullptr) override; + void* tag = nullptr, + void (*unschedFunction)(void* arg) = 0) override; virtual int UnSchedule(void* arg, Priority pri) override; @@ -1978,7 +1979,8 @@ class WinEnv : public Env { } } - void Schedule(void (*function)(void* arg1), void* arg, void* tag) { + void Schedule(void (*function)(void* arg1), void* arg, void* tag, + void (*unschedFunction)(void* arg)) { std::lock_guard lg(mu_); if (exit_all_threads_) { @@ -1992,6 +1994,7 @@ class WinEnv : public Env { queue_.back().function = function; queue_.back().arg = arg; queue_.back().tag = tag; + queue_.back().unschedFunction = unschedFunction; queue_len_.store(queue_.size(), std::memory_order_relaxed); if (!HasExcessiveThread()) { @@ -2013,6 +2016,11 @@ class WinEnv : public Env { BGQueue::iterator it = queue_.begin(); while (it != queue_.end()) { if (arg == (*it).tag) { + void (*unschedFunction)(void*) = (*it).unschedFunction; + void* arg1 = (*it).arg; + if (unschedFunction != nullptr) { + (*unschedFunction)(arg1); + } it = queue_.erase(it); count++; } else { @@ -2036,6 +2044,7 @@ class WinEnv : public Env { void* arg; void (*function)(void*); void* tag; + void (*unschedFunction)(void*); }; typedef std::deque BGQueue; @@ -2094,9 +2103,9 @@ WinEnv::WinEnv() } void WinEnv::Schedule(void (*function)(void*), void* arg, Priority pri, - void* tag) { + void* tag, void (*unschedFunction)(void* arg)) { assert(pri >= Priority::LOW && pri <= Priority::HIGH); - thread_pools_[pri].Schedule(function, arg, tag); + thread_pools_[pri].Schedule(function, arg, tag, unschedFunction); } int WinEnv::UnSchedule(void* arg, Priority pri) { diff --git a/util/env_posix.cc b/util/env_posix.cc index 8b44a5b11..9d549b44d 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -443,7 +443,8 @@ class PosixEnv : public Env { } virtual void Schedule(void (*function)(void* arg1), void* arg, - Priority pri = LOW, void* tag = nullptr) override; + Priority pri = LOW, void* tag = nullptr, + void (*unschedFunction)(void* arg) = 0) override; virtual int UnSchedule(void* arg, Priority pri) override; @@ -689,9 +690,9 @@ PosixEnv::PosixEnv() } void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri, - void* tag) { + void* tag, void (*unschedFunction)(void* arg)) { assert(pri >= Priority::LOW && pri <= Priority::HIGH); - thread_pools_[pri].Schedule(function, arg, tag); + thread_pools_[pri].Schedule(function, arg, tag, unschedFunction); } int PosixEnv::UnSchedule(void* arg, Priority pri) { diff --git a/util/thread_posix.cc b/util/thread_posix.cc index c8c07e2a2..88e67ed76 100644 --- a/util/thread_posix.cc +++ b/util/thread_posix.cc @@ -7,8 +7,8 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#include #include "util/thread_posix.h" +#include #include #ifdef OS_LINUX #include @@ -197,7 +197,8 @@ void ThreadPool::StartBGThreads() { } } -void ThreadPool::Schedule(void (*function)(void* arg1), void* arg, void* tag) { +void ThreadPool::Schedule(void (*function)(void* arg1), void* arg, void* tag, + void (*unschedFunction)(void* arg)) { PthreadCall("lock", pthread_mutex_lock(&mu_)); if (exit_all_threads_) { @@ -212,6 +213,7 @@ void ThreadPool::Schedule(void (*function)(void* arg1), void* arg, void* tag) { queue_.back().function = function; queue_.back().arg = arg; queue_.back().tag = tag; + queue_.back().unschedFunction = unschedFunction; queue_len_.store(static_cast(queue_.size()), std::memory_order_relaxed); @@ -235,6 +237,11 @@ int ThreadPool::UnSchedule(void* arg) { BGQueue::iterator it = queue_.begin(); while (it != queue_.end()) { if (arg == (*it).tag) { + void (*unschedFunction)(void*) = (*it).unschedFunction; + void* arg1 = (*it).arg; + if (unschedFunction != nullptr) { + (*unschedFunction)(arg1); + } it = queue_.erase(it); count++; } else { diff --git a/util/thread_posix.h b/util/thread_posix.h index 28db0d7e6..c5d643878 100644 --- a/util/thread_posix.h +++ b/util/thread_posix.h @@ -24,7 +24,8 @@ class ThreadPool { void IncBackgroundThreadsIfNeeded(int num); void SetBackgroundThreads(int num); void StartBGThreads(); - void Schedule(void (*function)(void* arg1), void* arg, void* tag); + void Schedule(void (*function)(void* arg1), void* arg, void* tag, + void (*unschedFunction)(void* arg)); int UnSchedule(void* arg); unsigned int GetQueueLen() const { @@ -66,6 +67,7 @@ class ThreadPool { void* arg; void (*function)(void*); void* tag; + void (*unschedFunction)(void*); }; typedef std::deque BGQueue; diff --git a/utilities/flashcache/flashcache.cc b/utilities/flashcache/flashcache.cc index a1a035244..886d449e8 100644 --- a/utilities/flashcache/flashcache.cc +++ b/utilities/flashcache/flashcache.cc @@ -3,10 +3,10 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. -#include "rocksdb/utilities/flashcache.h" - #include "utilities/flashcache/flashcache.h" +#include "rocksdb/utilities/flashcache.h" + #ifdef OS_LINUX #include #include @@ -91,7 +91,7 @@ class FlashcacheAwareEnv : public EnvWrapper { } void Schedule(void (*f)(void* arg), void* a, Priority pri, - void* tag = nullptr) override { + void* tag = nullptr, void (*u)(void* arg) = 0) override { EnvWrapper::Schedule(&BgThreadWrapper, new Arg(f, a, cachedev_fd_), pri, tag); }