diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 38b015bb6..1b86de980 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -2160,6 +2160,16 @@ Status CompactionJob::InstallCompactionResults( stats.GetBytes()); } + if (compaction->compaction_reason() == CompactionReason::kLevelMaxLevelSize && + compaction->immutable_options()->compaction_pri == kRoundRobin) { + int start_level = compaction->start_level(); + if (start_level > 0) { + auto vstorage = compaction->input_version()->storage_info(); + edit->AddCompactCursor(start_level, + vstorage->GetNextCompactCursor(start_level)); + } + } + return versions_->LogAndApply(compaction->column_family_data(), mutable_cf_options, edit, db_mutex_, db_directory_); diff --git a/db/compaction/compaction_picker_level.cc b/db/compaction/compaction_picker_level.cc index 87d1e8e63..b3e4cb310 100644 --- a/db/compaction/compaction_picker_level.cc +++ b/db/compaction/compaction_picker_level.cc @@ -448,6 +448,14 @@ bool LevelCompactionBuilder::PickFileToCompact() { // do not pick a file to compact if it is being compacted // from n-1 level. if (f->being_compacted) { + if (ioptions_.compaction_pri == kRoundRobin) { + // TODO(zichen): this file may be involved in a compaction from + // an upper level, so the cursor cannot be advanced for round-robin. + // Currently, we do not pick any file to compact in this case. We + // should fix this later so that a compaction is still picked while + // the cursor is not advanced. + return false; + } continue; } @@ -460,6 +468,13 @@ bool LevelCompactionBuilder::PickFileToCompact() { // A locked (pending compaction) input-level file was pulled in due to // user-key overlap. start_level_inputs_.clear(); + + // To ensure every file is selected in a round-robin manner, we cannot + // skip the current file. 
So we return false and wait for the next time + // we can pick this file to compact. + if (ioptions_.compaction_pri == kRoundRobin) { + return false; + } continue; } @@ -479,6 +494,10 @@ bool LevelCompactionBuilder::PickFileToCompact() { !compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_, &output_level_inputs)) { start_level_inputs_.clear(); + // Same reason as above: round-robin must not skip this file, so bail out + if (ioptions_.compaction_pri == kRoundRobin) { + return false; + } continue; } base_index_ = index; @@ -486,8 +505,9 @@ bool LevelCompactionBuilder::PickFileToCompact() { } // store where to start the iteration in the next call to PickCompaction - vstorage_->SetNextCompactionIndex(start_level_, cmp_idx); - + if (ioptions_.compaction_pri != kRoundRobin) { + vstorage_->SetNextCompactionIndex(start_level_, cmp_idx); + } return start_level_inputs_.size() > 0; } diff --git a/db/compaction/compaction_picker_test.cc b/db/compaction/compaction_picker_test.cc index fca6ccd69..7da018aad 100644 --- a/db/compaction/compaction_picker_test.cc +++ b/db/compaction/compaction_picker_test.cc @@ -1311,6 +1311,44 @@ TEST_F(CompactionPickerTest, CompactionPriMinOverlapping4) { ASSERT_EQ(6U, compaction->input(0, 0)->fd.GetNumber()); } +TEST_F(CompactionPickerTest, CompactionPriRoundRobin) { + std::vector test_cursors = {InternalKey("249", 100, kTypeValue), + InternalKey("600", 100, kTypeValue), + InternalKey()}; + std::vector selected_files = {8U, 6U, 6U}; + + ioptions_.compaction_pri = kRoundRobin; + mutable_cf_options_.max_bytes_for_level_base = 10000000; + mutable_cf_options_.max_bytes_for_level_multiplier = 10; + for (size_t i = 0; i < test_cursors.size(); i++) { + // start a brand new version in each iteration. 
+ NewVersionStorage(6, kCompactionStyleLevel); + vstorage_->ResizeCompactCursors(6); + // Set the level-2 round-robin cursor for this iteration + vstorage_->AddCursorForOneLevel(2, test_cursors[i]); + Add(2, 6U, "150", "199", 50000000U); // Overlap with 26U, 27U + Add(2, 7U, "200", "249", 50000000U); // No overlap with any level-3 file + Add(2, 8U, "300", "600", 50000000U); // Overlap with 28U, 29U + + Add(3, 26U, "130", "165", 60000000U); + Add(3, 27U, "166", "170", 60000000U); + Add(3, 28U, "270", "340", 60000000U); + Add(3, 29U, "401", "500", 60000000U); + UpdateVersionStorageInfo(); + LevelCompactionPicker local_level_compaction_picker = + LevelCompactionPicker(ioptions_, &icmp_); + std::unique_ptr compaction( + local_level_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), + &log_buffer_)); + ASSERT_TRUE(compaction.get() != nullptr); + ASSERT_EQ(1U, compaction->num_input_files(0)); + ASSERT_EQ(selected_files[i], compaction->input(0, 0)->fd.GetNumber()); + // Release the version storage before the next iteration + DeleteVersionStorage(); + } +} + // This test exhibits the bug where we don't properly reset parent_index in // PickCompaction() TEST_F(CompactionPickerTest, ParentIndexResetBug) { diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index d307cadbb..02c1232b8 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -5219,7 +5219,76 @@ INSTANTIATE_TEST_CASE_P( ::testing::Values(CompactionPri::kByCompensatedSize, CompactionPri::kOldestLargestSeqFirst, CompactionPri::kOldestSmallestSeqFirst, - CompactionPri::kMinOverlappingRatio)); + CompactionPri::kMinOverlappingRatio, + CompactionPri::kRoundRobin)); + +TEST_F(DBCompactionTest, PersistRoundRobinCompactCursor) { + Options options = CurrentOptions(); + options.write_buffer_size = 16 * 1024; + options.max_bytes_for_level_base = 64 * 1024; + options.level0_file_num_compaction_trigger = 4; + options.compaction_pri = CompactionPri::kRoundRobin; + options.max_bytes_for_level_multiplier = 4; + 
options.num_levels = 3; + options.compression = kNoCompression; + + DestroyAndReopen(options); + + Random rnd(301); + + // 30 batches of 16 puts (1000-byte values), intended to fill about 30 L0 files and trigger compactions between L1 and L2 + for (int i = 0; i < 30; i++) { + for (int j = 0; j < 16; j++) { + ASSERT_OK(Put(rnd.RandomString(24), rnd.RandomString(1000))); + } + } + + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + VersionSet* const versions = dbfull()->GetVersionSet(); + assert(versions); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + ASSERT_NE(cfd, nullptr); + + Version* const current = cfd->current(); + ASSERT_NE(current, nullptr); + + const VersionStorageInfo* const storage_info = current->storage_info(); + ASSERT_NE(storage_info, nullptr); + + const std::vector compact_cursors = + storage_info->GetCompactCursors(); + + Reopen(options); + + VersionSet* const reopened_versions = dbfull()->GetVersionSet(); + assert(reopened_versions); + + ColumnFamilyData* const reopened_cfd = + reopened_versions->GetColumnFamilySet()->GetDefault(); + ASSERT_NE(reopened_cfd, nullptr); + + Version* const reopened_current = reopened_cfd->current(); + ASSERT_NE(reopened_current, nullptr); + + const VersionStorageInfo* const reopened_storage_info = + reopened_current->storage_info(); + ASSERT_NE(reopened_storage_info, nullptr); + + const std::vector reopened_compact_cursors = + reopened_storage_info->GetCompactCursors(); + const auto icmp = reopened_storage_info->InternalComparator(); + ASSERT_EQ(compact_cursors.size(), reopened_compact_cursors.size()); + for (size_t i = 0; i < compact_cursors.size(); i++) { + if (compact_cursors[i].Valid()) { + ASSERT_EQ(0, + icmp->Compare(compact_cursors[i], reopened_compact_cursors[i])); + } else { + ASSERT_TRUE(!reopened_compact_cursors[i].Valid()); + } + } +} class NoopMergeOperator : public MergeOperator { public: diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index f79b7bee8..487f8102f 100644 --- 
a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -3316,7 +3316,15 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, moved_bytes += f->fd.GetFileSize(); } } - + if (c->compaction_reason() == CompactionReason::kLevelMaxLevelSize && + c->immutable_options()->compaction_pri == kRoundRobin) { + int start_level = c->start_level(); + if (start_level > 0) { + auto vstorage = c->input_version()->storage_info(); + c->edit()->AddCompactCursor( + start_level, vstorage->GetNextCompactCursor(start_level)); + } + } status = versions_->LogAndApply(c->column_family_data(), *c->mutable_cf_options(), c->edit(), &mutex_, directories_.GetDbDir()); diff --git a/db/version_builder.cc b/db/version_builder.cc index d75ab1b00..f58344845 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -249,6 +249,8 @@ class VersionBuilder::Rep { bool has_invalid_levels_; // Current levels of table files affected by additions/deletions. std::unordered_map table_file_levels_; + // Compact cursors (keyed by level) recorded from the edits applied so far + std::unordered_map updated_compact_cursors_; NewestFirstBySeqNo level_zero_cmp_; BySmallestKey level_nonzero_cmp_; @@ -809,6 +811,22 @@ class VersionBuilder::Rep { return Status::OK(); } + Status ApplyCompactCursors(int level, + const InternalKey& smallest_uncompacted_key) { + if (level < 0) { + std::ostringstream oss; + oss << "Cannot add compact cursor (" << level << "," + << smallest_uncompacted_key.Encode().ToString() + << " due to invalid level (level = " << level << ")"; + return Status::Corruption("VersionBuilder", oss.str()); + } + if (level < num_levels_) { + // Skip cursors for levels >= num_levels_, e.g. after re-opening with fewer levels + updated_compact_cursors_[level] = smallest_uncompacted_key; + } + return Status::OK(); + } + // Apply all of the edits in *edit to the current state. 
Status Apply(const VersionEdit* edit) { { @@ -860,6 +878,16 @@ class VersionBuilder::Rep { } } + // Record the compact cursors carried by this edit (round-robin + // compaction); levels the edit does not mention are left untouched + for (const auto& cursor : edit->GetCompactCursors()) { + const int level = cursor.first; + const InternalKey smallest_uncompacted_key = cursor.second; + const Status s = ApplyCompactCursors(level, smallest_uncompacted_key); + if (!s.ok()) { + return s; + } + } return Status::OK(); } @@ -1142,6 +1170,13 @@ class VersionBuilder::Rep { } } + void SaveCompactCursorsTo(VersionStorageInfo* vstorage) const { + for (auto iter = updated_compact_cursors_.begin(); + iter != updated_compact_cursors_.end(); iter++) { + vstorage->AddCursorForOneLevel(iter->first, iter->second); + } + } + // Save the current state in *vstorage. Status SaveTo(VersionStorageInfo* vstorage) const { Status s; @@ -1163,6 +1198,8 @@ class VersionBuilder::Rep { SaveBlobFilesTo(vstorage); + SaveCompactCursorsTo(vstorage); + s = CheckConsistency(vstorage); return s; } diff --git a/db/version_edit.cc b/db/version_edit.cc index 8e45b353e..f3a0242c6 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -79,6 +79,7 @@ void VersionEdit::Clear() { has_max_column_family_ = false; has_min_log_number_to_keep_ = false; has_last_sequence_ = false; + compact_cursors_.clear(); deleted_files_.clear(); new_files_.clear(); blob_file_additions_.clear(); @@ -121,6 +122,13 @@ bool VersionEdit::EncodeTo(std::string* dst) const { if (has_last_sequence_) { PutVarint32Varint64(dst, kLastSequence, last_sequence_); } + for (size_t i = 0; i < compact_cursors_.size(); i++) { + if (compact_cursors_[i].second.Valid()) { + PutVarint32(dst, kCompactCursor); + PutVarint32(dst, compact_cursors_[i].first); // level + PutLengthPrefixedSlice(dst, compact_cursors_[i].second.Encode()); + } + } for (const auto& deleted : deleted_files_) { PutVarint32Varint32Varint64(dst, kDeletedFile, deleted.first /* level */, deleted.second 
/* file number */); @@ -512,15 +520,15 @@ Status VersionEdit::DecodeFrom(const Slice& src) { } break; - case kCompactPointer: + case kCompactCursor: if (GetLevel(&input, &level, &msg) && GetInternalKey(&input, &key)) { - // we don't use compact pointers anymore, - // but we should not fail if they are still - // in manifest + // Reuse the tag and (level, internal key) encoding of LevelDB's compact + // pointer to persist compact_cursors_ + compact_cursors_.push_back(std::make_pair(level, key)); } else { if (!msg) { - msg = "compaction pointer"; + msg = "compaction cursor"; } } break; diff --git a/db/version_edit.h b/db/version_edit.h index bc2d02f7d..2d373d9b4 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -36,7 +36,7 @@ enum Tag : uint32_t { kLogNumber = 2, kNextFileNumber = 3, kLastSequence = 4, - kCompactPointer = 5, + kCompactCursor = 5, kDeletedFile = 6, kNewFile = 7, // 8 was used for large value refs @@ -463,6 +463,24 @@ class VersionEdit { using NewFiles = std::vector>; const NewFiles& GetNewFiles() const { return new_files_; } + // Compact cursors carried by this edit, stored as (level, cursor) pairs + using CompactCursors = std::vector>; + const CompactCursors& GetCompactCursors() const { return compact_cursors_; } + void AddCompactCursor(int level, const InternalKey& cursor) { + compact_cursors_.push_back(std::make_pair(level, cursor)); + } + void SetCompactCursors( + const std::vector& compact_cursors_by_level) { + compact_cursors_.clear(); + compact_cursors_.reserve(compact_cursors_by_level.size()); + for (int i = 0; i < (int)compact_cursors_by_level.size(); i++) { + if (compact_cursors_by_level[i].Valid()) { + compact_cursors_.push_back( + std::make_pair(i, compact_cursors_by_level[i])); + } + } + } + // Add a new blob file. 
void AddBlobFile(uint64_t blob_file_number, uint64_t total_blob_count, uint64_t total_blob_bytes, std::string checksum_method, @@ -635,6 +653,9 @@ class VersionEdit { bool has_min_log_number_to_keep_ = false; bool has_last_sequence_ = false; + // Compaction cursors for round-robin compaction policy + CompactCursors compact_cursors_; + DeletedFiles deleted_files_; NewFiles new_files_; diff --git a/db/version_set.cc b/db/version_set.cc index 6b6035102..162dd00ee 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1798,6 +1798,7 @@ VersionStorageInfo::VersionStorageInfo( compaction_score_(num_levels_), compaction_level_(num_levels_), l0_delay_trigger_count_(0), + compact_cursor_(num_levels_), accumulated_file_size_(0), accumulated_raw_key_size_(0), accumulated_raw_value_size_(0), @@ -1820,6 +1821,8 @@ VersionStorageInfo::VersionStorageInfo( current_num_deletions_ = ref_vstorage->current_num_deletions_; current_num_samples_ = ref_vstorage->current_num_samples_; oldest_snapshot_seqnum_ = ref_vstorage->oldest_snapshot_seqnum_; + compact_cursor_ = ref_vstorage->compact_cursor_; + compact_cursor_.resize(num_levels_); } } @@ -3192,6 +3195,60 @@ void SortFileByOverlappingRatio( file_to_order[f2.file->fd.GetNumber()]; }); } + +void SortFileByRoundRobin(const InternalKeyComparator& icmp, + std::vector* compact_cursor, + bool level0_non_overlapping, int level, + std::vector* temp) { + if (level == 0 && !level0_non_overlapping) { + // Fall back to kOldestSmallestSeqFirst ordering when level == 0 and its + // files may overlap (not fully sorted) + std::sort(temp->begin(), temp->end(), + [](const Fsize& f1, const Fsize& f2) -> bool { + return f1.file->fd.smallest_seqno < f2.file->fd.smallest_seqno; + }); + return; + } + + bool should_move_files = + compact_cursor->at(level).Valid() && temp->size() > 1; + + // Will point to the first Fsize whose smallest key is larger than or equal + // to the given cursor; assigned only when should_move_files is true + std::vector::iterator current_file_iter; + if (should_move_files) { + // Find 
the file of which the smallest key is larger than or equal to + // the cursor (the smallest key in the successor file of the last + // chosen file). This is skipped if the cursor is invalid or there is + // only one file in this level. + current_file_iter = std::lower_bound( + temp->begin(), temp->end(), compact_cursor->at(level), + [&](const Fsize& f, const InternalKey& cursor) -> bool { + return icmp.Compare(cursor, f.file->smallest) > 0; + }); + + should_move_files = current_file_iter != temp->end(); + } + if (should_move_files) { + // Build the rotated order in a local temporary vector + std::vector local_temp; + local_temp.reserve(temp->size()); + // Copy the selected file into the first position and its successors + // into the second, third, ..., positions + for (auto iter = current_file_iter; iter != temp->end(); iter++) { + local_temp.push_back(*iter); + } + // Append the original predecessors of the selected file, wrapping + // around in a round-robin manner + for (auto iter = temp->begin(); iter != current_file_iter; iter++) { + local_temp.push_back(*iter); + } + // Overwrite every item in temp with the rotated order + for (size_t i = 0; i < local_temp.size(); i++) { + temp->at(i) = local_temp[i]; + } + } +} } // namespace void VersionStorageInfo::UpdateFilesByCompactionPri( @@ -3244,6 +3301,10 @@ void VersionStorageInfo::UpdateFilesByCompactionPri( files_[level + 1], ioptions.clock, level, num_non_empty_levels_, options.ttl, &temp); break; + case kRoundRobin: + SortFileByRoundRobin(*internal_comparator_, &compact_cursor_, + level0_non_overlapping_, level, &temp); + break; default: assert(false); } @@ -5285,6 +5346,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, delete[] vstorage -> files_; vstorage->files_ = new_files_list; vstorage->num_levels_ = new_levels; + vstorage->ResizeCompactCursors(new_levels); MutableCFOptions mutable_cf_options(*options); VersionEdit ve; @@ -5520,6 +5582,8 @@ Status VersionSet::WriteCurrentStateToManifest( } } + edit.SetCompactCursors(vstorage->GetCompactCursors()); 
+ const auto& blob_files = vstorage->GetBlobFiles(); for (const auto& meta : blob_files) { assert(meta); diff --git a/db/version_set.h b/db/version_set.h index 93c887ac0..a1aec545e 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -130,6 +130,36 @@ class VersionStorageInfo { void AddFile(int level, FileMetaData* f); + // Resize compact_cursor_ to `level` entries; new entries are default-constructed keys + void ResizeCompactCursors(int level) { + compact_cursor_.resize(level, InternalKey()); + } + + const std::vector& GetCompactCursors() const { + return compact_cursor_; + } + + // REQUIRES: level < compact_cursor_.size() (the index is not bounds-checked) + void AddCursorForOneLevel(int level, + const InternalKey& smallest_uncompacted_key) { + compact_cursor_[level] = smallest_uncompacted_key; + } + + // REQUIRES: lock is held + // Returns the smallest key of the file that follows next_file_to_compact_by_size_ + // in files_by_compaction_pri_ order (wrapping to the first); mutates nothing + const InternalKey& GetNextCompactCursor(int level) { + int cmp_idx = next_file_to_compact_by_size_[level] + 1; + // TODO(zichen): may need to update next_file_to_compact_by_size_ + // for parallel compaction. + InternalKey new_cursor; + if (cmp_idx >= (int)files_by_compaction_pri_[level].size()) { + cmp_idx = 0; + } + // TODO(zichen): rethink if this strategy gives us some good guarantee + return files_[level][files_by_compaction_pri_[level][cmp_idx]]->smallest; + } + void ReserveBlob(size_t size) { blob_files_.reserve(size); } void AddBlobFile(std::shared_ptr blob_file_meta); @@ -657,6 +687,9 @@ class VersionStorageInfo { int l0_delay_trigger_count_ = 0; // Count used to trigger slow down and stop // for number of L0 files. + // Per-level compact cursors for round-robin compaction (indexed by level) + std::vector compact_cursor_; + // the following are the sampled temporary stats. // the current accumulated size of sampled files. 
uint64_t accumulated_file_size_; diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index 9cdd947be..c484872a0 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -55,6 +55,11 @@ enum CompactionPri : char { // and its size is the smallest. It in many cases can optimize write // amplification. kMinOverlappingRatio = 0x3, + // Keeps a per-level cursor at the successor of the file (key range) that + // was compacted last, and always picks the next file (key range) in that + // level. The file picking process cycles through all the files in a + // round-robin manner. + kRoundRobin = 0x4, }; struct CompactionOptionsFIFO { diff --git a/options/options_helper.cc b/options/options_helper.cc index 6af73c840..7c76a31ec 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -320,7 +320,8 @@ std::map OptionsHelper::compaction_pri_to_string = { {kByCompensatedSize, "kByCompensatedSize"}, {kOldestLargestSeqFirst, "kOldestLargestSeqFirst"}, {kOldestSmallestSeqFirst, "kOldestSmallestSeqFirst"}, - {kMinOverlappingRatio, "kMinOverlappingRatio"}}; + {kMinOverlappingRatio, "kMinOverlappingRatio"}, + {kRoundRobin, "kRoundRobin"}}; std::map OptionsHelper::compaction_stop_style_to_string = { @@ -830,7 +831,8 @@ std::unordered_map {"kByCompensatedSize", kByCompensatedSize}, {"kOldestLargestSeqFirst", kOldestLargestSeqFirst}, {"kOldestSmallestSeqFirst", kOldestSmallestSeqFirst}, - {"kMinOverlappingRatio", kMinOverlappingRatio}}; + {"kMinOverlappingRatio", kMinOverlappingRatio}, + {"kRoundRobin", kRoundRobin}}; std::unordered_map OptionsHelper::compaction_stop_style_string_map = {