From 6fa70851215398803f0632f245a97d7a5f5ec56c Mon Sep 17 00:00:00 2001
From: sdong
Date: Tue, 14 Apr 2015 21:45:20 -0700
Subject: [PATCH] CompactRange skips levels 1 to base_level -1 for dynamic
 level base size

Summary: CompactRange() is now much more expensive for dynamic level base
size, as it goes through all the levels. Skip the unused levels between
level 0 and the base level.

Test Plan: Run all unit tests

Reviewers: yhchiang, rven, anthony, kradhakrishnan, igor

Reviewed By: igor

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D37125
---
 db/column_family.cc             |  1 +
 db/column_family.h              |  2 +
 db/compaction_picker.cc         | 10 ++++-
 db/db_impl.cc                   | 42 +++++++++++-------
 db/db_test.cc                   | 79 +++++++++++++++++++++++++++++++++
 include/rocksdb/thread_status.h |  8 ++--
 6 files changed, 119 insertions(+), 23 deletions(-)

diff --git a/db/column_family.cc b/db/column_family.cc
index 63b389eed..7df5c973f 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -536,6 +536,7 @@ Compaction* ColumnFamilyData::PickCompaction(
 }
 
 const int ColumnFamilyData::kCompactAllLevels = -1;
+const int ColumnFamilyData::kCompactToBaseLevel = -2;
 
 Compaction* ColumnFamilyData::CompactRange(
     const MutableCFOptions& mutable_cf_options,
diff --git a/db/column_family.h b/db/column_family.h
index f9db0df6c..77af5c7aa 100644
--- a/db/column_family.h
+++ b/db/column_family.h
@@ -239,6 +239,8 @@ class ColumnFamilyData {
   // A flag to tell a manual compaction is to compact all levels together
   // instad of for specific level.
   static const int kCompactAllLevels;
+  // A flag to tell a manual compaction's output is base level.
+  static const int kCompactToBaseLevel;
   // REQUIRES: DB mutex held
   Compaction* CompactRange(
       const MutableCFOptions& mutable_cf_options,
diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc
index ea5fb25f7..f8a7c9a5a 100644
--- a/db/compaction_picker.cc
+++ b/db/compaction_picker.cc
@@ -468,6 +468,11 @@ Compaction* CompactionPicker::CompactRange(
   }
 
   CompactionInputFiles output_level_inputs;
+  if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
+    assert(input_level == 0);
+    output_level = vstorage->base_level();
+    assert(output_level > 0);
+  }
   output_level_inputs.level = output_level;
   if (input_level != output_level) {
     int parent_index = -1;
@@ -487,13 +492,16 @@ Compaction* CompactionPicker::CompactRange(
 
   std::vector<FileMetaData*> grandparents;
   GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents);
-  return new Compaction(
+  Compaction* compaction = new Compaction(
       vstorage, mutable_cf_options, std::move(compaction_inputs), output_level,
       mutable_cf_options.MaxFileSizeForLevel(output_level),
      mutable_cf_options.MaxGrandParentOverlapBytes(input_level),
       output_path_id,
       GetCompressionType(ioptions_, output_level, vstorage->base_level()),
       std::move(grandparents), /* is manual compaction */ true);
+
+  TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction);
+  return compaction;
 }
 
 #ifndef ROCKSDB_LITE
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 95df83e10..8cf3037ee 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -1312,13 +1312,9 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
   if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
       cfd->NumberLevels() > 1) {
     // Always compact all files together.
-    int output_level = 0;
-    if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
-        cfd->NumberLevels() > 1) {
-      output_level = cfd->NumberLevels() - 1;
-    }
     s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
-                            output_level, target_path_id, begin, end);
+                            cfd->NumberLevels() - 1, target_path_id, begin,
+                            end);
   } else {
     for (int level = 0; level <= max_level_with_files; level++) {
       // in case the compaction is unversal or if we're compacting the
@@ -1330,8 +1326,13 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
           (level == max_level_with_files && level > 0)) {
         s = RunManualCompaction(cfd, level, level, target_path_id, begin, end);
       } else {
-        // TODO(sdong) Skip empty levels if possible.
-        s = RunManualCompaction(cfd, level, level + 1, target_path_id, begin,
+        int output_level = level + 1;
+        if (cfd->ioptions()->compaction_style == kCompactionStyleLevel &&
+            cfd->ioptions()->level_compaction_dynamic_level_bytes &&
+            level == 0) {
+          output_level = ColumnFamilyData::kCompactToBaseLevel;
+        }
+        s = RunManualCompaction(cfd, level, output_level, target_path_id, begin,
                                 end);
       }
       if (!s.ok()) {
@@ -2234,16 +2235,23 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
           m->output_path_id, m->begin, m->end, &manual_end));
       if (!c) {
         m->done = true;
+        LogToBuffer(log_buffer,
+                    "[%s] Manual compaction from level-%d from %s .. "
+                    "%s; nothing to do\n",
+                    m->cfd->GetName().c_str(), m->input_level,
+                    (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
+                    (m->end ? m->end->DebugString().c_str() : "(end)"));
+      } else {
+        LogToBuffer(log_buffer,
+                    "[%s] Manual compaction from level-%d to level-%d from %s .. "
+                    "%s; will stop at %s\n",
+                    m->cfd->GetName().c_str(), m->input_level, c->output_level(),
+                    (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
+                    (m->end ? m->end->DebugString().c_str() : "(end)"),
+                    ((m->done || manual_end == nullptr)
+                         ? "(end)"
+                         : manual_end->DebugString().c_str()));
       }
-      LogToBuffer(log_buffer,
-                  "[%s] Manual compaction from level-%d to level-%d from %s .. "
-                  "%s; will stop at %s\n",
-                  m->cfd->GetName().c_str(), m->input_level, m->output_level,
-                  (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
-                  (m->end ? m->end->DebugString().c_str() : "(end)"),
-                  ((m->done || manual_end == nullptr)
-                       ? "(end)"
-                       : manual_end->DebugString().c_str()));
     } else if (!compaction_queue_.empty()) {
       // cfd is referenced here
       auto cfd = PopFirstFromCompactionQueue();
diff --git a/db/db_test.cc b/db/db_test.cc
index 18469365c..94918b72d 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -11175,6 +11175,85 @@ TEST_F(DBTest, DynamicLevelMaxBytesBase2) {
   ASSERT_EQ(1U, int_prop);
 }
 
+// Test specific cases in dynamic max bytes
+TEST_F(DBTest, DynamicLevelMaxBytesCompactRange) {
+  Random rnd(301);
+  int kMaxKey = 1000000;
+
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  options.db_write_buffer_size = 2048;
+  options.write_buffer_size = 2048;
+  options.max_write_buffer_number = 2;
+  options.level0_file_num_compaction_trigger = 2;
+  options.level0_slowdown_writes_trigger = 9999;
+  options.level0_stop_writes_trigger = 9999;
+  options.target_file_size_base = 2;
+  options.level_compaction_dynamic_level_bytes = true;
+  options.max_bytes_for_level_base = 10240;
+  options.max_bytes_for_level_multiplier = 4;
+  options.max_background_compactions = 1;
+  const int kNumLevels = 5;
+  options.num_levels = kNumLevels;
+  options.expanded_compaction_factor = 0;  // Force not expanding in compactions
+  BlockBasedTableOptions table_options;
+  table_options.block_size = 1024;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+  DestroyAndReopen(options);
+
+  // Compact against empty DB
+  dbfull()->CompactRange(nullptr, nullptr);
+
+  uint64_t int_prop;
+  std::string str_prop;
+
+  // Initial base level is the last level
+  ASSERT_TRUE(db_->GetIntProperty("rocksdb.base-level", &int_prop));
+  ASSERT_EQ(4U, int_prop);
+
+  // Put about 7K to L0
+  for (int i = 0; i < 140; i++) {
+    ASSERT_OK(Put(Key(static_cast<int>(rnd.Uniform(kMaxKey))),
+                  RandomString(&rnd, 80)));
+  }
+  Flush();
+  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(
+      Put(Key(static_cast<int>(rnd.Uniform(kMaxKey))), RandomString(&rnd, 80)));
+  Flush();
+
+  ASSERT_TRUE(db_->GetIntProperty("rocksdb.base-level", &int_prop));
+  ASSERT_EQ(3U, int_prop);
+  ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level1", &str_prop));
+  ASSERT_EQ("0", str_prop);
+  ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level2", &str_prop));
+  ASSERT_EQ("0", str_prop);
+
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+
+  std::set<int> output_levels;
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionPicker::CompactRange:Return", [&](void* arg) {
+        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
+        output_levels.insert(compaction->output_level());
+      });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  dbfull()->CompactRange(nullptr, nullptr);
+  ASSERT_EQ(output_levels.size(), 2);
+  ASSERT_TRUE(output_levels.find(3) != output_levels.end());
+  ASSERT_TRUE(output_levels.find(4) != output_levels.end());
+  ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level0", &str_prop));
+  ASSERT_EQ("0", str_prop);
+  ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level3", &str_prop));
+  ASSERT_EQ("0", str_prop);
+  // Base level is still level 3.
+  ASSERT_TRUE(db_->GetIntProperty("rocksdb.base-level", &int_prop));
+  ASSERT_EQ(3U, int_prop);
+}
+
 TEST_F(DBTest, DynamicLevelMaxBytesBaseInc) {
   Options options = CurrentOptions();
   options.create_if_missing = true;
diff --git a/include/rocksdb/thread_status.h b/include/rocksdb/thread_status.h
index 4f67441b5..67346b8e0 100644
--- a/include/rocksdb/thread_status.h
+++ b/include/rocksdb/thread_status.h
@@ -31,9 +31,7 @@ namespace rocksdb {
 
 // TODO(yhchiang): remove this function once c++14 is available
 // as std::max will be able to cover this.
-constexpr int constexpr_max(int a, int b) {
-  return a > b ? a : b;
-}
+constexpr int constexpr_max(int a, int b) { return a > b ? a : b; }
 
 // A structure that describes the current status of a thread.
 // The status of active threads can be fetched using
@@ -92,8 +90,8 @@ struct ThreadStatus {
 
   // The maximum number of properties of an operation.
   // This number should be set to the biggest NUM_XXX_PROPERTIES.
-  static const int kNumOperationProperties = constexpr_max(
-      NUM_COMPACTION_PROPERTIES, NUM_FLUSH_PROPERTIES);
+  static const int kNumOperationProperties =
+      constexpr_max(NUM_COMPACTION_PROPERTIES, NUM_FLUSH_PROPERTIES);
 
   // The type used to refer to a thread state.
   // A state describes lower-level action of a thread
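
Editor's note (not part of the patch): a minimal standalone sketch of the level-skipping decision this change introduces. The helper names below (PickManualOutputLevel, ResolveOutputLevel) are hypothetical; in the patch the first half lives in DBImpl::CompactRange(), which records the sentinel ColumnFamilyData::kCompactToBaseLevel, and the second half lives in CompactionPicker::CompactRange(), which resolves the sentinel against vstorage->base_level().

```cpp
#include <cassert>

// Sentinel mirroring ColumnFamilyData::kCompactToBaseLevel in the patch.
constexpr int kCompactToBaseLevel = -2;

// Scheduling side: with level_compaction_dynamic_level_bytes, levels
// 1..base_level-1 are kept empty, so a manual compaction of L0 can target
// the base level directly instead of stepping through the empty levels.
int PickManualOutputLevel(int input_level, bool level_style_compaction,
                          bool dynamic_level_bytes) {
  if (level_style_compaction && dynamic_level_bytes && input_level == 0) {
    return kCompactToBaseLevel;
  }
  return input_level + 1;
}

// Picking side: the sentinel is resolved only when the compaction is
// actually picked, using whatever the base level is at that moment.
int ResolveOutputLevel(int output_level, int current_base_level) {
  if (output_level == kCompactToBaseLevel) {
    assert(current_base_level > 0);
    return current_base_level;
  }
  return output_level;
}
```

Deferring resolution to pick time matters because the base level can move; the added test shows it starting at the last level on an empty DB and settling at level 3 once data arrives, so the scheduler only records the intent "compact L0 into whatever the base level is".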