From 40c64434d41824a7e149a8a47ea2c03838fd8059 Mon Sep 17 00:00:00 2001 From: Ari Ekmekji Date: Mon, 3 Aug 2015 11:32:14 -0700 Subject: [PATCH] Parallelize L0-L1 Compaction: Restructure Compaction Job Summary: As of now, compactions involving files from Level 0 and Level 1 are single-threaded because the files in L0, although sorted, are not range-partitioned like the other levels. This means that during an L0-L1 compaction, each file from L1 needs to be merged with potentially all the files from L0. This attempt to parallelize the L0-L1 compaction assigns a thread and a corresponding iterator to each L1 file; each iterator then considers only the key range found in that L1 file and only the L0 files that have those keys (and only the specific portion of those L0 files in which those keys are found). In this way, the overlap between different iterators focusing on the same files is minimized and potentially eliminated. The first step is to restructure the compaction logic to break L0-L1 compactions into multiple, smaller, sequential compactions. Eventually, each of these smaller jobs will be run simultaneously.
Areas to pay extra attention to are # Correct aggregation of compaction job statistics across multiple threads # Proper opening/closing of output files (make sure each thread's is unique) # Keys that span multiple L1 files # Skewed distributions of keys within L0 files Test Plan: Make and run db_test (newer version has separate compaction tests) and compaction_job_stats_test Reviewers: igor, noetzli, anthony, sdong, yhchiang Reviewed By: yhchiang Subscribers: MarkCallaghan, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D42699 --- db/compaction.h | 11 +++ db/compaction_job.cc | 135 ++++++++++++++++++++++++-------- db/compaction_job.h | 11 ++- db/compaction_job_stats_test.cc | 43 ++++++++-- db/version_set.cc | 3 + include/rocksdb/options.h | 9 +++ util/db_test_util.cc | 4 + util/db_test_util.h | 1 + util/mutable_cf_options.h | 3 + util/options.cc | 2 + 10 files changed, 180 insertions(+), 42 deletions(-) diff --git a/db/compaction.h b/db/compaction.h index 0b9d8e8b4..c2069a707 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -194,6 +194,17 @@ class Compaction { // Create a CompactionFilter from compaction_filter_factory std::unique_ptr CreateCompactionFilter() const; + // Should this compaction be broken up into smaller ones run in parallel? + bool IsSubCompaction() const { + return start_level_ == 0 && output_level_ == 1 + && mutable_cf_options_.num_subcompactions > 1; + } + + // If is_sub_compaction == true, how many smaller compactions should execute + int NumSubCompactions() const { + return mutable_cf_options_.num_subcompactions; + } + private: // mark (or clear) all files that are being compacted void MarkFilesBeingCompacted(bool mark_as_compacted); diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 97a24c5ca..bc9f318ca 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -203,6 +203,34 @@ void CompactionJob::Prepare() { // Is this compaction producing files at the bottommost level? 
bottommost_level_ = compact_->compaction->bottommost_level(); + + GetSubCompactionBoundaries(); +} + +// For L0-L1 compaction, iterators work in parallel by processing +// different subsets of the full key range. This function returns +// the Slices that designate the boundaries of these ranges. Now +// these boundaries are defined by the key ranges of the files in L1, +// and the first and last entries are always nullptr (unrestricted) +void CompactionJob::GetSubCompactionBoundaries() { + auto* c = compact_->compaction; + auto& slices = sub_compaction_boundaries_; + if (c->IsSubCompaction()) { + // TODO(aekmekji): take the option num_subcompactions into account + // when dividing up the key range between multiple iterators instead + // of just assigning each iterator one L1 file's key range + for (size_t which = 0; which < c->num_input_levels(); which++) { + if (c->level(which) == 1) { + if (c->input_levels(which)->num_files > 1) { + const LevelFilesBrief* flevel = c->input_levels(which); + for (size_t i = 1; i < flevel->num_files; i++) { + slices.emplace_back(flevel->files[i].smallest_key); + } + } + break; + } + } + } } Status CompactionJob::Run() { @@ -213,12 +241,43 @@ Status CompactionJob::Run() { auto* compaction = compact_->compaction; LogCompaction(compaction->column_family_data(), compaction); - int64_t imm_micros = 0; // Micros spent doing imm_ compactions + Status status; + Slice *start, *end; + for (size_t i = 0; i < sub_compaction_boundaries_.size() + 1; i++) { + if (i == 0) { + start = nullptr; + } else { + start = &sub_compaction_boundaries_[i - 1]; + } + if (i == sub_compaction_boundaries_.size()) { + end = nullptr; + } else { + end = &sub_compaction_boundaries_[i]; + } + + status = SubCompactionRun(start, end); + if (!status.ok()) { + break; + } + } + + UpdateCompactionStats(); + RecordCompactionIOStats(); + LogFlush(db_options_.info_log); + TEST_SYNC_POINT("CompactionJob::Run():End"); + + return status; +} + +Status 
CompactionJob::SubCompactionRun(Slice* start, Slice* end) { + auto* compaction = compact_->compaction; const uint64_t start_micros = env_->NowMicros(); + int64_t imm_micros = 0; // Micros spent doing imm_ compactions std::unique_ptr input(versions_->MakeInputIterator(compaction)); - input->SeekToFirst(); - auto status = ProcessKeyValueCompaction(&imm_micros, input.get()); + Status status = ProcessKeyValueCompaction(&imm_micros, input.get(), + start, end); + input.reset(); if (output_directory_ && !db_options_.disableDataSync) { @@ -227,12 +286,6 @@ Status CompactionJob::Run() { compaction_stats_.micros = env_->NowMicros() - start_micros - imm_micros; MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros); - UpdateCompactionStats(); - - RecordCompactionIOStats(); - - LogFlush(db_options_.info_log); - TEST_SYNC_POINT("CompactionJob::Run():End"); return status; } @@ -298,7 +351,8 @@ void CompactionJob::Install(Status* status, } Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, - Iterator* input) { + Iterator* input, + Slice* start, Slice* end) { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_COMPACTION_PROCESS_KV); Status status; @@ -333,10 +387,49 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, StopWatchNano timer(env_, stats_ != nullptr); uint64_t total_filter_time = 0; + if (start != nullptr) { + IterKey start_iter; + start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek); + Slice start_key = start_iter.GetKey(); + input->Seek(start_key); + } else { + input->SeekToFirst(); + } + // TODO(noetzli): check whether we could check !shutting_down_->... 
only // only occasionally (see diff D42687) while (input->Valid() && !shutting_down_->load(std::memory_order_acquire) && !cfd->IsDropped() && status.ok()) { + Slice key = input->key(); + Slice value = input->value(); + + // First check that the key is parseable before performing the comparison + // to determine if it's within the range we want + if (!ParseInternalKey(key, &ikey)) { + // Do not hide error keys + // TODO: error key stays in db forever? Figure out the rationale + // v10 error v8 : we cannot hide v8 even though it's pretty obvious. + current_user_key.Clear(); + has_current_user_key = false; + last_sequence_for_key = kMaxSequenceNumber; + visible_in_snapshot = kMaxSequenceNumber; + + if (compaction_job_stats_ != nullptr) { + compaction_job_stats_->num_corrupt_keys++; + } + + status = WriteKeyValue(key, value, ikey, input->status()); + input->Next(); + continue; + } + + // If an end key is specified, check if the current key is >= than it + // and exit if it is because the iterator is out of the range desired + if (end != nullptr && + cfd->user_comparator()->Compare(ikey.user_key, *end) >= 0) { + break; + } + compact_->num_input_records++; if (++loop_cnt > 1000) { RecordDroppedKeys( @@ -345,9 +438,6 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, loop_cnt = 0; } - Slice key = input->key(); - Slice value = input->value(); - if (compaction_job_stats_ != nullptr) { compaction_job_stats_->total_input_raw_key_bytes += key.size(); compaction_job_stats_->total_input_raw_value_bytes += value.size(); @@ -361,25 +451,6 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, } } - // Handle key/value, add to state, etc. - if (!ParseInternalKey(key, &ikey)) { - // Do not hide error keys - // TODO: error key stays in db forever? Figure out the intention/rationale - // v10 error v8 : we cannot hide v8 even though it's pretty obvious. 
- current_user_key.Clear(); - has_current_user_key = false; - last_sequence_for_key = kMaxSequenceNumber; - visible_in_snapshot = kMaxSequenceNumber; - - if (compaction_job_stats_ != nullptr) { - compaction_job_stats_->num_corrupt_keys++; - } - - status = WriteKeyValue(key, value, ikey, input->status()); - input->Next(); - continue; - } - if (compaction_job_stats_ != nullptr && ikey.type == kTypeDeletion) { compaction_job_stats_->num_input_deletion_records++; } diff --git a/db/compaction_job.h b/db/compaction_job.h index 85589fc34..8d53af8e9 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -73,19 +73,25 @@ class CompactionJob { void Prepare(); // REQUIRED mutex not held Status Run(); + // REQUIRED: mutex held // status is the return of Run() void Install(Status* status, const MutableCFOptions& mutable_cf_options, InstrumentedMutex* db_mutex); private: + // REQUIRED: mutex not held + Status SubCompactionRun(Slice* start, Slice* end); + + void GetSubCompactionBoundaries(); // update the thread status for starting a compaction. void ReportStartedCompaction(Compaction* compaction); void AllocateCompactionOutputFileNumbers(); - // Call compaction filter. 
Then iterate through input and compact the // kv-pairs - Status ProcessKeyValueCompaction(int64_t* imm_micros, Iterator* input); + Status ProcessKeyValueCompaction(int64_t* imm_micros, Iterator* input, + Slice* start = nullptr, + Slice* end = nullptr); Status WriteKeyValue(const Slice& key, const Slice& value, const ParsedInternalKey& ikey, @@ -147,6 +153,7 @@ class CompactionJob { EventLogger* event_logger_; bool paranoid_file_checks_; + std::vector sub_compaction_boundaries_; }; } // namespace rocksdb diff --git a/db/compaction_job_stats_test.cc b/db/compaction_job_stats_test.cc index e61dfc2c5..1e356db34 100644 --- a/db/compaction_job_stats_test.cc +++ b/db/compaction_job_stats_test.cc @@ -84,13 +84,15 @@ std::string Key(uint64_t key, int length) { return std::string(buf); } -class CompactionJobStatsTest : public testing::Test { +class CompactionJobStatsTest : public testing::Test, + public testing::WithParamInterface { public: std::string dbname_; std::string alternative_wal_dir_; Env* env_; DB* db_; std::vector handles_; + bool subcompactions_enabled_; Options last_options_; @@ -101,6 +103,8 @@ class CompactionJobStatsTest : public testing::Test { alternative_wal_dir_ = dbname_ + "/wal"; Options options; options.create_if_missing = true; + subcompactions_enabled_ = GetParam(); + options.num_subcompactions = subcompactions_enabled_ ? 
2 : 1; auto delete_options = options; delete_options.wal_dir = alternative_wal_dir_; EXPECT_OK(DestroyDB(dbname_, delete_options)); @@ -123,6 +127,9 @@ class CompactionJobStatsTest : public testing::Test { EXPECT_OK(DestroyDB(dbname_, options)); } + static void SetUpTestCase() {} + static void TearDownTestCase() {} + DBImpl* dbfull() { return reinterpret_cast(db_); } @@ -607,7 +614,7 @@ CompressionType GetAnyCompression() { } // namespace -TEST_F(CompactionJobStatsTest, CompactionJobStatsTest) { +TEST_P(CompactionJobStatsTest, CompactionJobStatsTest) { Random rnd(301); const int kBufSize = 100; char buf[kBufSize]; @@ -634,6 +641,7 @@ TEST_F(CompactionJobStatsTest, CompactionJobStatsTest) { options.level0_file_num_compaction_trigger = kTestScale + 1; options.num_levels = 3; options.compression = kNoCompression; + options.num_subcompactions = subcompactions_enabled_ ? 2 : 1; for (int test = 0; test < 2; ++test) { DestroyAndReopen(options); @@ -711,6 +719,11 @@ TEST_F(CompactionJobStatsTest, CompactionJobStatsTest) { } // 4th Phase: perform L0 -> L1 compaction again, expect higher write amp + // When subcompactions are enabled, the number of output files increases + // by 1 because multiple threads are consuming the input and generating + // output files without coordinating to see if the output could fit into + // a smaller number of files like it does when it runs sequentially + int num_output_files = options.num_subcompactions > 1 ? 2 : 1; for (uint64_t start_key = key_base; num_L0_files > 1; start_key += key_base * sparseness) { @@ -722,13 +735,18 @@ TEST_F(CompactionJobStatsTest, CompactionJobStatsTest) { smallest_key, largest_key, 3, 2, num_keys_per_L0_file * 3, kKeySize, kValueSize, - 1, num_keys_per_L0_file * 2, // 1/3 of the data will be updated. + num_output_files, + num_keys_per_L0_file * 2, // 1/3 of the data will be updated. 
compression_ratio, num_keys_per_L0_file)); ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U); Compact(1, smallest_key, largest_key); - snprintf(buf, kBufSize, "%d,%d", - --num_L0_files, --num_L1_files); + // TODO(aekmekji): account for whether parallel L0-L1 compaction is + // enabled or not. If so then num_L1_files will increase by 1 + if (options.num_subcompactions == 1) { + --num_L1_files; + } + snprintf(buf, kBufSize, "%d,%d", --num_L0_files, num_L1_files); ASSERT_EQ(std::string(buf), FilesPerLevel(1)); } @@ -747,7 +765,11 @@ TEST_F(CompactionJobStatsTest, CompactionJobStatsTest) { num_keys_per_L0_file)); ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U); Compact(1, smallest_key, largest_key); - ASSERT_EQ("0,4", FilesPerLevel(1)); + num_L1_files = options.num_subcompactions > 1 ? 7 : 4; + char L1_buf[4]; + snprintf(L1_buf, sizeof(L1_buf), "0,%d", num_L1_files); + std::string L1_files(L1_buf); + ASSERT_EQ(L1_files, FilesPerLevel(1)); options.compression = GetAnyCompression(); if (options.compression == kNoCompression) { break; @@ -758,7 +780,7 @@ TEST_F(CompactionJobStatsTest, CompactionJobStatsTest) { ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U); } -TEST_F(CompactionJobStatsTest, DeletionStatsTest) { +TEST_P(CompactionJobStatsTest, DeletionStatsTest) { Random rnd(301); uint64_t key_base = 100000l; // Note: key_base must be multiple of num_keys_per_L0_file @@ -785,6 +807,7 @@ TEST_F(CompactionJobStatsTest, DeletionStatsTest) { options.num_levels = 3; options.compression = kNoCompression; options.max_bytes_for_level_multiplier = 2; + options.num_subcompactions = subcompactions_enabled_ ? 
2 : 1; DestroyAndReopen(options); CreateAndReopenWithCF({"pikachu"}, options); @@ -854,7 +877,7 @@ int GetUniversalCompactionInputUnits(uint32_t num_flushes) { } } // namespace -TEST_F(CompactionJobStatsTest, UniversalCompactionTest) { +TEST_P(CompactionJobStatsTest, UniversalCompactionTest) { Random rnd(301); uint64_t key_base = 100000000l; // Note: key_base must be multiple of num_keys_per_L0_file @@ -876,6 +899,8 @@ TEST_F(CompactionJobStatsTest, UniversalCompactionTest) { options.compaction_style = kCompactionStyleUniversal; options.compaction_options_universal.size_ratio = 1; options.compaction_options_universal.max_size_amplification_percent = 1000; + options.num_subcompactions = subcompactions_enabled_ ? 2 : 1; + DestroyAndReopen(options); CreateAndReopenWithCF({"pikachu"}, options); @@ -923,6 +948,8 @@ TEST_F(CompactionJobStatsTest, UniversalCompactionTest) { ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U); } +INSTANTIATE_TEST_CASE_P(CompactionJobStatsTest, CompactionJobStatsTest, + ::testing::Bool()); } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/version_set.cc b/db/version_set.cc index 29599170c..79c0b2e88 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3034,6 +3034,9 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) { read_options.verify_checksums = c->mutable_cf_options()->verify_checksums_in_compaction; read_options.fill_cache = false; + if (c->IsSubCompaction()) { + read_options.total_order_seek = true; + } // Level-0 files have to be merged together. For other levels, // we will make a concatenating iterator per level. diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index b9e5c758b..0a112348e 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -885,6 +885,15 @@ struct DBOptions { // Default: 1 int max_background_compactions; + // This integer represents the maximum number of threads that will + // concurrently perform a level-based compaction from L0 to L1. 
A value + // of 1 means there is no parallelism, and a greater number enables a + // multi-threaded version of the L0-L1 compaction that divides the compaction + // into multiple, smaller ones that are run simultaneously. This is still + // under development and is only available for level-based compaction. + // Default: 1 + int num_subcompactions; + // Maximum number of concurrent background memtable flush jobs, submitted to // the HIGH priority thread pool. // diff --git a/util/db_test_util.cc b/util/db_test_util.cc index fc6e0dc83..bca50da3b 100644 --- a/util/db_test_util.cc +++ b/util/db_test_util.cc @@ -302,6 +302,10 @@ Options DBTestBase::CurrentOptions( options.row_cache = NewLRUCache(1024 * 1024); break; } + case kLevelSubcompactions: { + options.num_subcompactions = 2; + break; + } default: break; diff --git a/util/db_test_util.h b/util/db_test_util.h index a30d80a4d..164839a5b 100644 --- a/util/db_test_util.h +++ b/util/db_test_util.h @@ -421,6 +421,7 @@ class DBTestBase : public testing::Test { kFIFOCompaction = 25, kOptimizeFiltersForHits = 26, kRowCache = 27, + kLevelSubcompactions = 28, kEnd = 28 }; int option_config_; diff --git a/util/mutable_cf_options.h b/util/mutable_cf_options.h index 7cfb0acba..9f3a7a979 100644 --- a/util/mutable_cf_options.h +++ b/util/mutable_cf_options.h @@ -40,6 +40,7 @@ struct MutableCFOptions { max_bytes_for_level_multiplier_additional( options.max_bytes_for_level_multiplier_additional), verify_checksums_in_compaction(options.verify_checksums_in_compaction), + num_subcompactions(options.num_subcompactions), max_sequential_skip_in_iterations( options.max_sequential_skip_in_iterations), paranoid_file_checks(options.paranoid_file_checks) @@ -70,6 +71,7 @@ struct MutableCFOptions { max_bytes_for_level_base(0), max_bytes_for_level_multiplier(0), verify_checksums_in_compaction(false), + num_subcompactions(1), max_sequential_skip_in_iterations(0), paranoid_file_checks(false) {} @@ -121,6 +123,7 @@ struct MutableCFOptions { int 
max_bytes_for_level_multiplier; std::vector max_bytes_for_level_multiplier_additional; bool verify_checksums_in_compaction; + int num_subcompactions; // Misc options uint64_t max_sequential_skip_in_iterations; diff --git a/util/options.cc b/util/options.cc index 9c57222c7..466a69bfb 100644 --- a/util/options.cc +++ b/util/options.cc @@ -214,6 +214,7 @@ DBOptions::DBOptions() wal_dir(""), delete_obsolete_files_period_micros(6 * 60 * 60 * 1000000UL), max_background_compactions(1), + num_subcompactions(1), max_background_flushes(1), max_log_file_size(0), log_file_time_to_roll(0), @@ -261,6 +262,7 @@ DBOptions::DBOptions(const Options& options) delete_obsolete_files_period_micros( options.delete_obsolete_files_period_micros), max_background_compactions(options.max_background_compactions), + num_subcompactions(options.num_subcompactions), max_background_flushes(options.max_background_flushes), max_log_file_size(options.max_log_file_size), log_file_time_to_roll(options.log_file_time_to_roll),