From 17a1d65e3a087188bce354972a57fb1f1dd4b4c4 Mon Sep 17 00:00:00 2001 From: zczhu <> Date: Thu, 23 Jun 2022 14:25:42 -0700 Subject: [PATCH] Cut output files at compaction cursors (#10227) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: The files behind the compaction cursor contain newer data than the files ahead of it. If a compaction writes a file that spans from before its output level’s cursor to after it, then data before the cursor will be contaminated with the old timestamp from the data after the cursor. To avoid this, we can split the output file into two – one entirely before the cursor and one entirely after the cursor. Note that, in rare cases, we **DO NOT** need to cut the file if it is a trivial move since the file will not be contaminated by older files. In such case, the compact cursor is not guaranteed to be the boundary of the file, but it does not hurt the round-robin selection process. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10227 Test Plan: Add 'RoundRobinCutOutputAtCompactCursor' unit test in `db_compaction_test` Task: [T122216351](https://www.internalfb.com/intern/tasks/?t=122216351) Reviewed By: jay-zhuang Differential Revision: D37388088 Pulled By: littlepig2013 fbshipit-source-id: 9246a6a084b6037b90d6ab3183ba4dfb75a3378d --- db/compaction/compaction.cc | 22 +++++++++++++ db/compaction/compaction.h | 8 +++++ db/compaction/compaction_job.cc | 37 +++++++++++++++++++-- db/db_compaction_test.cc | 57 +++++++++++++++++++++++++++++++++ 4 files changed, 122 insertions(+), 2 deletions(-) diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index 3f91da9c6..9fab3057a 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -279,6 +279,28 @@ Compaction::Compaction( } GetBoundaryKeys(vstorage, inputs_, &smallest_user_key_, &largest_user_key_); + + // Every compaction regardless of any compaction reason may respect the + // existing compact cursor in the output level to split output files + InternalKey temp_split_key = InternalKey(); + if (immutable_options_.compaction_style == kCompactionStyleLevel && + immutable_options_.compaction_pri == kRoundRobin) { + const InternalKey cursor = + input_vstorage_->GetCompactCursors()[output_level_]; + if (cursor.Valid()) { + const Slice& cursor_user_key = ExtractUserKey(cursor.Encode()); + auto ucmp = vstorage->InternalComparator()->user_comparator(); + // May split output files according to the cursor if it in the user-key + // range + if (ucmp->CompareWithoutTimestamp(cursor_user_key, smallest_user_key_) > + 0 && + ucmp->CompareWithoutTimestamp(cursor_user_key, largest_user_key_) <= + 0) { + temp_split_key = cursor; + } + } + } + output_split_key_ = temp_split_key; } Compaction::~Compaction() { diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h index ad9ec470c..94168a70f 100644 --- a/db/compaction/compaction.h +++ b/db/compaction/compaction.h @@ -177,6 +177,12 @@ class Compaction { // moving a single input file to the next level (no merging or splitting) bool IsTrivialMove() const; + // The split user key in the output level if this compaction is required to + // split the output files according to the existing cursor in the output + // level under round-robin compaction policy. Empty indicates no required + // splitting key + const InternalKey GetOutputSplitKey() const { return output_split_key_; } + // If true, then the compaction can be done by simply deleting input files. bool deletion_compaction() const { return deletion_compaction_; } @@ -379,6 +385,8 @@ class Compaction { Temperature output_temperature_; // If true, then the compaction can be done by simply deleting input files. const bool deletion_compaction_; + // should it split the output file using the compact cursor? + InternalKey output_split_key_; // Compaction input files organized by level. Constant after construction const std::vector inputs_; diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 1b86de980..4c124fe50 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -191,7 +191,7 @@ struct CompactionJob::SubcompactionState { // The number of bytes overlapping between the current output and // grandparent files used in ShouldStopBefore(). uint64_t overlapped_bytes = 0; - // A flag determine whether the key has been seen in ShouldStopBefore() + // A flag determines whether the key has been seen in ShouldStopBefore() bool seen_key = false; // sub compaction job id, which is used to identify different sub-compaction // within the same compaction job. @@ -201,6 +201,9 @@ struct CompactionJob::SubcompactionState { // sub-compaction begin. bool notify_on_subcompaction_completion = false; + // A flag determines if this subcompaction has been split by the cursor + bool is_split = false; + SubcompactionState(Compaction* c, Slice* _start, Slice* _end, uint64_t size, uint32_t _sub_job_id) : compaction(c), @@ -234,6 +237,23 @@ struct CompactionJob::SubcompactionState { &compaction->column_family_data()->internal_comparator(); const std::vector& grandparents = compaction->grandparents(); + const InternalKey output_split_key = compaction->GetOutputSplitKey(); + if (output_split_key.Valid() && !is_split) { + // Invalid output_split_key indicates that we do not need to split + if ((end == nullptr || icmp->user_comparator()->Compare( + ExtractUserKey(output_split_key.Encode()), + ExtractUserKey(*end)) < 0) && + (start == nullptr || icmp->user_comparator()->Compare( + ExtractUserKey(output_split_key.Encode()), + ExtractUserKey(*start)) > 0)) { + // We may only split the output when the cursor is in the range. Split + // occurs when the next key is larger than/equal to the cursor + if (icmp->Compare(internal_key, output_split_key.Encode()) >= 0) { + is_split = true; + return true; + } + } + } bool grandparant_file_switched = false; // Scan to find earliest grandparent file that contains key. while (grandparent_index < grandparents.size() && @@ -621,6 +641,16 @@ void CompactionJob::GenSubcompactionBoundaries() { } } + Slice output_split_user_key; + const InternalKey output_split_key = c->GetOutputSplitKey(); + if (output_split_key.Valid()) { + output_split_user_key = ExtractUserKey(output_split_key.Encode()); + bounds.emplace_back(output_split_key.Encode()); + } else { + // Empty user key indicates that splitting is not required here + output_split_user_key = Slice(); + } + std::sort(bounds.begin(), bounds.end(), [cfd_comparator](const Slice& a, const Slice& b) -> bool { return cfd_comparator->Compare(ExtractUserKey(a), @@ -691,7 +721,10 @@ void CompactionJob::GenSubcompactionBoundaries() { // need to put an end boundary continue; } - if (sum >= mean) { + if (sum >= mean || + (!output_split_user_key.empty() && + cfd_comparator->Compare(ExtractUserKey(ranges[i].range.limit), + output_split_user_key) == 0)) { boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit)); sizes_.emplace_back(sum); subcompactions--; diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 02c1232b8..125439b00 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -5290,6 +5290,63 @@ TEST_F(DBCompactionTest, PersistRoundRobinCompactCursor) { } } +TEST_F(DBCompactionTest, RoundRobinCutOutputAtCompactCursor) { + Options options = CurrentOptions(); + options.num_levels = 3; + options.compression = kNoCompression; + options.write_buffer_size = 4 * 1024; + options.max_bytes_for_level_base = 64 * 1024; + options.max_bytes_for_level_multiplier = 4; + options.level0_file_num_compaction_trigger = 4; + options.compaction_pri = CompactionPri::kRoundRobin; + + DestroyAndReopen(options); + + VersionSet* const versions = dbfull()->GetVersionSet(); + assert(versions); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + ASSERT_NE(cfd, nullptr); + + Version* const current = cfd->current(); + ASSERT_NE(current, nullptr); + + VersionStorageInfo* storage_info = current->storage_info(); + ASSERT_NE(storage_info, nullptr); + + const InternalKey split_cursor = InternalKey(Key(600), 100, kTypeValue); + storage_info->AddCursorForOneLevel(2, split_cursor); + + Random rnd(301); + + for (int i = 0; i < 50; i++) { + for (int j = 0; j < 50; j++) { + ASSERT_OK(Put(Key(j * 2 + i * 100), rnd.RandomString(102))); + } + } + // Add more overlapping files (avoid trivial move) to trigger compaction that + // output files in L2. Note that trivial move does not trigger compaction and + // in that case the cursor is not necessarily the boundary of file. + for (int i = 0; i < 50; i++) { + for (int j = 0; j < 50; j++) { + ASSERT_OK(Put(Key(j * 2 + 1 + i * 100), rnd.RandomString(1014))); + } + } + + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + std::vector> level_to_files; + dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(), + &level_to_files); + const auto icmp = cfd->current()->storage_info()->InternalComparator(); + // Files in level 2 should be split by the cursor + for (const auto& file : level_to_files[2]) { + ASSERT_TRUE( + icmp->Compare(file.smallest.Encode(), split_cursor.Encode()) >= 0 || + icmp->Compare(file.largest.Encode(), split_cursor.Encode()) < 0); + } +} + class NoopMergeOperator : public MergeOperator { public: NoopMergeOperator() {}