From 030215bf01e2d239f8136212e261f00dcdcbdb16 Mon Sep 17 00:00:00 2001 From: Venkatesh Radhakrishnan Date: Mon, 14 Dec 2015 11:20:34 -0800 Subject: [PATCH] Running manual compactions in parallel with other automatic or manual compactions in restricted cases Summary: This diff provides a framework for doing manual compactions in parallel with other compactions. We now have a deque of manual compactions. We also pass manual compactions as an argument from RunManualCompaction down to BackgroundCompaction, so that RunManualCompaction can be reentrant. Parallelism is controlled by the two ConflictingManualCompaction routines, which allow/disallow new parallel/manual compactions based on already existing ManualCompactions. In this diff, by default manual compactions still have to run exclusive of other compactions. However, by setting the compaction option exclusive_manual_compaction to false, it is possible to run other compactions in parallel with a manual compaction. Even then, we are still restricted to one manual compaction per column family at a time. All of these restrictions will be relaxed in future diffs. I will be adding more tests later.
Test Plan: Rocksdb regression + new tests + valgrind Reviewers: igor, anthony, IslamAbdelRahman, kradhakrishnan, yhchiang, sdong Reviewed By: sdong Subscribers: yoshinorim, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D47973 --- db/column_family.cc | 9 +- db/column_family.h | 10 +- db/column_family_test.cc | 689 ++++++++++++++++++++++++++++- db/compaction_picker.cc | 47 +- db/compaction_picker.h | 16 +- db/db_compaction_test.cc | 295 +++++++++++- db/db_impl.cc | 252 ++++++++--- db/db_impl.h | 39 +- db/db_impl_debug.cc | 2 +- db/db_test.cc | 52 ++- db/db_universal_compaction_test.cc | 25 +- hdfs/env_hdfs.h | 3 +- include/rocksdb/env.h | 7 +- include/rocksdb/options.h | 3 + port/win/env_win.cc | 17 +- util/env_posix.cc | 7 +- util/thread_posix.cc | 11 +- util/thread_posix.h | 4 +- utilities/flashcache/flashcache.cc | 6 +- 19 files changed, 1362 insertions(+), 132 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index a6fae98f3..b2d0a1b0e 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -546,13 +546,12 @@ const int ColumnFamilyData::kCompactAllLevels = -1; const int ColumnFamilyData::kCompactToBaseLevel = -2; Compaction* ColumnFamilyData::CompactRange( - const MutableCFOptions& mutable_cf_options, - int input_level, int output_level, uint32_t output_path_id, - const InternalKey* begin, const InternalKey* end, - InternalKey** compaction_end) { + const MutableCFOptions& mutable_cf_options, int input_level, + int output_level, uint32_t output_path_id, const InternalKey* begin, + const InternalKey* end, InternalKey** compaction_end, bool* conflict) { auto* result = compaction_picker_->CompactRange( GetName(), mutable_cf_options, current_->storage_info(), input_level, - output_level, output_path_id, begin, end, compaction_end); + output_level, output_path_id, begin, end, compaction_end, conflict); if (result != nullptr) { result->SetInputVersion(current_); } diff --git a/db/column_family.h b/db/column_family.h index 
e44873c7a..40a6d6910 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -249,11 +249,11 @@ class ColumnFamilyData { // A flag to tell a manual compaction's output is base level. static const int kCompactToBaseLevel; // REQUIRES: DB mutex held - Compaction* CompactRange( - const MutableCFOptions& mutable_cf_options, - int input_level, int output_level, uint32_t output_path_id, - const InternalKey* begin, const InternalKey* end, - InternalKey** compaction_end); + Compaction* CompactRange(const MutableCFOptions& mutable_cf_options, + int input_level, int output_level, + uint32_t output_path_id, const InternalKey* begin, + const InternalKey* end, InternalKey** compaction_end, + bool* manual_conflict); CompactionPicker* compaction_picker() { return compaction_picker_.get(); } // thread-safe diff --git a/db/column_family_test.cc b/db/column_family_test.cc index b877ea8a1..f36366a85 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -184,11 +184,17 @@ class ColumnFamilyTest : public testing::Test { } } - void PutRandomData(int cf, int num, int key_value_size) { + void PutRandomData(int cf, int num, int key_value_size, bool save = false) { for (int i = 0; i < num; ++i) { // 10 bytes for key, rest is value - ASSERT_OK(Put(cf, test::RandomKey(&rnd_, 10), - RandomString(&rnd_, key_value_size - 10))); + if (!save) { + ASSERT_OK(Put(cf, test::RandomKey(&rnd_, 11), + RandomString(&rnd_, key_value_size - 10))); + } else { + std::string key = test::RandomKey(&rnd_, 11); + keys_.insert(key); + ASSERT_OK(Put(cf, key, RandomString(&rnd_, key_value_size - 10))); + } } } @@ -378,6 +384,7 @@ class ColumnFamilyTest : public testing::Test { std::vector handles_; std::vector names_; + std::set keys_; ColumnFamilyOptions column_family_options_; DBOptions db_options_; std::string dbname_; @@ -921,6 +928,682 @@ TEST_F(ColumnFamilyTest, DifferentCompactionStyles) { Close(); } +#ifndef ROCKSDB_LITE +// Sync points not supported in RocksDB Lite + 
+TEST_F(ColumnFamilyTest, MultipleManualCompactions) { + Open(); + CreateColumnFamilies({"one", "two"}); + ColumnFamilyOptions default_cf, one, two; + db_options_.max_open_files = 20; // only 10 files in file cache + db_options_.disableDataSync = true; + db_options_.max_background_compactions = 3; + + default_cf.compaction_style = kCompactionStyleLevel; + default_cf.num_levels = 3; + default_cf.write_buffer_size = 64 << 10; // 64KB + default_cf.target_file_size_base = 30 << 10; + default_cf.source_compaction_factor = 100; + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + one.compaction_style = kCompactionStyleUniversal; + + one.num_levels = 1; + // trigger compaction if there are >= 4 files + one.level0_file_num_compaction_trigger = 4; + one.write_buffer_size = 120000; + + two.compaction_style = kCompactionStyleLevel; + two.num_levels = 4; + two.level0_file_num_compaction_trigger = 3; + two.write_buffer_size = 100000; + + Reopen({default_cf, one, two}); + + // SETUP column family "one" -- universal style + for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + AssertFilesPerLevel(ToString(i + 1), 1); + } + bool cf_1_1 = true; + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"ColumnFamilyTest::MultiManual:4", "ColumnFamilyTest::MultiManual:1"}, + {"ColumnFamilyTest::MultiManual:2", "ColumnFamilyTest::MultiManual:5"}, + {"ColumnFamilyTest::MultiManual:2", "ColumnFamilyTest::MultiManual:3"}}); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { + if (cf_1_1) { + TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:4"); + cf_1_1 = false; + TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:3"); + } + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + std::vector threads; + 
threads.emplace_back([&] { + CompactRangeOptions compact_options; + compact_options.exclusive_manual_compaction = false; + ASSERT_OK( + db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); + }); + + // SETUP column family "two" -- level style with 4 levels + for (int i = 0; i < two.level0_file_num_compaction_trigger - 2; ++i) { + PutRandomData(2, 10, 12000); + PutRandomData(2, 1, 10); + WaitForFlush(2); + AssertFilesPerLevel(ToString(i + 1), 2); + } + threads.emplace_back([&] { + TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:1"); + CompactRangeOptions compact_options; + compact_options.exclusive_manual_compaction = false; + ASSERT_OK( + db_->CompactRange(compact_options, handles_[2], nullptr, nullptr)); + TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:2"); + }); + + TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:5"); + for (auto& t : threads) { + t.join(); + } + + // VERIFY compaction "one" + AssertFilesPerLevel("1", 1); + + // VERIFY compaction "two" + AssertFilesPerLevel("0,1", 2); + CompactAll(2); + AssertFilesPerLevel("0,1", 2); + // Compare against saved keys + std::set::iterator key_iter = keys_.begin(); + while (key_iter != keys_.end()) { + ASSERT_NE("NOT_FOUND", Get(1, *key_iter)); + key_iter++; + } + Close(); +} + +TEST_F(ColumnFamilyTest, AutomaticAndManualCompactions) { + Open(); + CreateColumnFamilies({"one", "two"}); + ColumnFamilyOptions default_cf, one, two; + db_options_.max_open_files = 20; // only 10 files in file cache + db_options_.disableDataSync = true; + db_options_.max_background_compactions = 3; + + default_cf.compaction_style = kCompactionStyleLevel; + default_cf.num_levels = 3; + default_cf.write_buffer_size = 64 << 10; // 64KB + default_cf.target_file_size_base = 30 << 10; + default_cf.source_compaction_factor = 100; + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + one.compaction_style = kCompactionStyleUniversal; 
+ + one.num_levels = 1; + // trigger compaction if there are >= 4 files + one.level0_file_num_compaction_trigger = 4; + one.write_buffer_size = 120000; + + two.compaction_style = kCompactionStyleLevel; + two.num_levels = 4; + two.level0_file_num_compaction_trigger = 3; + two.write_buffer_size = 100000; + + Reopen({default_cf, one, two}); + + bool cf_1_1 = true; + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:1"}, + {"ColumnFamilyTest::AutoManual:2", "ColumnFamilyTest::AutoManual:5"}, + {"ColumnFamilyTest::AutoManual:2", "ColumnFamilyTest::AutoManual:3"}}); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { + if (cf_1_1) { + cf_1_1 = false; + TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:4"); + TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:3"); + } + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + // SETUP column family "one" -- universal style + for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + AssertFilesPerLevel(ToString(i + 1), 1); + } + + TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:1"); + + // SETUP column family "two" -- level style with 4 levels + for (int i = 0; i < two.level0_file_num_compaction_trigger - 2; ++i) { + PutRandomData(2, 10, 12000); + PutRandomData(2, 1, 10); + WaitForFlush(2); + AssertFilesPerLevel(ToString(i + 1), 2); + } + std::thread threads([&] { + CompactRangeOptions compact_options; + compact_options.exclusive_manual_compaction = false; + ASSERT_OK( + db_->CompactRange(compact_options, handles_[2], nullptr, nullptr)); + TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:2"); + }); + + TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:5"); + threads.join(); + + // WAIT for compactions + WaitForCompaction(); + + // VERIFY compaction "one" + AssertFilesPerLevel("1", 1); + + // VERIFY 
compaction "two" + AssertFilesPerLevel("0,1", 2); + CompactAll(2); + AssertFilesPerLevel("0,1", 2); + // Compare against saved keys + std::set::iterator key_iter = keys_.begin(); + while (key_iter != keys_.end()) { + ASSERT_NE("NOT_FOUND", Get(1, *key_iter)); + key_iter++; + } + Close(); +} + +TEST_F(ColumnFamilyTest, ManualAndAutomaticCompactions) { + Open(); + CreateColumnFamilies({"one", "two"}); + ColumnFamilyOptions default_cf, one, two; + db_options_.max_open_files = 20; // only 10 files in file cache + db_options_.disableDataSync = true; + db_options_.max_background_compactions = 3; + + default_cf.compaction_style = kCompactionStyleLevel; + default_cf.num_levels = 3; + default_cf.write_buffer_size = 64 << 10; // 64KB + default_cf.target_file_size_base = 30 << 10; + default_cf.source_compaction_factor = 100; + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + one.compaction_style = kCompactionStyleUniversal; + + one.num_levels = 1; + // trigger compaction if there are >= 4 files + one.level0_file_num_compaction_trigger = 4; + one.write_buffer_size = 120000; + + two.compaction_style = kCompactionStyleLevel; + two.num_levels = 4; + two.level0_file_num_compaction_trigger = 3; + two.write_buffer_size = 100000; + + Reopen({default_cf, one, two}); + + // SETUP column family "one" -- universal style + for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + AssertFilesPerLevel(ToString(i + 1), 1); + } + bool cf_1_1 = true; + bool cf_1_2 = true; + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:1"}, + {"ColumnFamilyTest::ManualAuto:5", "ColumnFamilyTest::ManualAuto:2"}, + {"ColumnFamilyTest::ManualAuto:2", "ColumnFamilyTest::ManualAuto:3"}}); + 
rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { + if (cf_1_1) { + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4"); + cf_1_1 = false; + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3"); + } else if (cf_1_2) { + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2"); + cf_1_2 = false; + } + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + std::thread threads([&] { + CompactRangeOptions compact_options; + compact_options.exclusive_manual_compaction = false; + ASSERT_OK( + db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); + }); + + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1"); + + // SETUP column family "two" -- level style with 4 levels + for (int i = 0; i < two.level0_file_num_compaction_trigger; ++i) { + PutRandomData(2, 10, 12000); + PutRandomData(2, 1, 10); + WaitForFlush(2); + AssertFilesPerLevel(ToString(i + 1), 2); + } + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5"); + threads.join(); + + // WAIT for compactions + WaitForCompaction(); + + // VERIFY compaction "one" + AssertFilesPerLevel("1", 1); + + // VERIFY compaction "two" + AssertFilesPerLevel("0,1", 2); + CompactAll(2); + AssertFilesPerLevel("0,1", 2); + // Compare against saved keys + std::set::iterator key_iter = keys_.begin(); + while (key_iter != keys_.end()) { + ASSERT_NE("NOT_FOUND", Get(1, *key_iter)); + key_iter++; + } + Close(); +} + +TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactions) { + Open(); + CreateColumnFamilies({"one"}); + ColumnFamilyOptions default_cf, one; + db_options_.max_open_files = 20; // only 10 files in file cache + db_options_.disableDataSync = true; + db_options_.max_background_compactions = 3; + + default_cf.compaction_style = kCompactionStyleLevel; + default_cf.num_levels = 3; + default_cf.write_buffer_size = 64 << 10; // 64KB + default_cf.target_file_size_base = 30 << 10; + default_cf.source_compaction_factor = 100; + BlockBasedTableOptions table_options; + 
table_options.no_block_cache = true; + default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + one.compaction_style = kCompactionStyleUniversal; + + one.num_levels = 1; + // trigger compaction if there are >= 4 files + one.level0_file_num_compaction_trigger = 4; + one.write_buffer_size = 120000; + + Reopen({default_cf, one}); + + // SETUP column family "one" -- universal style + for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + AssertFilesPerLevel(ToString(i + 1), 1); + } + bool cf_1_1 = true; + bool cf_1_2 = true; + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:2"}, + {"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:5"}, + {"ColumnFamilyTest::ManualAuto:1", "ColumnFamilyTest::ManualAuto:2"}, + {"ColumnFamilyTest::ManualAuto:1", "ColumnFamilyTest::ManualAuto:3"}}); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { + if (cf_1_1) { + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4"); + cf_1_1 = false; + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3"); + } else if (cf_1_2) { + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2"); + cf_1_2 = false; + } + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + std::thread threads([&] { + CompactRangeOptions compact_options; + compact_options.exclusive_manual_compaction = false; + ASSERT_OK( + db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); + }); + + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5"); + + WaitForFlush(1); + + // Add more L0 files and force automatic compaction + for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i), 
+ 1); + } + + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1"); + + threads.join(); + WaitForCompaction(); + // VERIFY compaction "one" + ASSERT_LE(NumTableFilesAtLevel(0, 1), 2); + + // Compare against saved keys + std::set::iterator key_iter = keys_.begin(); + while (key_iter != keys_.end()) { + ASSERT_NE("NOT_FOUND", Get(1, *key_iter)); + key_iter++; + } + Close(); +} + +TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactionsLevel) { + Open(); + CreateColumnFamilies({"one"}); + ColumnFamilyOptions default_cf, one; + db_options_.max_open_files = 20; // only 10 files in file cache + db_options_.disableDataSync = true; + db_options_.max_background_compactions = 3; + + default_cf.compaction_style = kCompactionStyleLevel; + default_cf.num_levels = 3; + default_cf.write_buffer_size = 64 << 10; // 64KB + default_cf.target_file_size_base = 30 << 10; + default_cf.source_compaction_factor = 100; + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + one.compaction_style = kCompactionStyleLevel; + + one.num_levels = 1; + // trigger compaction if there are >= 4 files + one.level0_file_num_compaction_trigger = 4; + one.write_buffer_size = 120000; + + Reopen({default_cf, one}); + + // SETUP column family "one" -- level style + for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + AssertFilesPerLevel(ToString(i + 1), 1); + } + bool cf_1_1 = true; + bool cf_1_2 = true; + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:2"}, + {"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:5"}, + {"ColumnFamilyTest::ManualAuto:3", "ColumnFamilyTest::ManualAuto:2"}, + {"LevelCompactionPicker::PickCompactionBySize:0", + "ColumnFamilyTest::ManualAuto:3"}, + {"ColumnFamilyTest::ManualAuto:1", 
"ColumnFamilyTest::ManualAuto:3"}}); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { + if (cf_1_1) { + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4"); + cf_1_1 = false; + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3"); + } else if (cf_1_2) { + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2"); + cf_1_2 = false; + } + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + std::thread threads([&] { + CompactRangeOptions compact_options; + compact_options.exclusive_manual_compaction = false; + ASSERT_OK( + db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); + }); + + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5"); + + // Add more L0 files and force automatic compaction + for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i), + 1); + } + + TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1"); + + threads.join(); + WaitForCompaction(); + // VERIFY compaction "one" + AssertFilesPerLevel("0,1", 1); + + // Compare against saved keys + std::set::iterator key_iter = keys_.begin(); + while (key_iter != keys_.end()) { + ASSERT_NE("NOT_FOUND", Get(1, *key_iter)); + key_iter++; + } + Close(); +} + +// This test checks for automatic getting a conflict if there is a +// manual which has not yet been scheduled. +// The manual compaction waits in NotScheduled +// We generate more files and then trigger an automatic compaction +// This will wait because there is an unscheduled manual compaction. +// Once the conflict is hit, the manual compaction starts and ends +// Then another automatic will start and end. 
+TEST_F(ColumnFamilyTest, SameCFManualAutomaticConflict) { + Open(); + CreateColumnFamilies({"one"}); + ColumnFamilyOptions default_cf, one; + db_options_.max_open_files = 20; // only 10 files in file cache + db_options_.disableDataSync = true; + db_options_.max_background_compactions = 3; + + default_cf.compaction_style = kCompactionStyleLevel; + default_cf.num_levels = 3; + default_cf.write_buffer_size = 64 << 10; // 64KB + default_cf.target_file_size_base = 30 << 10; + default_cf.source_compaction_factor = 100; + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + one.compaction_style = kCompactionStyleUniversal; + + one.num_levels = 1; + // trigger compaction if there are >= 4 files + one.level0_file_num_compaction_trigger = 4; + one.write_buffer_size = 120000; + + Reopen({default_cf, one}); + + // SETUP column family "one" -- universal style + for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + AssertFilesPerLevel(ToString(i + 1), 1); + } + bool cf_1_1 = true; + bool cf_1_2 = true; + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BackgroundCompaction()::Conflict", + "ColumnFamilyTest::ManualAutoCon:7"}, + {"ColumnFamilyTest::ManualAutoCon:9", + "ColumnFamilyTest::ManualAutoCon:8"}, + {"ColumnFamilyTest::ManualAutoCon:2", + "ColumnFamilyTest::ManualAutoCon:6"}, + {"ColumnFamilyTest::ManualAutoCon:4", + "ColumnFamilyTest::ManualAutoCon:5"}, + {"ColumnFamilyTest::ManualAutoCon:1", + "ColumnFamilyTest::ManualAutoCon:2"}, + {"ColumnFamilyTest::ManualAutoCon:1", + "ColumnFamilyTest::ManualAutoCon:3"}}); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { + if (cf_1_1) { + TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:4"); + cf_1_1 = false; + 
TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:3"); + } else if (cf_1_2) { + cf_1_2 = false; + TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:2"); + } + }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::RunManualCompaction:NotScheduled", [&](void* arg) { + InstrumentedMutex* mutex = static_cast(arg); + mutex->Unlock(); + TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:9"); + TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:7"); + mutex->Lock(); + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + std::thread threads([&] { + CompactRangeOptions compact_options; + compact_options.exclusive_manual_compaction = false; + ASSERT_OK( + db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); + TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:6"); + }); + + TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:8"); + WaitForFlush(1); + + // Add more L0 files and force automatic compaction + for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i), + 1); + } + + TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:5"); + // Add more L0 files and force automatic compaction + for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + } + TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:1"); + + threads.join(); + WaitForCompaction(); + // VERIFY compaction "one" + ASSERT_LE(NumTableFilesAtLevel(0, 1), 3); + + // Compare against saved keys + std::set::iterator key_iter = keys_.begin(); + while (key_iter != keys_.end()) { + ASSERT_NE("NOT_FOUND", Get(1, *key_iter)); + key_iter++; + } + + Close(); +} + +// In this test, we generate enough files to trigger automatic compactions. 
+// The automatic compaction waits in NonTrivial:AfterRun +// We generate more files and then trigger a manual compaction +// This will wait because the automatic compaction has files it needs. +// Once the conflict is hit, the automatic compaction resumes and ends +// Then the manual will run and end. +TEST_F(ColumnFamilyTest, SameCFAutomaticManualCompactions) { + Open(); + CreateColumnFamilies({"one"}); + ColumnFamilyOptions default_cf, one; + db_options_.max_open_files = 20; // only 10 files in file cache + db_options_.disableDataSync = true; + db_options_.max_background_compactions = 3; + + default_cf.compaction_style = kCompactionStyleLevel; + default_cf.num_levels = 3; + default_cf.write_buffer_size = 64 << 10; // 64KB + default_cf.target_file_size_base = 30 << 10; + default_cf.source_compaction_factor = 100; + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + one.compaction_style = kCompactionStyleUniversal; + + one.num_levels = 1; + // trigger compaction if there are >= 4 files + one.level0_file_num_compaction_trigger = 4; + one.write_buffer_size = 120000; + + Reopen({default_cf, one}); + + bool cf_1_1 = true; + bool cf_1_2 = true; + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:2"}, + {"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:5"}, + {"CompactionPicker::CompactRange:Conflict", + "ColumnFamilyTest::AutoManual:3"}}); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { + if (cf_1_1) { + TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:4"); + cf_1_1 = false; + TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:3"); + } else if (cf_1_2) { + TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:2"); + cf_1_2 = false; + } + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // SETUP column 
family "one" -- universal style + for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + AssertFilesPerLevel(ToString(i + 1), 1); + } + + TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:5"); + + // Add another L0 file and force automatic compaction + for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) { + PutRandomData(1, 10, 12000, true); + PutRandomData(1, 1, 10, true); + WaitForFlush(1); + } + + CompactRangeOptions compact_options; + compact_options.exclusive_manual_compaction = false; + ASSERT_OK(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); + + TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:1"); + + WaitForCompaction(); + // VERIFY compaction "one" + AssertFilesPerLevel("1", 1); + // Compare against saved keys + std::set::iterator key_iter = keys_.begin(); + while (key_iter != keys_.end()) { + ASSERT_NE("NOT_FOUND", Get(1, *key_iter)); + key_iter++; + } + + Close(); +} +#endif // !ROCKSDB_LITE + #ifndef ROCKSDB_LITE // Tailing interator not supported namespace { std::string IterStatus(Iterator* iter) { diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index d27bd3cb5..00fb9096e 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -455,7 +455,7 @@ Compaction* CompactionPicker::CompactRange( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, int input_level, int output_level, uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, - InternalKey** compaction_end) { + InternalKey** compaction_end, bool* manual_conflict) { // CompactionPickerFIFO has its own implementation of compact range assert(ioptions_.compaction_style != kCompactionStyleFIFO); @@ -481,6 +481,12 @@ Compaction* CompactionPicker::CompactRange( return nullptr; } + if ((start_level == 0) && (!level0_compactions_in_progress_.empty())) { + *manual_conflict = true; + 
// Only one level 0 compaction allowed + return nullptr; + } + std::vector inputs(vstorage->num_levels() - start_level); for (int level = start_level; level < vstorage->num_levels(); level++) { @@ -489,13 +495,21 @@ Compaction* CompactionPicker::CompactRange( for (FileMetaData* f : vstorage->LevelFiles(level)) { files.push_back(f); } + if (FilesInCompaction(files)) { + *manual_conflict = true; + return nullptr; + } } - return new Compaction( + Compaction* c = new Compaction( vstorage, mutable_cf_options, std::move(inputs), output_level, mutable_cf_options.MaxFileSizeForLevel(output_level), /* max_grandparent_overlap_bytes */ LLONG_MAX, output_path_id, GetCompressionType(ioptions_, output_level, 1), /* grandparents */ {}, /* is manual */ true); + if (start_level == 0) { + level0_compactions_in_progress_.insert(c); + } + return c; } CompactionInputFiles inputs; @@ -514,6 +528,13 @@ Compaction* CompactionPicker::CompactRange( return nullptr; } + if ((input_level == 0) && (!level0_compactions_in_progress_.empty())) { + // Only one level 0 compaction allowed + TEST_SYNC_POINT("CompactionPicker::CompactRange:Conflict"); + *manual_conflict = true; + return nullptr; + } + // Avoid compacting too much in one shot in case the range is large. 
// But we cannot do this for level-0 since level-0 files can overlap // and we must not pick one file and drop another older file if the @@ -536,9 +557,10 @@ Compaction* CompactionPicker::CompactRange( assert(output_path_id < static_cast(ioptions_.db_paths.size())); if (ExpandWhileOverlapping(cf_name, vstorage, &inputs) == false) { - // manual compaction is currently single-threaded, so it should never + // manual compaction is now multi-threaded, so it can // happen that ExpandWhileOverlapping fails - assert(false); + // we handle it higher in RunManualCompaction + *manual_conflict = true; return nullptr; } @@ -557,9 +579,10 @@ Compaction* CompactionPicker::CompactRange( int parent_index = -1; if (!SetupOtherInputs(cf_name, mutable_cf_options, vstorage, &inputs, &output_level_inputs, &parent_index, -1)) { - // manual compaction is currently single-threaded, so it should never + // manual compaction is now multi-threaded, so it can // happen that SetupOtherInputs fails - assert(false); + // we handle it higher in RunManualCompaction + *manual_conflict = true; return nullptr; } } @@ -568,6 +591,12 @@ Compaction* CompactionPicker::CompactRange( if (!output_level_inputs.empty()) { compaction_inputs.push_back(output_level_inputs); } + for (size_t i = 0; i < compaction_inputs.size(); i++) { + if (FilesInCompaction(compaction_inputs[i].files)) { + *manual_conflict = true; + return nullptr; + } + } std::vector grandparents; GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents); @@ -580,6 +609,9 @@ Compaction* CompactionPicker::CompactRange( std::move(grandparents), /* is manual compaction */ true); TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction); + if (input_level == 0) { + level0_compactions_in_progress_.insert(compaction); + } return compaction; } @@ -1033,6 +1065,7 @@ bool LevelCompactionPicker::PickCompactionBySize(VersionStorageInfo* vstorage, // could be made better by looking at key-ranges that are // being compacted at 
level 0. if (level == 0 && !level0_compactions_in_progress_.empty()) { + TEST_SYNC_POINT("LevelCompactionPicker::PickCompactionBySize:0"); return false; } @@ -1751,7 +1784,7 @@ Compaction* FIFOCompactionPicker::CompactRange( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, int input_level, int output_level, uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, - InternalKey** compaction_end) { + InternalKey** compaction_end, bool* manual_conflict) { assert(input_level == 0); assert(output_level == 0); *compaction_end = nullptr; diff --git a/db/compaction_picker.h b/db/compaction_picker.h index 062fa06da..8798235a1 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -60,7 +60,7 @@ class CompactionPicker { const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, int input_level, int output_level, uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, - InternalKey** compaction_end); + InternalKey** compaction_end, bool* manual_conflict); // The maximum allowed output level. Default value is NumberLevels() - 1. virtual int MaxOutputLevel() const { @@ -302,7 +302,7 @@ class FIFOCompactionPicker : public CompactionPicker { const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, int input_level, int output_level, uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, - InternalKey** compaction_end) override; + InternalKey** compaction_end, bool* manual_conflict) override; // The maximum allowed output level. Always returns 0. 
virtual int MaxOutputLevel() const override { @@ -329,11 +329,13 @@ class NullCompactionPicker : public CompactionPicker { } // Always return "nullptr" - Compaction* CompactRange( - const std::string& cf_name, const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* vstorage, int input_level, int output_level, - uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, - InternalKey** compaction_end) override { + Compaction* CompactRange(const std::string& cf_name, + const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, int input_level, + int output_level, uint32_t output_path_id, + const InternalKey* begin, const InternalKey* end, + InternalKey** compaction_end, + bool* manual_conflict) override { return nullptr; } diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 72b2b707d..80f2b3900 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -21,11 +21,13 @@ class DBCompactionTest : public DBTestBase { DBCompactionTest() : DBTestBase("/db_compaction_test") {} }; -class DBCompactionTestWithParam : public DBTestBase, - public testing::WithParamInterface { +class DBCompactionTestWithParam + : public DBTestBase, + public testing::WithParamInterface> { public: DBCompactionTestWithParam() : DBTestBase("/db_compaction_test") { - max_subcompactions_ = GetParam(); + max_subcompactions_ = std::get<0>(GetParam()); + exclusive_manual_compaction_ = std::get<1>(GetParam()); } // Required if inheriting from testing::WithParamInterface<> @@ -33,6 +35,7 @@ class DBCompactionTestWithParam : public DBTestBase, static void TearDownTestCase() {} uint32_t max_subcompactions_; + bool exclusive_manual_compaction_; }; namespace { @@ -617,8 +620,11 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveOneFile) { ASSERT_EQ(metadata.size(), 1U); LiveFileMetaData level0_file = metadata[0]; // L0 file meta + CompactRangeOptions cro; + cro.exclusive_manual_compaction = exclusive_manual_compaction_; + // Compaction will 
initiate a trivial move from L0 to L1 - dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); + dbfull()->CompactRange(cro, nullptr, nullptr); // File moved From L0 to L1 ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0); // 0 files in L0 @@ -682,9 +688,12 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveNonOverlappingFiles) { ASSERT_EQ(level0_files, ranges.size()); // Multiple files in L0 ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0); // No files in L1 + CompactRangeOptions cro; + cro.exclusive_manual_compaction = exclusive_manual_compaction_; + // Since data is non-overlapping we expect compaction to initiate // a trivial move - db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + db_->CompactRange(cro, nullptr, nullptr); // We expect that all the files were trivially moved from L0 to L1 ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0); ASSERT_EQ(NumTableFilesAtLevel(1, 0) /* level1_files */, level0_files); @@ -721,7 +730,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveNonOverlappingFiles) { ASSERT_OK(Flush()); } - db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + db_->CompactRange(cro, nullptr, nullptr); for (uint32_t i = 0; i < ranges.size(); i++) { for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) { @@ -777,6 +786,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveTargetLevel) { CompactRangeOptions compact_options; compact_options.change_level = true; compact_options.target_level = 6; + compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); // 2 files in L6 ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0)); @@ -792,6 +802,263 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveTargetLevel) { } } +TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) { + int32_t trivial_move = 0; + int32_t non_trivial_move = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:TrivialMove", + [&](void* arg) { trivial_move++; }); + 
rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial", + [&](void* arg) { non_trivial_move++; }); + bool first = true; + bool second = true; + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"DBCompaction::ManualPartial:4", "DBCompaction::ManualPartial:1"}, + {"DBCompaction::ManualPartial:2", "DBCompaction::ManualPartial:3"}}); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { + if (first) { + TEST_SYNC_POINT("DBCompaction::ManualPartial:4"); + first = false; + TEST_SYNC_POINT("DBCompaction::ManualPartial:3"); + } else if (second) { + TEST_SYNC_POINT("DBCompaction::ManualPartial:2"); + } + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + Options options = CurrentOptions(); + options.write_buffer_size = 10 * 1024 * 1024; + options.num_levels = 7; + options.max_subcompactions = max_subcompactions_; + options.level0_file_num_compaction_trigger = 3; + options.max_background_compactions = 3; + options.target_file_size_base = 1 << 23; // 8 MB + + DestroyAndReopen(options); + int32_t value_size = 10 * 1024; // 10 KB + + // Add 2 non-overlapping files + Random rnd(301); + std::map values; + + // file 1 [0 => 100] + for (int32_t i = 0; i < 100; i++) { + values[i] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(i), values[i])); + } + ASSERT_OK(Flush()); + + // file 2 [100 => 300] + for (int32_t i = 100; i < 300; i++) { + values[i] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(i), values[i])); + } + ASSERT_OK(Flush()); + + // 2 files in L0 + ASSERT_EQ("2", FilesPerLevel(0)); + CompactRangeOptions compact_options; + compact_options.change_level = true; + compact_options.target_level = 6; + compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; + // Trivial move the two non-overlapping files to level 6 + ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); + // 2 files in L6 + 
ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0)); + + ASSERT_EQ(trivial_move, 1); + ASSERT_EQ(non_trivial_move, 0); + + // file 3 [ 0 => 200] + for (int32_t i = 0; i < 200; i++) { + values[i] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(i), values[i])); + } + ASSERT_OK(Flush()); + + // 1 files in L0 + ASSERT_EQ("1,0,0,0,0,0,2", FilesPerLevel(0)); + ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, false)); + ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, nullptr, false)); + ASSERT_OK(dbfull()->TEST_CompactRange(2, nullptr, nullptr, nullptr, false)); + ASSERT_OK(dbfull()->TEST_CompactRange(3, nullptr, nullptr, nullptr, false)); + ASSERT_OK(dbfull()->TEST_CompactRange(4, nullptr, nullptr, nullptr, false)); + // 2 files in L6, 1 file in L5 + ASSERT_EQ("0,0,0,0,0,1,2", FilesPerLevel(0)); + + ASSERT_EQ(trivial_move, 6); + ASSERT_EQ(non_trivial_move, 0); + + std::thread threads([&] { + compact_options.change_level = false; + compact_options.exclusive_manual_compaction = false; + std::string begin_string = Key(0); + std::string end_string = Key(199); + Slice begin(begin_string); + Slice end(end_string); + ASSERT_OK(db_->CompactRange(compact_options, &begin, &end)); + }); + + TEST_SYNC_POINT("DBCompaction::ManualPartial:1"); + // file 4 [300 => 400) + for (int32_t i = 300; i <= 400; i++) { + values[i] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(i), values[i])); + } + ASSERT_OK(Flush()); + + // file 5 [400 => 500) + for (int32_t i = 400; i <= 500; i++) { + values[i] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(i), values[i])); + } + ASSERT_OK(Flush()); + + // file 6 [500 => 600) + for (int32_t i = 500; i <= 600; i++) { + values[i] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(i), values[i])); + } + ASSERT_OK(Flush()); + + // 3 files in L0 + ASSERT_EQ("3,0,0,0,0,1,2", FilesPerLevel(0)); + // 1 file in L6, 1 file in L1 + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + 
ASSERT_EQ("0,1,0,0,0,0,1", FilesPerLevel(0)); + threads.join(); + + for (int32_t i = 0; i < 600; i++) { + ASSERT_EQ(Get(Key(i)), values[i]); + } +} + +TEST_F(DBCompactionTest, ManualPartialFill) { + int32_t trivial_move = 0; + int32_t non_trivial_move = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:TrivialMove", + [&](void* arg) { trivial_move++; }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial", + [&](void* arg) { non_trivial_move++; }); + bool first = true; + bool second = true; + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"DBCompaction::PartialFill:4", "DBCompaction::PartialFill:1"}, + {"DBCompaction::PartialFill:2", "DBCompaction::PartialFill:3"}}); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { + if (first) { + TEST_SYNC_POINT("DBCompaction::PartialFill:4"); + first = false; + TEST_SYNC_POINT("DBCompaction::PartialFill:3"); + } else if (second) { + } + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + Options options = CurrentOptions(); + options.write_buffer_size = 10 * 1024 * 1024; + options.max_bytes_for_level_multiplier = 2; + options.num_levels = 4; + options.level0_file_num_compaction_trigger = 3; + options.max_background_compactions = 3; + + DestroyAndReopen(options); + int32_t value_size = 10 * 1024; // 10 KB + + // Add 2 non-overlapping files + Random rnd(301); + std::map values; + + // file 1 [0 => 100] + for (int32_t i = 0; i < 100; i++) { + values[i] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(i), values[i])); + } + ASSERT_OK(Flush()); + + // file 2 [100 => 300] + for (int32_t i = 100; i < 300; i++) { + values[i] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(i), values[i])); + } + ASSERT_OK(Flush()); + + // 2 files in L0 + ASSERT_EQ("2", FilesPerLevel(0)); + CompactRangeOptions compact_options; + compact_options.change_level = true; + 
compact_options.target_level = 2; + ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); + // 2 files in L2 + ASSERT_EQ("0,0,2", FilesPerLevel(0)); + + ASSERT_EQ(trivial_move, 1); + ASSERT_EQ(non_trivial_move, 0); + + // file 3 [ 0 => 200] + for (int32_t i = 0; i < 200; i++) { + values[i] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(i), values[i])); + } + ASSERT_OK(Flush()); + + // 2 files in L2, 1 in L0 + ASSERT_EQ("1,0,2", FilesPerLevel(0)); + ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, false)); + // 2 files in L2, 1 in L1 + ASSERT_EQ("0,1,2", FilesPerLevel(0)); + + ASSERT_EQ(trivial_move, 2); + ASSERT_EQ(non_trivial_move, 0); + + std::thread threads([&] { + compact_options.change_level = false; + compact_options.exclusive_manual_compaction = false; + std::string begin_string = Key(0); + std::string end_string = Key(199); + Slice begin(begin_string); + Slice end(end_string); + ASSERT_OK(db_->CompactRange(compact_options, &begin, &end)); + }); + + TEST_SYNC_POINT("DBCompaction::PartialFill:1"); + // Many files 4 [300 => 4300) + for (int32_t i = 0; i <= 5; i++) { + for (int32_t j = 300; j < 4300; j++) { + if (j == 2300) { + ASSERT_OK(Flush()); + dbfull()->TEST_WaitForFlushMemTable(); + } + values[j] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(j), values[j])); + } + } + + // Verify level sizes + uint64_t target_size = 4 * options.max_bytes_for_level_base; + for (int32_t i = 1; i < options.num_levels; i++) { + ASSERT_LE(SizeAtLevel(i), target_size); + target_size *= options.max_bytes_for_level_multiplier; + } + + TEST_SYNC_POINT("DBCompaction::PartialFill:2"); + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + threads.join(); + + for (int32_t i = 0; i < 4300; i++) { + ASSERT_EQ(Get(Key(i)), values[i]); + } +} + TEST_P(DBCompactionTestWithParam, TrivialMoveToLastLevelWithFiles) { int32_t trivial_move = 0; int32_t non_trivial_move = 0; @@ -825,6 +1092,7 @@ 
TEST_P(DBCompactionTestWithParam, TrivialMoveToLastLevelWithFiles) { CompactRangeOptions compact_options; compact_options.change_level = true; compact_options.target_level = 3; + compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); ASSERT_EQ("0,0,0,1", FilesPerLevel(0)); ASSERT_EQ(trivial_move, 1); @@ -838,8 +1106,10 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveToLastLevelWithFiles) { ASSERT_OK(Flush()); ASSERT_EQ("1,0,0,1", FilesPerLevel(0)); + CompactRangeOptions cro; + cro.exclusive_manual_compaction = exclusive_manual_compaction_; // Compaction will do L0=>L1 L1=>L2 L2=>L3 (3 trivial moves) - ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); ASSERT_EQ("0,0,0,2", FilesPerLevel(0)); ASSERT_EQ(trivial_move, 4); ASSERT_EQ(non_trivial_move, 0); @@ -1143,6 +1413,7 @@ TEST_P(DBCompactionTestWithParam, ConvertCompactionStyle) { compact_options.target_level = 0; compact_options.bottommost_level_compaction = BottommostLevelCompaction::kForce; + compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr); // Only 1 file in L0 @@ -1276,7 +1547,9 @@ TEST_P(DBCompactionTestWithParam, ManualCompaction) { uint64_t prev_block_cache_add = options.statistics->getTickerCount(BLOCK_CACHE_ADD); - db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr); + CompactRangeOptions cro; + cro.exclusive_manual_compaction = exclusive_manual_compaction_; + db_->CompactRange(cro, handles_[1], nullptr, nullptr); // Verify manual compaction doesn't fill block cache ASSERT_EQ(prev_block_cache_add, options.statistics->getTickerCount(BLOCK_CACHE_ADD)); @@ -1355,6 +1628,7 @@ TEST_P(DBCompactionTestWithParam, ManualLevelCompactionOutputPathId) { ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path)); CompactRangeOptions 
compact_options; compact_options.target_path_id = 1; + compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; db_->CompactRange(compact_options, handles_[1], nullptr, nullptr); ASSERT_EQ("0,1", FilesPerLevel(1)); @@ -1867,7 +2141,10 @@ TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) { } INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam, - ::testing::Values(1, 4)); + ::testing::Values(std::make_tuple(1, true), + std::make_tuple(1, false), + std::make_tuple(4, true), + std::make_tuple(4, false))); class CompactionPriTest : public DBTestBase, public testing::WithParamInterface { diff --git a/db/db_impl.cc b/db/db_impl.cc index 144210412..f6451f39d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -253,10 +253,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) unscheduled_compactions_(0), bg_compaction_scheduled_(0), num_running_compactions_(0), - bg_manual_only_(0), bg_flush_scheduled_(0), num_running_flushes_(0), - manual_compaction_(nullptr), disable_delete_obsolete_files_(0), delete_obsolete_files_next_run_( options.env->NowMicros() + @@ -1561,6 +1559,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, auto cfh = reinterpret_cast(column_family); auto cfd = cfh->cfd(); + bool exclusive = options.exclusive_manual_compaction; Status s = FlushMemTable(cfd, FlushOptions()); if (!s.ok()) { @@ -1586,7 +1585,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, // Always compact all files together. 
s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels, cfd->NumberLevels() - 1, options.target_path_id, - begin, end); + begin, end, exclusive); final_output_level = cfd->NumberLevels() - 1; } else { for (int level = 0; level <= max_level_with_files; level++) { @@ -1621,7 +1620,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, } } s = RunManualCompaction(cfd, level, output_level, options.target_path_id, - begin, end); + begin, end, exclusive); if (!s.ok()) { break; } @@ -2205,12 +2204,15 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() const { Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, int output_level, uint32_t output_path_id, const Slice* begin, const Slice* end, - bool disallow_trivial_move) { + bool exclusive, bool disallow_trivial_move) { assert(input_level == ColumnFamilyData::kCompactAllLevels || input_level >= 0); InternalKey begin_storage, end_storage; + CompactionArg* ca; + bool scheduled = false; + bool manual_conflict = false; ManualCompaction manual; manual.cfd = cfd; manual.input_level = input_level; @@ -2218,6 +2220,8 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, manual.output_path_id = output_path_id; manual.done = false; manual.in_progress = false; + manual.incomplete = false; + manual.exclusive = exclusive; manual.disallow_trivial_move = disallow_trivial_move; // For universal compaction, we enforce every manual compaction to compact // all files. @@ -2245,7 +2249,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, // jobs drops to zero. This is needed to ensure that this manual compaction // can compact any range of keys/files. // - // bg_manual_only_ is non-zero when at least one thread is inside + // HasPendingManualCompaction() is true when at least one thread is inside // RunManualCompaction(), i.e. during that time no other compaction will // get scheduled (see MaybeScheduleFlushOrCompaction). 
// @@ -2254,13 +2258,16 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, // However, only one of them will actually schedule compaction, while // others will wait on a condition variable until it completes. - ++bg_manual_only_; - while (bg_compaction_scheduled_ > 0) { - Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, - "[%s] Manual compaction waiting for all other scheduled background " - "compactions to finish", - cfd->GetName().c_str()); - bg_cv_.Wait(); + AddManualCompaction(&manual); + TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_); + if (exclusive) { + while (bg_compaction_scheduled_ > 0) { + Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, + "[%s] Manual compaction waiting for all other scheduled background " + "compactions to finish", + cfd->GetName().c_str()); + bg_cv_.Wait(); + } } Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, @@ -2271,20 +2278,47 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, // the compaction will set manual.status to bg_error_ and set manual.done to // true. 
while (!manual.done) { - assert(bg_manual_only_ > 0); - if (manual_compaction_ != nullptr) { + assert(HasPendingManualCompaction()); + manual_conflict = false; + if (ShouldRunManualCompaction(&manual) || (manual.in_progress == true) || + scheduled || + ((manual.manual_end = &manual.tmp_storage1)&&( + (manual.compaction = manual.cfd->CompactRange( + *manual.cfd->GetLatestMutableCFOptions(), manual.input_level, + manual.output_level, manual.output_path_id, manual.begin, + manual.end, &manual.manual_end, &manual_conflict)) == + nullptr) && + manual_conflict)) { + // exclusive manual compactions should not see a conflict during + // CompactRange + assert(!exclusive || !manual_conflict); // Running either this or some other manual compaction bg_cv_.Wait(); - } else { - manual_compaction_ = &manual; + if (scheduled && manual.incomplete == true) { + assert(!manual.in_progress); + scheduled = false; + manual.incomplete = false; + } + } else if (!scheduled) { + if (manual.compaction == nullptr) { + manual.done = true; + bg_cv_.SignalAll(); + continue; + } + ca = new CompactionArg; + ca->db = this; + ca->m = &manual; + manual.incomplete = false; bg_compaction_scheduled_++; - env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW, this); + env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, + &DBImpl::UnscheduleCallback); + scheduled = true; } } assert(!manual.in_progress); - assert(bg_manual_only_ > 0); - --bg_manual_only_; + assert(HasPendingManualCompaction()); + RemoveManualCompaction(&manual); return manual.status; } @@ -2406,7 +2440,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { } } - if (bg_manual_only_) { + if (HasExclusiveManualCompaction()) { // only manual compactions are allowed to run. 
don't schedule automatic // compactions return; @@ -2414,9 +2448,13 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { while (bg_compaction_scheduled_ < db_options_.max_background_compactions && unscheduled_compactions_ > 0) { + CompactionArg* ca = new CompactionArg; + ca->db = this; + ca->m = nullptr; bg_compaction_scheduled_++; unscheduled_compactions_--; - env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW, this); + env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, + &DBImpl::UnscheduleCallback); } } @@ -2478,10 +2516,21 @@ void DBImpl::BGWorkFlush(void* db) { TEST_SYNC_POINT("DBImpl::BGWorkFlush:done"); } -void DBImpl::BGWorkCompaction(void* db) { +void DBImpl::BGWorkCompaction(void* arg) { + CompactionArg ca = *(reinterpret_cast(arg)); + delete reinterpret_cast(arg); IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW); TEST_SYNC_POINT("DBImpl::BGWorkCompaction"); - reinterpret_cast(db)->BackgroundCallCompaction(); + reinterpret_cast(ca.db)->BackgroundCallCompaction(ca.m); +} + +void DBImpl::UnscheduleCallback(void* arg) { + CompactionArg ca = *(reinterpret_cast(arg)); + delete reinterpret_cast(arg); + if ((ca.m != nullptr) && (ca.m->compaction != nullptr)) { + delete ca.m->compaction; + } + TEST_SYNC_POINT("DBImpl::UnscheduleCallback"); } Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, @@ -2602,8 +2651,9 @@ void DBImpl::BackgroundCallFlush() { } } -void DBImpl::BackgroundCallCompaction() { +void DBImpl::BackgroundCallCompaction(void* arg) { bool made_progress = false; + ManualCompaction* m = reinterpret_cast(arg); JobContext job_context(next_job_id_.fetch_add(1), true); TEST_SYNC_POINT("BackgroundCallCompaction:0"); MaybeDumpStats(); @@ -2616,7 +2666,8 @@ void DBImpl::BackgroundCallCompaction() { CaptureCurrentFileNumberInPendingOutputs(); assert(bg_compaction_scheduled_); - Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer); + Status s = + 
BackgroundCompaction(&made_progress, &job_context, &log_buffer, m); TEST_SYNC_POINT("BackgroundCallCompaction:1"); if (!s.ok() && !s.IsShutdownInProgress()) { // Wait a little bit before retrying background compaction in @@ -2668,11 +2719,12 @@ void DBImpl::BackgroundCallCompaction() { // See if there's more work to be done MaybeScheduleFlushOrCompaction(); - if (made_progress || bg_compaction_scheduled_ == 0 || bg_manual_only_ > 0) { + if (made_progress || bg_compaction_scheduled_ == 0 || + HasPendingManualCompaction()) { // signal if // * made_progress -- need to wakeup DelayWrite // * bg_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl - // * bg_manual_only_ > 0 -- need to wakeup RunManualCompaction + // * HasPendingManualCompaction -- need to wakeup RunManualCompaction // If none of this is true, there is no need to signal since nobody is // waiting for it bg_cv_.SignalAll(); @@ -2686,14 +2738,17 @@ void DBImpl::BackgroundCallCompaction() { Status DBImpl::BackgroundCompaction(bool* made_progress, JobContext* job_context, - LogBuffer* log_buffer) { + LogBuffer* log_buffer, void* arg) { + ManualCompaction* manual_compaction = + reinterpret_cast(arg); *made_progress = false; mutex_.AssertHeld(); - bool is_manual = (manual_compaction_ != nullptr) && - (manual_compaction_->in_progress == false); - bool trivial_move_disallowed = is_manual && - manual_compaction_->disallow_trivial_move; + bool is_manual = (manual_compaction != nullptr); + + // (manual_compaction->in_progress == false); + bool trivial_move_disallowed = + is_manual && manual_compaction->disallow_trivial_move; CompactionJobStats compaction_job_stats; Status status = bg_error_; @@ -2703,34 +2758,30 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, if (!status.ok()) { if (is_manual) { - manual_compaction_->status = status; - manual_compaction_->done = true; - manual_compaction_->in_progress = false; - manual_compaction_ = nullptr; + manual_compaction->status = status; + 
manual_compaction->done = true; + manual_compaction->in_progress = false; + delete manual_compaction->compaction; + manual_compaction = nullptr; } return status; } if (is_manual) { // another thread cannot pick up the same work - manual_compaction_->in_progress = true; - } else if (manual_compaction_ != nullptr) { - // there should be no automatic compactions running when manual compaction - // is running - return Status::OK(); + manual_compaction->in_progress = true; } unique_ptr c; - InternalKey manual_end_storage; - InternalKey* manual_end = &manual_end_storage; + // InternalKey manual_end_storage; + // InternalKey* manual_end = &manual_end_storage; if (is_manual) { - ManualCompaction* m = manual_compaction_; + ManualCompaction* m = manual_compaction; assert(m->in_progress); - c.reset(m->cfd->CompactRange( - *m->cfd->GetLatestMutableCFOptions(), m->input_level, m->output_level, - m->output_path_id, m->begin, m->end, &manual_end)); + c.reset(std::move(m->compaction)); if (!c) { m->done = true; + m->manual_end = nullptr; LogToBuffer(log_buffer, "[%s] Manual compaction from level-%d from %s .. " "%s; nothing to do\n", @@ -2744,9 +2795,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, m->cfd->GetName().c_str(), m->input_level, c->output_level(), (m->begin ? m->begin->DebugString().c_str() : "(begin)"), (m->end ? m->end->DebugString().c_str() : "(end)"), - ((m->done || manual_end == nullptr) + ((m->done || m->manual_end == nullptr) ? 
"(end)" - : manual_end->DebugString().c_str())); + : m->manual_end->DebugString().c_str())); } } else if (!compaction_queue_.empty()) { // cfd is referenced here @@ -2763,6 +2814,12 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, return Status::OK(); } + if (HaveManualCompaction(cfd)) { + // Can't compact right now, but try again later + TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict"); + return Status::OK(); + } + // Pick up latest mutable CF Options and use it throughout the // compaction job // Compaction makes a copy of the latest MutableCFOptions. It should be used @@ -2940,7 +2997,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } if (is_manual) { - ManualCompaction* m = manual_compaction_; + ManualCompaction* m = manual_compaction; if (!status.ok()) { m->status = status; m->done = true; @@ -2958,7 +3015,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, // Stop the compaction if manual_end points to nullptr -- this means // that we compacted the whole range. 
manual_end should always point // to nullptr in case of universal compaction - if (manual_end == nullptr) { + if (m->manual_end == nullptr) { m->done = true; } if (!m->done) { @@ -2969,15 +3026,102 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, kCompactionStyleUniversal || m->cfd->ioptions()->num_levels > 1); assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO); - m->tmp_storage = *manual_end; + m->tmp_storage = *m->manual_end; m->begin = &m->tmp_storage; + m->incomplete = true; } m->in_progress = false; // not being processed anymore - manual_compaction_ = nullptr; } return status; } +bool DBImpl::HasPendingManualCompaction() { + return (!manual_compaction_dequeue_.empty()); +} + +void DBImpl::AddManualCompaction(DBImpl::ManualCompaction* m) { + manual_compaction_dequeue_.push_back(m); +} + +void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) { + // Remove from queue + std::deque::iterator it = + manual_compaction_dequeue_.begin(); + while (it != manual_compaction_dequeue_.end()) { + if (m == (*it)) { + it = manual_compaction_dequeue_.erase(it); + return; + } + it++; + } + assert(false); + return; +} + +bool DBImpl::ShouldRunManualCompaction(ManualCompaction* m) { + if ((m->exclusive) && (bg_compaction_scheduled_ > 0)) { + return true; + } + std::deque::iterator it = + manual_compaction_dequeue_.begin(); + bool seen = false; + while (it != manual_compaction_dequeue_.end()) { + if (m == (*it)) { + it++; + seen = true; + continue; + } else if (MCOverlap(m, (*it)) && (!seen && !(*it)->in_progress)) { + // Consider the other manual compaction *it, conflicts if: + // overlaps with m + // and (*it) is ahead in the queue and is not yet in progress + return true; + } + it++; + } + return false; +} + +bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) { + // Scan the manual compaction queue for a conflicting entry + std::deque::iterator it = + manual_compaction_dequeue_.begin(); + while (it != manual_compaction_dequeue_.end()) { + if ((*it)->exclusive) { 
+ return true; + } + if ((cfd == (*it)->cfd) && (!((*it)->in_progress || (*it)->done))) { + // Allow automatic compaction if manual compaction is + // in progress + return true; + } + it++; + } + return false; +} + +bool DBImpl::HasExclusiveManualCompaction() { + // Scan the manual compaction queue for an exclusive entry + std::deque::iterator it = + manual_compaction_dequeue_.begin(); + while (it != manual_compaction_dequeue_.end()) { + if ((*it)->exclusive) { + return true; + } + it++; + } + return false; +} + +bool DBImpl::MCOverlap(ManualCompaction* m, ManualCompaction* m1) { + if ((m->exclusive) || (m1->exclusive)) { + return true; + } + if (m->cfd != m1->cfd) { + return false; + } + return true; +} + namespace { struct IterState { IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version) diff --git a/db/db_impl.h b/db/db_impl.h index 0f44b378a..ff6d015c8 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -275,6 +275,7 @@ class DBImpl : public DB { Status RunManualCompaction(ColumnFamilyData* cfd, int input_level, int output_level, uint32_t output_path_id, const Slice* begin, const Slice* end, + bool exclusive, bool disallow_trivial_move = false); // Return an internal iterator over the current state of the database. 
@@ -554,12 +555,13 @@ class DBImpl : public DB { void MaybeScheduleFlushOrCompaction(); void SchedulePendingFlush(ColumnFamilyData* cfd); void SchedulePendingCompaction(ColumnFamilyData* cfd); - static void BGWorkCompaction(void* db); + static void BGWorkCompaction(void* arg); static void BGWorkFlush(void* db); - void BackgroundCallCompaction(); + static void UnscheduleCallback(void* arg); + void BackgroundCallCompaction(void* arg); void BackgroundCallFlush(); Status BackgroundCompaction(bool* madeProgress, JobContext* job_context, - LogBuffer* log_buffer); + LogBuffer* log_buffer, void* m = 0); Status BackgroundFlush(bool* madeProgress, JobContext* job_context, LogBuffer* log_buffer); @@ -605,7 +607,7 @@ class DBImpl : public DB { std::atomic shutting_down_; // This condition variable is signaled on these conditions: // * whenever bg_compaction_scheduled_ goes down to 0 - // * if bg_manual_only_ > 0, whenever a compaction finishes, even if it hasn't + // * if AnyManualCompaction, whenever a compaction finishes, even if it hasn't // made any progress // * whenever a compaction made any progress // * whenever bg_flush_scheduled_ value decreases (i.e. whenever a flush is @@ -763,11 +765,6 @@ class DBImpl : public DB { // stores the number of compactions are currently running int num_running_compactions_; - // If non-zero, MaybeScheduleFlushOrCompaction() will only schedule manual - // compactions (if manual_compaction_ is not null). This mechanism enables - // manual compactions to wait until all other compactions are finished. - int bg_manual_only_; - // number of background memtable flush jobs, submitted to the HIGH pool int bg_flush_scheduled_; @@ -780,15 +777,25 @@ class DBImpl : public DB { int input_level; int output_level; uint32_t output_path_id; - bool done; Status status; + bool done; bool in_progress; // compaction request being processed? 
+ bool incomplete; // only part of requested range compacted + bool exclusive; // current behavior of only one manual + bool disallow_trivial_move; // Force actual compaction to run const InternalKey* begin; // nullptr means beginning of key range const InternalKey* end; // nullptr means end of key range + InternalKey* manual_end; // how far we are compacting InternalKey tmp_storage; // Used to keep track of compaction progress - bool disallow_trivial_move; // Force actual compaction to run + InternalKey tmp_storage1; // Used to keep track of compaction progress + Compaction* compaction; + }; + std::deque manual_compaction_dequeue_; + + struct CompactionArg { + DBImpl* db; + ManualCompaction* m; }; - ManualCompaction* manual_compaction_; // Have we encountered a background error in paranoid mode? Status bg_error_; @@ -885,6 +892,14 @@ class DBImpl : public DB { DBPropertyType property_type, bool need_out_of_mutex, bool is_locked, uint64_t* value); + + bool HasPendingManualCompaction(); + bool HasExclusiveManualCompaction(); + void AddManualCompaction(ManualCompaction* m); + void RemoveManualCompaction(ManualCompaction* m); + bool ShouldRunManualCompaction(ManualCompaction* m); + bool HaveManualCompaction(ColumnFamilyData* cfd); + bool MCOverlap(ManualCompaction* m, ManualCompaction* m1); }; // Sanitize db options. The caller should delete result.info_log if diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index ef8a12d10..e494c4ee5 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -70,7 +70,7 @@ Status DBImpl::TEST_CompactRange(int level, const Slice* begin, cfd->ioptions()->compaction_style == kCompactionStyleFIFO) ? 
level : level + 1; - return RunManualCompaction(cfd, level, output_level, 0, begin, end, + return RunManualCompaction(cfd, level, output_level, 0, begin, end, true, disallow_trivial_move); } diff --git a/db/db_test.cc b/db/db_test.cc index 8364f64cb..9c8c4fbef 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -102,16 +102,21 @@ class DBTest : public DBTestBase { DBTest() : DBTestBase("/db_test") {} }; -class DBTestWithParam : public DBTest, - public testing::WithParamInterface { +class DBTestWithParam + : public DBTest, + public testing::WithParamInterface> { public: - DBTestWithParam() { max_subcompactions_ = GetParam(); } + DBTestWithParam() { + max_subcompactions_ = std::get<0>(GetParam()); + exclusive_manual_compaction_ = std::get<1>(GetParam()); + } // Required if inheriting from testing::WithParamInterface<> static void SetUpTestCase() {} static void TearDownTestCase() {} uint32_t max_subcompactions_; + bool exclusive_manual_compaction_; }; #ifndef ROCKSDB_LITE @@ -6389,7 +6394,9 @@ TEST_P(DBTestWithParam, FIFOCompactionTest) { if (iter == 0) { ASSERT_OK(dbfull()->TEST_WaitForCompact()); } else { - ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + CompactRangeOptions cro; + cro.exclusive_manual_compaction = exclusive_manual_compaction_; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); } // only 5 files should survive ASSERT_EQ(NumTableFilesAtLevel(0), 5); @@ -8397,7 +8404,9 @@ TEST_P(DBTestWithParam, FilterCompactionTimeTest) { Flush(); } - ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + CompactRangeOptions cro; + cro.exclusive_manual_compaction = exclusive_manual_compaction_; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); ASSERT_EQ(0U, CountLiveFiles()); Reopen(options); @@ -8802,6 +8811,36 @@ TEST_F(DBTest, HugeNumberOfLevels) { ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); } +TEST_F(DBTest, AutomaticConflictsWithManualCompaction) { + Options options = CurrentOptions(); + 
options.write_buffer_size = 2 * 1024 * 1024; // 2MB + options.max_bytes_for_level_base = 2 * 1024 * 1024; // 2MB + options.num_levels = 12; + options.max_background_compactions = 10; + options.max_bytes_for_level_multiplier = 2; + options.level_compaction_dynamic_level_bytes = true; + DestroyAndReopen(options); + + Random rnd(301); + for (int i = 0; i < 300000; ++i) { + ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024))); + } + + std::atomic callback_count(0); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction()::Conflict", + [&](void* arg) { callback_count.fetch_add(1); }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + CompactRangeOptions croptions; + croptions.exclusive_manual_compaction = false; + ASSERT_OK(db_->CompactRange(croptions, nullptr, nullptr)); + ASSERT_GE(callback_count.load(), 1); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + for (int i = 0; i < 300000; ++i) { + ASSERT_NE("NOT_FOUND", Get(Key(i))); + } +} + // Github issue #595 // Large write batch with column families TEST_F(DBTest, LargeBatchWithColumnFamilies) { @@ -10105,7 +10144,8 @@ TEST_F(DBTest, SSTsWithLdbSuffixHandling) { } INSTANTIATE_TEST_CASE_P(DBTestWithParam, DBTestWithParam, - ::testing::Values(1, 4)); + ::testing::Combine(::testing::Values(1, 4), + ::testing::Bool())); TEST_F(DBTest, PauseBackgroundWorkTest) { Options options; diff --git a/db/db_universal_compaction_test.cc b/db/db_universal_compaction_test.cc index 15f871b14..9efcf4ae5 100644 --- a/db/db_universal_compaction_test.cc +++ b/db/db_universal_compaction_test.cc @@ -22,12 +22,16 @@ static std::string CompressibleString(Random* rnd, int len) { class DBTestUniversalCompactionBase : public DBTestBase, - public ::testing::WithParamInterface { + public ::testing::WithParamInterface> { public: explicit DBTestUniversalCompactionBase( const std::string& path) : DBTestBase(path) {} - virtual void SetUp() override { num_levels_ = GetParam(); } + virtual void SetUp() override { 
+ num_levels_ = std::get<0>(GetParam()); + exclusive_manual_compaction_ = std::get<1>(GetParam()); + } int num_levels_; + bool exclusive_manual_compaction_; }; class DBTestUniversalCompaction : public DBTestUniversalCompactionBase { @@ -406,6 +410,7 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionTargetLevel) { CompactRangeOptions compact_options; compact_options.change_level = true; compact_options.target_level = 4; + compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; db_->CompactRange(compact_options, nullptr, nullptr); ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0)); } @@ -498,7 +503,8 @@ TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionTrivialMove) { INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionMultiLevels, DBTestUniversalCompactionMultiLevels, - ::testing::Values(3, 20)); + ::testing::Combine(::testing::Values(3, 20), + ::testing::Bool())); class DBTestUniversalCompactionParallel : public DBTestUniversalCompactionBase { @@ -571,7 +577,8 @@ TEST_P(DBTestUniversalCompactionParallel, UniversalCompactionParallel) { INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionParallel, DBTestUniversalCompactionParallel, - ::testing::Values(1, 10)); + ::testing::Combine(::testing::Values(1, 10), + ::testing::Bool())); TEST_P(DBTestUniversalCompaction, UniversalCompactionOptions) { Options options; @@ -1063,6 +1070,7 @@ TEST_P(DBTestUniversalCompaction, IncreaseUniversalCompactionNumLevels) { CompactRangeOptions compact_options; compact_options.change_level = true; compact_options.target_level = 0; + compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr); // Need to restart it once to remove higher level records in manifest. 
ReopenWithColumnFamilies({"default", "pikachu"}, options); @@ -1186,7 +1194,8 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionSecondPathRatio) { } INSTANTIATE_TEST_CASE_P(UniversalCompactionNumLevels, DBTestUniversalCompaction, - ::testing::Values(1, 3, 5)); + ::testing::Combine(::testing::Values(1, 3, 5), + ::testing::Bool())); class DBTestUniversalManualCompactionOutputPathId : public DBTestUniversalCompactionBase { @@ -1218,6 +1227,7 @@ TEST_P(DBTestUniversalManualCompactionOutputPathId, // Full compaction to DB path 0 CompactRangeOptions compact_options; compact_options.target_path_id = 1; + compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; db_->CompactRange(compact_options, handles_[1], nullptr, nullptr); ASSERT_EQ(1, TotalLiveFiles(1)); ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path)); @@ -1240,6 +1250,7 @@ TEST_P(DBTestUniversalManualCompactionOutputPathId, // Full compaction to DB path 0 compact_options.target_path_id = 0; + compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; db_->CompactRange(compact_options, handles_[1], nullptr, nullptr); ASSERT_EQ(1, TotalLiveFiles(1)); ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path)); @@ -1247,13 +1258,15 @@ TEST_P(DBTestUniversalManualCompactionOutputPathId, // Fail when compacting to an invalid path ID compact_options.target_path_id = 2; + compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; ASSERT_TRUE(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr) .IsInvalidArgument()); } INSTANTIATE_TEST_CASE_P(DBTestUniversalManualCompactionOutputPathId, DBTestUniversalManualCompactionOutputPathId, - ::testing::Values(1, 8)); + ::testing::Combine(::testing::Values(1, 8), + ::testing::Bool())); } // namespace rocksdb diff --git a/hdfs/env_hdfs.h b/hdfs/env_hdfs.h index e1e943093..6303cb7af 100644 --- a/hdfs/env_hdfs.h +++ b/hdfs/env_hdfs.h @@ -322,7 +322,8 @@ class HdfsEnv : public Env { } virtual void Schedule(void 
(*function)(void* arg), void* arg, - Priority pri = LOW, void* tag = nullptr) override {} + Priority pri = LOW, void* tag = nullptr, + void (*unschedFunction)(void* arg) = 0) override {} virtual int UnSchedule(void* tag, Priority pri) override { return 0; } diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index ebaeee6cc..74e2dca92 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -245,7 +245,8 @@ class Env { // I.e., the caller may not assume that background work items are // serialized. virtual void Schedule(void (*function)(void* arg), void* arg, - Priority pri = LOW, void* tag = nullptr) = 0; + Priority pri = LOW, void* tag = nullptr, + void (*unschedFunction)(void* arg) = 0) = 0; // Arrange to remove jobs for given arg from the queue_ if they are not // already scheduled. Caller is expected to have exclusive lock on arg. @@ -822,8 +823,8 @@ class EnvWrapper : public Env { Status UnlockFile(FileLock* l) override { return target_->UnlockFile(l); } void Schedule(void (*f)(void* arg), void* a, Priority pri, - void* tag = nullptr) override { - return target_->Schedule(f, a, pri, tag); + void* tag = nullptr, void (*u)(void* arg) = 0) override { + return target_->Schedule(f, a, pri, tag, u); } int UnSchedule(void* tag, Priority pri) override { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 04f9d7d8f..97457925d 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1422,6 +1422,9 @@ enum class BottommostLevelCompaction { // CompactRangeOptions is used by CompactRange() call. struct CompactRangeOptions { + // If true, no other compaction will run at the same time as this + // manual compaction + bool exclusive_manual_compaction = true; // If true, compacted files will be moved to the minimum level capable // of holding the data or given level (specified non-negative target_level). 
bool change_level = false; diff --git a/port/win/env_win.cc b/port/win/env_win.cc index 86f0e085e..54ad388d8 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -1549,7 +1549,8 @@ class WinEnv : public Env { } virtual void Schedule(void (*function)(void*), void* arg, Priority pri = LOW, - void* tag = nullptr) override; + void* tag = nullptr, + void (*unschedFunction)(void* arg) = 0) override; virtual int UnSchedule(void* arg, Priority pri) override; @@ -1978,7 +1979,8 @@ class WinEnv : public Env { } } - void Schedule(void (*function)(void* arg1), void* arg, void* tag) { + void Schedule(void (*function)(void* arg1), void* arg, void* tag, + void (*unschedFunction)(void* arg)) { std::lock_guard lg(mu_); if (exit_all_threads_) { @@ -1992,6 +1994,7 @@ class WinEnv : public Env { queue_.back().function = function; queue_.back().arg = arg; queue_.back().tag = tag; + queue_.back().unschedFunction = unschedFunction; queue_len_.store(queue_.size(), std::memory_order_relaxed); if (!HasExcessiveThread()) { @@ -2013,6 +2016,11 @@ class WinEnv : public Env { BGQueue::iterator it = queue_.begin(); while (it != queue_.end()) { if (arg == (*it).tag) { + void (*unschedFunction)(void*) = (*it).unschedFunction; + void* arg1 = (*it).arg; + if (unschedFunction != nullptr) { + (*unschedFunction)(arg1); + } it = queue_.erase(it); count++; } else { @@ -2036,6 +2044,7 @@ class WinEnv : public Env { void* arg; void (*function)(void*); void* tag; + void (*unschedFunction)(void*); }; typedef std::deque BGQueue; @@ -2094,9 +2103,9 @@ WinEnv::WinEnv() } void WinEnv::Schedule(void (*function)(void*), void* arg, Priority pri, - void* tag) { + void* tag, void (*unschedFunction)(void* arg)) { assert(pri >= Priority::LOW && pri <= Priority::HIGH); - thread_pools_[pri].Schedule(function, arg, tag); + thread_pools_[pri].Schedule(function, arg, tag, unschedFunction); } int WinEnv::UnSchedule(void* arg, Priority pri) { diff --git a/util/env_posix.cc b/util/env_posix.cc index 
8b44a5b11..9d549b44d 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -443,7 +443,8 @@ class PosixEnv : public Env { } virtual void Schedule(void (*function)(void* arg1), void* arg, - Priority pri = LOW, void* tag = nullptr) override; + Priority pri = LOW, void* tag = nullptr, + void (*unschedFunction)(void* arg) = 0) override; virtual int UnSchedule(void* arg, Priority pri) override; @@ -689,9 +690,9 @@ PosixEnv::PosixEnv() } void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri, - void* tag) { + void* tag, void (*unschedFunction)(void* arg)) { assert(pri >= Priority::LOW && pri <= Priority::HIGH); - thread_pools_[pri].Schedule(function, arg, tag); + thread_pools_[pri].Schedule(function, arg, tag, unschedFunction); } int PosixEnv::UnSchedule(void* arg, Priority pri) { diff --git a/util/thread_posix.cc b/util/thread_posix.cc index c8c07e2a2..88e67ed76 100644 --- a/util/thread_posix.cc +++ b/util/thread_posix.cc @@ -7,8 +7,8 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. 
-#include #include "util/thread_posix.h" +#include #include #ifdef OS_LINUX #include @@ -197,7 +197,8 @@ void ThreadPool::StartBGThreads() { } } -void ThreadPool::Schedule(void (*function)(void* arg1), void* arg, void* tag) { +void ThreadPool::Schedule(void (*function)(void* arg1), void* arg, void* tag, + void (*unschedFunction)(void* arg)) { PthreadCall("lock", pthread_mutex_lock(&mu_)); if (exit_all_threads_) { @@ -212,6 +213,7 @@ void ThreadPool::Schedule(void (*function)(void* arg1), void* arg, void* tag) { queue_.back().function = function; queue_.back().arg = arg; queue_.back().tag = tag; + queue_.back().unschedFunction = unschedFunction; queue_len_.store(static_cast(queue_.size()), std::memory_order_relaxed); @@ -235,6 +237,11 @@ int ThreadPool::UnSchedule(void* arg) { BGQueue::iterator it = queue_.begin(); while (it != queue_.end()) { if (arg == (*it).tag) { + void (*unschedFunction)(void*) = (*it).unschedFunction; + void* arg1 = (*it).arg; + if (unschedFunction != nullptr) { + (*unschedFunction)(arg1); + } it = queue_.erase(it); count++; } else { diff --git a/util/thread_posix.h b/util/thread_posix.h index 28db0d7e6..c5d643878 100644 --- a/util/thread_posix.h +++ b/util/thread_posix.h @@ -24,7 +24,8 @@ class ThreadPool { void IncBackgroundThreadsIfNeeded(int num); void SetBackgroundThreads(int num); void StartBGThreads(); - void Schedule(void (*function)(void* arg1), void* arg, void* tag); + void Schedule(void (*function)(void* arg1), void* arg, void* tag, + void (*unschedFunction)(void* arg)); int UnSchedule(void* arg); unsigned int GetQueueLen() const { @@ -66,6 +67,7 @@ class ThreadPool { void* arg; void (*function)(void*); void* tag; + void (*unschedFunction)(void*); }; typedef std::deque BGQueue; diff --git a/utilities/flashcache/flashcache.cc b/utilities/flashcache/flashcache.cc index a1a035244..886d449e8 100644 --- a/utilities/flashcache/flashcache.cc +++ b/utilities/flashcache/flashcache.cc @@ -3,10 +3,10 @@ // LICENSE file in the root directory of 
this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. -#include "rocksdb/utilities/flashcache.h" - #include "utilities/flashcache/flashcache.h" +#include "rocksdb/utilities/flashcache.h" + #ifdef OS_LINUX #include #include @@ -91,7 +91,7 @@ class FlashcacheAwareEnv : public EnvWrapper { } void Schedule(void (*f)(void* arg), void* a, Priority pri, - void* tag = nullptr) override { + void* tag = nullptr, void (*u)(void* arg) = 0) override { EnvWrapper::Schedule(&BgThreadWrapper, new Arg(f, a, cachedev_fd_), pri, tag); }