diff --git a/db/compaction.cc b/db/compaction.cc index 21bdcf2a0..d7a7943c0 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -162,9 +162,6 @@ Compaction::Compaction(VersionStorageInfo* vstorage, deletion_compaction_(_deletion_compaction), inputs_(std::move(_inputs)), grandparents_(std::move(_grandparents)), - grandparent_index_(0), - seen_key_(false), - overlapped_bytes_(0), score_(_score), bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)), is_full_compaction_(IsFullCompaction(vstorage, inputs_)), @@ -290,32 +287,6 @@ bool Compaction::KeyNotExistsBeyondOutputLevel( return true; } -bool Compaction::ShouldStopBefore(const Slice& internal_key) { - // Scan to find earliest grandparent file that contains key. - const InternalKeyComparator* icmp = &cfd_->internal_comparator(); - while (grandparent_index_ < grandparents_.size() && - icmp->Compare(internal_key, - grandparents_[grandparent_index_]->largest.Encode()) > 0) { - if (seen_key_) { - overlapped_bytes_ += grandparents_[grandparent_index_]->fd.GetFileSize(); - } - assert(grandparent_index_ + 1 >= grandparents_.size() || - icmp->Compare(grandparents_[grandparent_index_]->largest.Encode(), - grandparents_[grandparent_index_+1]->smallest.Encode()) - < 0); - grandparent_index_++; - } - seen_key_ = true; - - if (overlapped_bytes_ > max_grandparent_overlap_bytes_) { - // Too much overlap for current output; start new output - overlapped_bytes_ = 0; - return true; - } else { - return false; - } -} - // Mark (or clear) each file that is being compacted void Compaction::MarkFilesBeingCompacted(bool mark_as_compacted) { for (size_t i = 0; i < num_input_levels(); i++) { @@ -421,7 +392,7 @@ void Compaction::Summary(char* output, int len) { snprintf(output + write, len - write, "]"); } -uint64_t Compaction::OutputFilePreallocationSize() { +uint64_t Compaction::OutputFilePreallocationSize() const { uint64_t preallocation_size = 0; if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel || diff 
--git a/db/compaction.h b/db/compaction.h index 729c4edaf..cab0d9c0c 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -104,7 +104,7 @@ class Compaction { } // Returns the LevelFilesBrief of the specified compaction input level. - LevelFilesBrief* input_levels(size_t compaction_input_level) { + const LevelFilesBrief* input_levels(size_t compaction_input_level) const { return &input_levels_[compaction_input_level]; } @@ -132,10 +132,6 @@ class Compaction { bool KeyNotExistsBeyondOutputLevel(const Slice& user_key, std::vector* level_ptrs) const; - // Returns true iff we should stop building the current output - // before processing "internal_key". - bool ShouldStopBefore(const Slice& internal_key); - // Clear all files to indicate that they are not being compacted // Delete this compaction from the list of running compactions. // @@ -151,13 +147,13 @@ class Compaction { double score() const { return score_; } // Is this compaction creating a file in the bottom most level? - bool bottommost_level() { return bottommost_level_; } + bool bottommost_level() const { return bottommost_level_; } // Does this compaction include all sst files? - bool is_full_compaction() { return is_full_compaction_; } + bool is_full_compaction() const { return is_full_compaction_; } // Was this compaction triggered manually by the client? - bool is_manual_compaction() { return is_manual_compaction_; } + bool is_manual_compaction() const { return is_manual_compaction_; } // Used when allow_trivial_move option is set in // Universal compaction. If all the input files are @@ -170,19 +166,21 @@ class Compaction { // Used when allow_trivial_move option is set in // Universal compaction. Returns true, if the input files // are non-overlapping and can be trivially moved. - bool is_trivial_move() { return is_trivial_move_; } + bool is_trivial_move() const { return is_trivial_move_; } // How many total levels are there? 
int number_levels() const { return number_levels_; } // Return the MutableCFOptions that should be used throughout the compaction // procedure - const MutableCFOptions* mutable_cf_options() { return &mutable_cf_options_; } + const MutableCFOptions* mutable_cf_options() const { + return &mutable_cf_options_; + } // Returns the size in bytes that the output file should be preallocated to. // In level compaction, that is max_file_size_. In universal compaction, that // is the sum of all input file sizes. - uint64_t OutputFilePreallocationSize(); + uint64_t OutputFilePreallocationSize() const; void SetInputVersion(Version* input_version); @@ -225,6 +223,14 @@ class Compaction { CompactionReason compaction_reason() { return compaction_reason_; } + const std::vector& grandparents() const { + return grandparents_; + } + + uint64_t max_grandparent_overlap_bytes() const { + return max_grandparent_overlap_bytes_; + } + private: // mark (or clear) all files that are being compacted void MarkFilesBeingCompacted(bool mark_as_compacted); @@ -268,10 +274,6 @@ class Compaction { // State used to check for number of of overlapping grandparent files // (grandparent == "output_level_ + 1") std::vector grandparents_; - size_t grandparent_index_; // Index in grandparent_starts_ - bool seen_key_; // Some output key has been seen - uint64_t overlapped_bytes_; // Bytes of overlap between current output - // and grandparent files const double score_; // score that was used to pick this compaction. // Is this compaction creating a file in the bottom most level? 
diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index 20eed4f3d..a649a24a6 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -14,7 +14,7 @@ CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, SequenceNumber last_sequence, std::vector* snapshots, SequenceNumber earliest_write_conflict_snapshot, Env* env, - bool expect_valid_internal_key, Compaction* compaction, + bool expect_valid_internal_key, const Compaction* compaction, const CompactionFilter* compaction_filter, LogBuffer* log_buffer) : input_(input), cmp_(cmp), diff --git a/db/compaction_iterator.h b/db/compaction_iterator.h index b13aef3ff..6d791d418 100644 --- a/db/compaction_iterator.h +++ b/db/compaction_iterator.h @@ -42,7 +42,7 @@ class CompactionIterator { std::vector* snapshots, SequenceNumber earliest_write_conflict_snapshot, Env* env, bool expect_valid_internal_key, - Compaction* compaction = nullptr, + const Compaction* compaction = nullptr, const CompactionFilter* compaction_filter = nullptr, LogBuffer* log_buffer = nullptr); @@ -92,7 +92,7 @@ class CompactionIterator { const SequenceNumber earliest_write_conflict_snapshot_; Env* env_; bool expect_valid_internal_key_; - Compaction* compaction_; + const Compaction* compaction_; const CompactionFilter* compaction_filter_; LogBuffer* log_buffer_; bool bottommost_level_; diff --git a/db/compaction_job.cc b/db/compaction_job.cc index b04541adb..450625019 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -63,7 +63,7 @@ namespace rocksdb { // Maintains state for each sub-compaction struct CompactionJob::SubcompactionState { - Compaction* compaction; + const Compaction* compaction; std::unique_ptr c_iter; // The boundaries of the key-range this compaction is interested in. 
No two @@ -104,6 +104,14 @@ struct CompactionJob::SubcompactionState { uint64_t num_output_records; CompactionJobStats compaction_job_stats; uint64_t approx_size; + // An index that is used to speed up ShouldStopBefore(). + size_t grandparent_index = 0; + // The number of bytes overlapping between the current output and + // grandparent files, used in ShouldStopBefore(). + uint64_t overlapped_bytes = 0; + // A flag that determines whether a key has been seen in + // ShouldStopBefore(). + bool seen_key = false; SubcompactionState(Compaction* c, Slice* _start, Slice* _end, uint64_t size = 0) @@ -115,7 +123,10 @@ struct CompactionJob::SubcompactionState { total_bytes(0), num_input_records(0), num_output_records(0), - approx_size(size) { + approx_size(size), + grandparent_index(0), + overlapped_bytes(0), + seen_key(false) { assert(compaction != nullptr); } @@ -134,6 +145,9 @@ struct CompactionJob::SubcompactionState { num_output_records = std::move(o.num_output_records); compaction_job_stats = std::move(o.compaction_job_stats); approx_size = std::move(o.approx_size); + grandparent_index = std::move(o.grandparent_index); + overlapped_bytes = std::move(o.overlapped_bytes); + seen_key = std::move(o.seen_key); return *this; } @@ -141,6 +155,38 @@ struct CompactionJob::SubcompactionState { SubcompactionState(const SubcompactionState&) = delete; SubcompactionState& operator=(const SubcompactionState&) = delete; + + // Returns true iff we should stop building the current output + // before processing "internal_key". + bool ShouldStopBefore(const Slice& internal_key) { + const InternalKeyComparator* icmp = + &compaction->column_family_data()->internal_comparator(); + const std::vector& grandparents = compaction->grandparents(); + + // Scan to find earliest grandparent file that contains key.
+ while (grandparent_index < grandparents.size() && + icmp->Compare(internal_key, + grandparents[grandparent_index]->largest.Encode()) > + 0) { + if (seen_key) { + overlapped_bytes += grandparents[grandparent_index]->fd.GetFileSize(); + } + assert(grandparent_index + 1 >= grandparents.size() || + icmp->Compare( + grandparents[grandparent_index]->largest.Encode(), + grandparents[grandparent_index + 1]->smallest.Encode()) < 0); + grandparent_index++; + } + seen_key = true; + + if (overlapped_bytes > compaction->max_grandparent_overlap_bytes()) { + // Too much overlap for current output; start new output + overlapped_bytes = 0; + return true; + } + + return false; + } }; // Maintains state for the entire compaction @@ -669,7 +715,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { if (end != nullptr && cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) { break; - } else if (sub_compact->compaction->ShouldStopBefore(key) && + } else if (sub_compact->ShouldStopBefore(key) && sub_compact->builder != nullptr) { status = FinishCompactionOutputFile(input->status(), sub_compact); if (!status.ok()) { diff --git a/db/version_set.cc b/db/version_set.cc index 1da447540..707de2f99 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3240,7 +3240,7 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { } } -InternalIterator* VersionSet::MakeInputIterator(Compaction* c) { +InternalIterator* VersionSet::MakeInputIterator(const Compaction* c) { auto cfd = c->column_family_data(); ReadOptions read_options; read_options.verify_checksums = diff --git a/db/version_set.h b/db/version_set.h index d9ff91732..5aa7d56fa 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -671,7 +671,7 @@ class VersionSet { // Create an iterator that reads over the compaction inputs for "*c". // The caller should delete the iterator when no longer needed. 
- InternalIterator* MakeInputIterator(Compaction* c); + InternalIterator* MakeInputIterator(const Compaction* c); // Add all files listed in any live version to *live. void AddLiveFiles(std::vector* live_list);