diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc
index 3f91da9c6..9fab3057a 100644
--- a/db/compaction/compaction.cc
+++ b/db/compaction/compaction.cc
@@ -279,6 +279,28 @@ Compaction::Compaction(
   }
 
   GetBoundaryKeys(vstorage, inputs_, &smallest_user_key_, &largest_user_key_);
+
+  // Every compaction, regardless of the compaction reason, may respect the
+  // existing compact cursor in the output level to split output files
+  InternalKey temp_split_key = InternalKey();
+  if (immutable_options_.compaction_style == kCompactionStyleLevel &&
+      immutable_options_.compaction_pri == kRoundRobin) {
+    const InternalKey cursor =
+        input_vstorage_->GetCompactCursors()[output_level_];
+    if (cursor.Valid()) {
+      const Slice& cursor_user_key = ExtractUserKey(cursor.Encode());
+      auto ucmp = vstorage->InternalComparator()->user_comparator();
+      // May split output files according to the cursor if it is in the
+      // user-key range
+      if (ucmp->CompareWithoutTimestamp(cursor_user_key, smallest_user_key_) >
+              0 &&
+          ucmp->CompareWithoutTimestamp(cursor_user_key, largest_user_key_) <=
+              0) {
+        temp_split_key = cursor;
+      }
+    }
+  }
+  output_split_key_ = temp_split_key;
 }
 
 Compaction::~Compaction() {
diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h
index ad9ec470c..94168a70f 100644
--- a/db/compaction/compaction.h
+++ b/db/compaction/compaction.h
@@ -177,6 +177,12 @@ class Compaction {
   // moving a single input file to the next level (no merging or splitting)
   bool IsTrivialMove() const;
 
+  // The split user key in the output level if this compaction is required to
+  // split the output files according to the existing cursor in the output
+  // level under round-robin compaction policy. Empty indicates no required
+  // splitting key
+  const InternalKey GetOutputSplitKey() const { return output_split_key_; }
+
   // If true, then the compaction can be done by simply deleting input files.
   bool deletion_compaction() const { return deletion_compaction_; }
 
@@ -379,6 +385,8 @@ class Compaction {
   Temperature output_temperature_;
   // If true, then the compaction can be done by simply deleting input files.
   const bool deletion_compaction_;
+  // Should it split the output file using the compact cursor?
+  InternalKey output_split_key_;
 
   // Compaction input files organized by level. Constant after construction
   const std::vector<CompactionInputFiles> inputs_;
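The constructor change above only records a split key when the cursor falls inside the compaction's user-key range. Below is a minimal standalone sketch of that rule, assuming plain `std::string` user keys and lexicographic ordering in place of RocksDB's `InternalKey` and user comparator; `PickOutputSplitKey` is a hypothetical helper, not part of the patch.

```cpp
#include <cassert>
#include <optional>
#include <string>

// Hypothetical helper: returns the cursor as the split key only when it lies
// in the half-open range (smallest_user_key, largest_user_key]; otherwise the
// compaction's output does not need a forced cut at the cursor.
std::optional<std::string> PickOutputSplitKey(
    const std::string& cursor_user_key, const std::string& smallest_user_key,
    const std::string& largest_user_key) {
  if (cursor_user_key.compare(smallest_user_key) > 0 &&
      cursor_user_key.compare(largest_user_key) <= 0) {
    return cursor_user_key;
  }
  return std::nullopt;  // empty split key: no forced file cut
}

int main() {
  // Cursor inside the compaction's key range: split at the cursor.
  assert(PickOutputSplitKey("key600", "key000", "key999").has_value());
  // Cursor below the whole range: nothing to split.
  assert(!PickOutputSplitKey("key600", "key700", "key999").has_value());
  return 0;
}
```

The strict lower bound matters: if the cursor equals the smallest user key, every key in the compaction is already at or past it, so cutting there would only produce an empty leading file.
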
diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc
index 1b86de980..4c124fe50 100644
--- a/db/compaction/compaction_job.cc
+++ b/db/compaction/compaction_job.cc
@@ -191,7 +191,7 @@ struct CompactionJob::SubcompactionState {
   // The number of bytes overlapping between the current output and
   // grandparent files used in ShouldStopBefore().
   uint64_t overlapped_bytes = 0;
-  // A flag determine whether the key has been seen in ShouldStopBefore()
+  // A flag determines whether the key has been seen in ShouldStopBefore()
   bool seen_key = false;
   // sub compaction job id, which is used to identify different sub-compaction
   // within the same compaction job.
@@ -201,6 +201,9 @@ struct CompactionJob::SubcompactionState {
   // sub-compaction begin.
   bool notify_on_subcompaction_completion = false;
 
+  // A flag determines if this subcompaction has been split by the cursor
+  bool is_split = false;
+
   SubcompactionState(Compaction* c, Slice* _start, Slice* _end, uint64_t size,
                      uint32_t _sub_job_id)
       : compaction(c),
@@ -234,6 +237,23 @@ struct CompactionJob::SubcompactionState {
         &compaction->column_family_data()->internal_comparator();
     const std::vector<FileMetaData*>& grandparents = compaction->grandparents();
 
+    const InternalKey output_split_key = compaction->GetOutputSplitKey();
+    if (output_split_key.Valid() && !is_split) {
+      // Invalid output_split_key indicates that we do not need to split
+      if ((end == nullptr || icmp->user_comparator()->Compare(
+                                 ExtractUserKey(output_split_key.Encode()),
+                                 ExtractUserKey(*end)) < 0) &&
+          (start == nullptr || icmp->user_comparator()->Compare(
+                                   ExtractUserKey(output_split_key.Encode()),
+                                   ExtractUserKey(*start)) > 0)) {
+        // We may only split the output when the cursor is in the range. The
+        // split occurs when the next key is larger than or equal to the cursor
+        if (icmp->Compare(internal_key, output_split_key.Encode()) >= 0) {
+          is_split = true;
+          return true;
+        }
+      }
+    }
     bool grandparant_file_switched = false;
     // Scan to find earliest grandparent file that contains key.
     while (grandparent_index < grandparents.size() &&
@@ -621,6 +641,16 @@ void CompactionJob::GenSubcompactionBoundaries() {
     }
   }
 
+  Slice output_split_user_key;
+  const InternalKey output_split_key = c->GetOutputSplitKey();
+  if (output_split_key.Valid()) {
+    output_split_user_key = ExtractUserKey(output_split_key.Encode());
+    bounds.emplace_back(output_split_key.Encode());
+  } else {
+    // Empty user key indicates that splitting is not required here
+    output_split_user_key = Slice();
+  }
+
   std::sort(bounds.begin(), bounds.end(),
             [cfd_comparator](const Slice& a, const Slice& b) -> bool {
               return cfd_comparator->Compare(ExtractUserKey(a),
@@ -691,7 +721,10 @@ void CompactionJob::GenSubcompactionBoundaries() {
       // need to put an end boundary
      continue;
     }
-    if (sum >= mean) {
+    if (sum >= mean ||
+        (!output_split_user_key.empty() &&
+         cfd_comparator->Compare(ExtractUserKey(ranges[i].range.limit),
+                                 output_split_user_key) == 0)) {
       boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit));
       sizes_.emplace_back(sum);
       subcompactions--;
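To make the interaction between `output_split_key` and `is_split` above easier to follow, here is a minimal standalone sketch of the intended once-per-subcompaction behavior: the first key at or past the split key forces a file boundary, and the flag suppresses any further forced cuts. `SplitTracker` and `ShouldCutBefore` are hypothetical names using plain string comparison; the real check above additionally confirms the split key lies inside the subcompaction's `(start, end)` range and compares full internal keys.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for the per-subcompaction state: an empty split_key
// means "no forced split", and is_split records that the cut already happened.
struct SplitTracker {
  std::string split_key;
  bool is_split = false;

  // Returns true exactly once: for the first key at or past split_key.
  bool ShouldCutBefore(const std::string& key) {
    if (split_key.empty() || is_split) {
      return false;
    }
    if (key.compare(split_key) >= 0) {
      is_split = true;
      return true;
    }
    return false;
  }
};

int main() {
  SplitTracker tracker;
  tracker.split_key = "key600";
  assert(!tracker.ShouldCutBefore("key100"));  // before the cursor: keep writing
  assert(tracker.ShouldCutBefore("key600"));   // first key >= cursor: cut here
  assert(!tracker.ShouldCutBefore("key700"));  // only one forced cut afterwards
  return 0;
}
```

Cutting only once keeps the cursor from fragmenting every subsequent output file; only the file that would straddle the cursor is cut at it.
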
diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc
index 02c1232b8..125439b00 100644
--- a/db/db_compaction_test.cc
+++ b/db/db_compaction_test.cc
@@ -5290,6 +5290,63 @@ TEST_F(DBCompactionTest, PersistRoundRobinCompactCursor) {
   }
 }
 
+TEST_F(DBCompactionTest, RoundRobinCutOutputAtCompactCursor) {
+  Options options = CurrentOptions();
+  options.num_levels = 3;
+  options.compression = kNoCompression;
+  options.write_buffer_size = 4 * 1024;
+  options.max_bytes_for_level_base = 64 * 1024;
+  options.max_bytes_for_level_multiplier = 4;
+  options.level0_file_num_compaction_trigger = 4;
+  options.compaction_pri = CompactionPri::kRoundRobin;
+
+  DestroyAndReopen(options);
+
+  VersionSet* const versions = dbfull()->GetVersionSet();
+  assert(versions);
+
+  ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
+  ASSERT_NE(cfd, nullptr);
+
+  Version* const current = cfd->current();
+  ASSERT_NE(current, nullptr);
+
+  VersionStorageInfo* storage_info = current->storage_info();
+  ASSERT_NE(storage_info, nullptr);
+
+  const InternalKey split_cursor = InternalKey(Key(600), 100, kTypeValue);
+  storage_info->AddCursorForOneLevel(2, split_cursor);
+
+  Random rnd(301);
+
+  for (int i = 0; i < 50; i++) {
+    for (int j = 0; j < 50; j++) {
+      ASSERT_OK(Put(Key(j * 2 + i * 100), rnd.RandomString(102)));
+    }
+  }
+  // Add more overlapping files (avoid trivial move) to trigger compaction that
+  // outputs files in L2. Note that trivial move does not trigger compaction
+  // and in that case the cursor is not necessarily a file boundary.
+  for (int i = 0; i < 50; i++) {
+    for (int j = 0; j < 50; j++) {
+      ASSERT_OK(Put(Key(j * 2 + 1 + i * 100), rnd.RandomString(1014)));
+    }
+  }
+
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  std::vector<std::vector<FileMetaData>> level_to_files;
+  dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
+                                  &level_to_files);
+  const auto icmp = cfd->current()->storage_info()->InternalComparator();
+  // Files in level 2 should be split by the cursor
+  for (const auto& file : level_to_files[2]) {
+    ASSERT_TRUE(
+        icmp->Compare(file.smallest.Encode(), split_cursor.Encode()) >= 0 ||
+        icmp->Compare(file.largest.Encode(), split_cursor.Encode()) < 0);
+  }
+}
+
 class NoopMergeOperator : public MergeOperator {
  public:
   NoopMergeOperator() {}