From be9816b3d970e0eceb931c2643a976dbea068da2 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Thu, 24 Mar 2016 19:36:39 -0700 Subject: [PATCH] Fix data race issue when sub-compaction is used in CompactionJob Summary: When subcompaction is used, all subcompactions share the same Compaction pointer in CompactionJob, while each subcompaction keeps its own mutable stats in SubcompactionState. However, there is still some mutable state that is currently stored in the shared Compaction object. This patch makes three changes: 1. Make the shared Compaction pointer const so that it can never be modified during the compaction. 2. Move the necessary state from Compaction to SubcompactionState. 3. Make functions of Compaction const if the function does not modify its internal state. Test Plan: rocksdb and MyRocks test Reviewers: sdong, kradhakrishnan, andrewkr, IslamAbdelRahman Reviewed By: IslamAbdelRahman Subscribers: andrewkr, dhruba, yoshinorim, gunnarku, leveldb Differential Revision: https://reviews.facebook.net/D55923 --- db/compaction.cc | 31 +---------------------- db/compaction.h | 32 +++++++++++++----------- db/compaction_iterator.cc | 2 +- db/compaction_iterator.h | 4 +-- db/compaction_job.cc | 52 ++++++++++++++++++++++++++++++++++++--- db/version_set.cc | 2 +- db/version_set.h | 2 +- 7 files changed, 72 insertions(+), 53 deletions(-) diff --git a/db/compaction.cc b/db/compaction.cc index 21bdcf2a0..d7a7943c0 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -162,9 +162,6 @@ Compaction::Compaction(VersionStorageInfo* vstorage, deletion_compaction_(_deletion_compaction), inputs_(std::move(_inputs)), grandparents_(std::move(_grandparents)), - grandparent_index_(0), - seen_key_(false), - overlapped_bytes_(0), score_(_score), bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)), is_full_compaction_(IsFullCompaction(vstorage, inputs_)), @@ -290,32 +287,6 @@ bool Compaction::KeyNotExistsBeyondOutputLevel( return true; } -bool 
Compaction::ShouldStopBefore(const Slice& internal_key) { - // Scan to find earliest grandparent file that contains key. - const InternalKeyComparator* icmp = &cfd_->internal_comparator(); - while (grandparent_index_ < grandparents_.size() && - icmp->Compare(internal_key, - grandparents_[grandparent_index_]->largest.Encode()) > 0) { - if (seen_key_) { - overlapped_bytes_ += grandparents_[grandparent_index_]->fd.GetFileSize(); - } - assert(grandparent_index_ + 1 >= grandparents_.size() || - icmp->Compare(grandparents_[grandparent_index_]->largest.Encode(), - grandparents_[grandparent_index_+1]->smallest.Encode()) - < 0); - grandparent_index_++; - } - seen_key_ = true; - - if (overlapped_bytes_ > max_grandparent_overlap_bytes_) { - // Too much overlap for current output; start new output - overlapped_bytes_ = 0; - return true; - } else { - return false; - } -} - // Mark (or clear) each file that is being compacted void Compaction::MarkFilesBeingCompacted(bool mark_as_compacted) { for (size_t i = 0; i < num_input_levels(); i++) { @@ -421,7 +392,7 @@ void Compaction::Summary(char* output, int len) { snprintf(output + write, len - write, "]"); } -uint64_t Compaction::OutputFilePreallocationSize() { +uint64_t Compaction::OutputFilePreallocationSize() const { uint64_t preallocation_size = 0; if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel || diff --git a/db/compaction.h b/db/compaction.h index 729c4edaf..cab0d9c0c 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -104,7 +104,7 @@ class Compaction { } // Returns the LevelFilesBrief of the specified compaction input level. 
- LevelFilesBrief* input_levels(size_t compaction_input_level) { + const LevelFilesBrief* input_levels(size_t compaction_input_level) const { return &input_levels_[compaction_input_level]; } @@ -132,10 +132,6 @@ class Compaction { bool KeyNotExistsBeyondOutputLevel(const Slice& user_key, std::vector* level_ptrs) const; - // Returns true iff we should stop building the current output - // before processing "internal_key". - bool ShouldStopBefore(const Slice& internal_key); - // Clear all files to indicate that they are not being compacted // Delete this compaction from the list of running compactions. // @@ -151,13 +147,13 @@ class Compaction { double score() const { return score_; } // Is this compaction creating a file in the bottom most level? - bool bottommost_level() { return bottommost_level_; } + bool bottommost_level() const { return bottommost_level_; } // Does this compaction include all sst files? - bool is_full_compaction() { return is_full_compaction_; } + bool is_full_compaction() const { return is_full_compaction_; } // Was this compaction triggered manually by the client? - bool is_manual_compaction() { return is_manual_compaction_; } + bool is_manual_compaction() const { return is_manual_compaction_; } // Used when allow_trivial_move option is set in // Universal compaction. If all the input files are @@ -170,19 +166,21 @@ class Compaction { // Used when allow_trivial_move option is set in // Universal compaction. Returns true, if the input files // are non-overlapping and can be trivially moved. - bool is_trivial_move() { return is_trivial_move_; } + bool is_trivial_move() const { return is_trivial_move_; } // How many total levels are there? 
int number_levels() const { return number_levels_; } // Return the MutableCFOptions that should be used throughout the compaction // procedure - const MutableCFOptions* mutable_cf_options() { return &mutable_cf_options_; } + const MutableCFOptions* mutable_cf_options() const { + return &mutable_cf_options_; + } // Returns the size in bytes that the output file should be preallocated to. // In level compaction, that is max_file_size_. In universal compaction, that // is the sum of all input file sizes. - uint64_t OutputFilePreallocationSize(); + uint64_t OutputFilePreallocationSize() const; void SetInputVersion(Version* input_version); @@ -225,6 +223,14 @@ class Compaction { CompactionReason compaction_reason() { return compaction_reason_; } + const std::vector& grandparents() const { + return grandparents_; + } + + uint64_t max_grandparent_overlap_bytes() const { + return max_grandparent_overlap_bytes_; + } + private: // mark (or clear) all files that are being compacted void MarkFilesBeingCompacted(bool mark_as_compacted); @@ -268,10 +274,6 @@ class Compaction { // State used to check for number of of overlapping grandparent files // (grandparent == "output_level_ + 1") std::vector grandparents_; - size_t grandparent_index_; // Index in grandparent_starts_ - bool seen_key_; // Some output key has been seen - uint64_t overlapped_bytes_; // Bytes of overlap between current output - // and grandparent files const double score_; // score that was used to pick this compaction. // Is this compaction creating a file in the bottom most level? 
diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index 20eed4f3d..a649a24a6 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -14,7 +14,7 @@ CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, SequenceNumber last_sequence, std::vector* snapshots, SequenceNumber earliest_write_conflict_snapshot, Env* env, - bool expect_valid_internal_key, Compaction* compaction, + bool expect_valid_internal_key, const Compaction* compaction, const CompactionFilter* compaction_filter, LogBuffer* log_buffer) : input_(input), cmp_(cmp), diff --git a/db/compaction_iterator.h b/db/compaction_iterator.h index b13aef3ff..6d791d418 100644 --- a/db/compaction_iterator.h +++ b/db/compaction_iterator.h @@ -42,7 +42,7 @@ class CompactionIterator { std::vector* snapshots, SequenceNumber earliest_write_conflict_snapshot, Env* env, bool expect_valid_internal_key, - Compaction* compaction = nullptr, + const Compaction* compaction = nullptr, const CompactionFilter* compaction_filter = nullptr, LogBuffer* log_buffer = nullptr); @@ -92,7 +92,7 @@ class CompactionIterator { const SequenceNumber earliest_write_conflict_snapshot_; Env* env_; bool expect_valid_internal_key_; - Compaction* compaction_; + const Compaction* compaction_; const CompactionFilter* compaction_filter_; LogBuffer* log_buffer_; bool bottommost_level_; diff --git a/db/compaction_job.cc b/db/compaction_job.cc index b04541adb..450625019 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -63,7 +63,7 @@ namespace rocksdb { // Maintains state for each sub-compaction struct CompactionJob::SubcompactionState { - Compaction* compaction; + const Compaction* compaction; std::unique_ptr c_iter; // The boundaries of the key-range this compaction is interested in. 
No two @@ -104,6 +104,14 @@ struct CompactionJob::SubcompactionState { uint64_t num_output_records; CompactionJobStats compaction_job_stats; uint64_t approx_size; + // An index that is used to speed up ShouldStopBefore(). + size_t grandparent_index = 0; + // The number of bytes overlapping between the current output and + // grandparent files, used in ShouldStopBefore(). + uint64_t overlapped_bytes = 0; + // A flag that records whether any key has been seen by + // ShouldStopBefore(). + bool seen_key = false; SubcompactionState(Compaction* c, Slice* _start, Slice* _end, uint64_t size = 0) @@ -115,7 +123,10 @@ struct CompactionJob::SubcompactionState { total_bytes(0), num_input_records(0), num_output_records(0), - approx_size(size) { + approx_size(size), + grandparent_index(0), + overlapped_bytes(0), + seen_key(false) { assert(compaction != nullptr); } @@ -134,6 +145,9 @@ struct CompactionJob::SubcompactionState { num_output_records = std::move(o.num_output_records); compaction_job_stats = std::move(o.compaction_job_stats); approx_size = std::move(o.approx_size); + grandparent_index = std::move(o.grandparent_index); + overlapped_bytes = std::move(o.overlapped_bytes); + seen_key = std::move(o.seen_key); return *this; } @@ -141,6 +155,38 @@ struct CompactionJob::SubcompactionState { SubcompactionState(const SubcompactionState&) = delete; SubcompactionState& operator=(const SubcompactionState&) = delete; + + // Returns true iff we should stop building the current output + // before processing "internal_key". + bool ShouldStopBefore(const Slice& internal_key) { + const InternalKeyComparator* icmp = + &compaction->column_family_data()->internal_comparator(); + const std::vector& grandparents = compaction->grandparents(); + + // Scan to find earliest grandparent file that contains key. 
+ while (grandparent_index < grandparents.size() && + icmp->Compare(internal_key, + grandparents[grandparent_index]->largest.Encode()) > + 0) { + if (seen_key) { + overlapped_bytes += grandparents[grandparent_index]->fd.GetFileSize(); + } + assert(grandparent_index + 1 >= grandparents.size() || + icmp->Compare( + grandparents[grandparent_index]->largest.Encode(), + grandparents[grandparent_index + 1]->smallest.Encode()) < 0); + grandparent_index++; + } + seen_key = true; + + if (overlapped_bytes > compaction->max_grandparent_overlap_bytes()) { + // Too much overlap for current output; start new output + overlapped_bytes = 0; + return true; + } + + return false; + } }; // Maintains state for the entire compaction @@ -669,7 +715,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { if (end != nullptr && cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) { break; - } else if (sub_compact->compaction->ShouldStopBefore(key) && + } else if (sub_compact->ShouldStopBefore(key) && sub_compact->builder != nullptr) { status = FinishCompactionOutputFile(input->status(), sub_compact); if (!status.ok()) { diff --git a/db/version_set.cc b/db/version_set.cc index 1da447540..707de2f99 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3240,7 +3240,7 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { } } -InternalIterator* VersionSet::MakeInputIterator(Compaction* c) { +InternalIterator* VersionSet::MakeInputIterator(const Compaction* c) { auto cfd = c->column_family_data(); ReadOptions read_options; read_options.verify_checksums = diff --git a/db/version_set.h b/db/version_set.h index d9ff91732..5aa7d56fa 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -671,7 +671,7 @@ class VersionSet { // Create an iterator that reads over the compaction inputs for "*c". // The caller should delete the iterator when no longer needed. 
- InternalIterator* MakeInputIterator(Compaction* c); + InternalIterator* MakeInputIterator(const Compaction* c); // Add all files listed in any live version to *live. void AddLiveFiles(std::vector* live_list);