// Copyright (c) 2013, Facebook, Inc. All rights reserved. // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. // // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/compaction.h" #ifndef __STDC_FORMAT_MACROS #define __STDC_FORMAT_MACROS #endif #include #include #include "rocksdb/compaction_filter.h" #include "db/column_family.h" #include "util/logging.h" #include "util/sync_point.h" namespace rocksdb { uint64_t TotalFileSize(const std::vector& files) { uint64_t sum = 0; for (size_t i = 0; i < files.size() && files[i]; i++) { sum += files[i]->fd.GetFileSize(); } return sum; } void Compaction::SetInputVersion(Version* _input_version) { input_version_ = _input_version; cfd_ = input_version_->cfd(); cfd_->Ref(); input_version_->Ref(); edit_.SetColumnFamily(cfd_->GetID()); } // helper function to determine if compaction is creating files at the // bottommost level bool Compaction::IsBottommostLevel( int output_level, VersionStorageInfo* vstorage, const std::vector& inputs) { if (inputs[0].level == 0 && inputs[0].files.back() != vstorage->LevelFiles(0).back()) { return false; } // checks whether there are files living beyond the output_level. for (int i = output_level + 1; i < vstorage->num_levels(); i++) { if (vstorage->NumLevelFiles(i) > 0) { return false; } } return true; } bool Compaction::IsFullCompaction( VersionStorageInfo* vstorage, const std::vector& inputs) { int num_files_in_compaction = 0; int total_num_files = 0; for (int l = 0; l < vstorage->num_levels(); l++) { total_num_files += vstorage->NumLevelFiles(l); } for (size_t i = 0; i < inputs.size(); i++) { num_files_in_compaction += inputs[i].size(); } return num_files_in_compaction == total_num_files; } Compaction::Compaction(VersionStorageInfo* vstorage, const MutableCFOptions& _mutable_cf_options, std::vector _inputs, int _output_level, uint64_t _target_file_size, uint64_t _max_grandparent_overlap_bytes, uint32_t _output_path_id, CompressionType _compression, std::vector _grandparents, bool _manual_compaction, double _score, bool _deletion_compaction) : start_level_(_inputs[0].level), output_level_(_output_level), max_output_file_size_(_target_file_size), max_grandparent_overlap_bytes_(_max_grandparent_overlap_bytes), mutable_cf_options_(_mutable_cf_options), input_version_(nullptr), number_levels_(vstorage->num_levels()), cfd_(nullptr), output_path_id_(_output_path_id), output_compression_(_compression), deletion_compaction_(_deletion_compaction), inputs_(std::move(_inputs)), grandparents_(std::move(_grandparents)), grandparent_index_(0), seen_key_(false), overlapped_bytes_(0), score_(_score), bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)), is_full_compaction_(IsFullCompaction(vstorage, inputs_)), is_manual_compaction_(_manual_compaction), level_ptrs_(std::vector(number_levels_, 0)) { MarkFilesBeingCompacted(true); #ifndef NDEBUG for (size_t i = 1; i < inputs_.size(); ++i) { assert(inputs_[i].level > inputs_[i - 1].level); } #endif // setup input_levels_ { input_levels_.resize(num_input_levels()); for (size_t which = 0; which < num_input_levels(); which++) { DoGenerateLevelFilesBrief(&input_levels_[which], inputs_[which].files, &arena_); } } } Compaction::~Compaction() { if (input_version_ != nullptr) { input_version_->Unref(); } if (cfd_ != nullptr) { if (cfd_->Unref()) { delete cfd_; } } } bool Compaction::InputCompressionMatchesOutput() const { int base_level = input_version_->storage_info()->base_level(); bool matches = (GetCompressionType(*cfd_->ioptions(), start_level_, base_level) == output_compression_); if (matches) { TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:Matches"); return true; } TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:DidntMatch"); return matches; } bool Compaction::IsTrivialMove() const { // Avoid a move if there is lots of overlapping grandparent data. // Otherwise, the move could create a parent file that will require // a very expensive merge later on. // If start_level_== output_level_, the purpose is to force compaction // filter to be applied to that level, and thus cannot be a trivial move. // Check if start level have files with overlapping ranges if (start_level_ == 0 && input_version_->storage_info()->level0_non_overlapping() == false) { // We cannot move files from L0 to L1 if the files are overlapping return false; } if (is_manual_compaction_ && (cfd_->ioptions()->compaction_filter != nullptr || cfd_->ioptions()->compaction_filter_factory != nullptr || cfd_->ioptions()->compaction_filter_factory_v2 != nullptr)) { // This is a manual compaction and we have a compaction filter that should // be executed, we cannot do a trivial move return false; } // Used in universal compaction, where trivial move can be done if the // input files are non overlapping if (cfd_->ioptions()->compaction_options_universal.allow_trivial_move) { return is_trivial_move_; } return (start_level_ != output_level_ && num_input_levels() == 1 && input(0, 0)->fd.GetPathId() == output_path_id() && InputCompressionMatchesOutput() && TotalFileSize(grandparents_) <= max_grandparent_overlap_bytes_); } void Compaction::AddInputDeletions(VersionEdit* out_edit) { for (size_t which = 0; which < num_input_levels(); which++) { for (size_t i = 0; i < inputs_[which].size(); i++) { out_edit->DeleteFile(level(which), inputs_[which][i]->fd.GetNumber()); } } } bool Compaction::KeyNotExistsBeyondOutputLevel(const Slice& user_key) { assert(input_version_ != nullptr); assert(cfd_->ioptions()->compaction_style != kCompactionStyleFIFO); if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) { return bottommost_level_; } // Maybe use binary search to find right entry instead of linear search? const Comparator* user_cmp = cfd_->user_comparator(); for (int lvl = output_level_ + 1; lvl < number_levels_; lvl++) { const std::vector& files = input_version_->storage_info()->LevelFiles(lvl); for (; level_ptrs_[lvl] < files.size(); ) { FileMetaData* f = files[level_ptrs_[lvl]]; if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) { // We've advanced far enough if (user_cmp->Compare(user_key, f->smallest.user_key()) >= 0) { // Key falls in this file's range, so definitely // exists beyond output level return false; } break; } level_ptrs_[lvl]++; } } return true; } bool Compaction::ShouldStopBefore(const Slice& internal_key) { // Scan to find earliest grandparent file that contains key. const InternalKeyComparator* icmp = &cfd_->internal_comparator(); while (grandparent_index_ < grandparents_.size() && icmp->Compare(internal_key, grandparents_[grandparent_index_]->largest.Encode()) > 0) { if (seen_key_) { overlapped_bytes_ += grandparents_[grandparent_index_]->fd.GetFileSize(); } assert(grandparent_index_ + 1 >= grandparents_.size() || icmp->Compare(grandparents_[grandparent_index_]->largest.Encode(), grandparents_[grandparent_index_+1]->smallest.Encode()) < 0); grandparent_index_++; } seen_key_ = true; if (overlapped_bytes_ > max_grandparent_overlap_bytes_) { // Too much overlap for current output; start new output overlapped_bytes_ = 0; return true; } else { return false; } } // Mark (or clear) each file that is being compacted void Compaction::MarkFilesBeingCompacted(bool mark_as_compacted) { for (size_t i = 0; i < num_input_levels(); i++) { for (unsigned int j = 0; j < inputs_[i].size(); j++) { assert(mark_as_compacted ? !inputs_[i][j]->being_compacted : inputs_[i][j]->being_compacted); inputs_[i][j]->being_compacted = mark_as_compacted; } } } // Sample output: // If compacting 3 L0 files, 2 L3 files and 1 L4 file, and outputting to L5, // print: "3@0 + 2@3 + 1@4 files to L5" const char* Compaction::InputLevelSummary( InputLevelSummaryBuffer* scratch) const { int len = 0; bool is_first = true; for (auto& input_level : inputs_) { if (input_level.empty()) { continue; } if (!is_first) { len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, " + "); } else { is_first = false; } len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "%zu@%d", input_level.size(), input_level.level); } snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, " files to L%d", output_level()); return scratch->buffer; } uint64_t Compaction::CalculateTotalInputSize() const { uint64_t size = 0; for (auto& input_level : inputs_) { for (auto f : input_level.files) { size += f->fd.GetFileSize(); } } return size; } void Compaction::ReleaseCompactionFiles(Status status) { MarkFilesBeingCompacted(false); cfd_->compaction_picker()->ReleaseCompactionFiles(this, status); } void Compaction::ResetNextCompactionIndex() { assert(input_version_ != nullptr); input_version_->storage_info()->ResetNextCompactionIndex(start_level_); } namespace { int InputSummary(const std::vector& files, char* output, int len) { *output = '\0'; int write = 0; for (unsigned int i = 0; i < files.size(); i++) { int sz = len - write; int ret; char sztxt[16]; AppendHumanBytes(files.at(i)->fd.GetFileSize(), sztxt, 16); ret = snprintf(output + write, sz, "%" PRIu64 "(%s) ", files.at(i)->fd.GetNumber(), sztxt); if (ret < 0 || ret >= sz) break; write += ret; } // if files.size() is non-zero, overwrite the last space return write - !!files.size(); } } // namespace void Compaction::Summary(char* output, int len) { int write = snprintf(output, len, "Base version %" PRIu64 " Base level %d, inputs: [", input_version_->GetVersionNumber(), start_level_); if (write < 0 || write >= len) { return; } for (size_t level_iter = 0; level_iter < num_input_levels(); ++level_iter) { if (level_iter > 0) { write += snprintf(output + write, len - write, "], ["); if (write < 0 || write >= len) { return; } } write += InputSummary(inputs_[level_iter].files, output + write, len - write); if (write < 0 || write >= len) { return; } } snprintf(output + write, len - write, "]"); } uint64_t Compaction::OutputFilePreallocationSize() { uint64_t preallocation_size = 0; if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel || output_level() > 0) { preallocation_size = max_output_file_size_; } else { // output_level() == 0 assert(num_input_levels() > 0); for (const auto& f : inputs_[0].files) { preallocation_size += f->fd.GetFileSize(); } } // Over-estimate slightly so we don't end up just barely crossing // the threshold return preallocation_size * 1.1; } std::unique_ptr Compaction::CreateCompactionFilter() const { if (!cfd_->ioptions()->compaction_filter_factory) { return nullptr; } CompactionFilter::Context context; context.is_full_compaction = is_full_compaction_; context.is_manual_compaction = is_manual_compaction_; return cfd_->ioptions()->compaction_filter_factory->CreateCompactionFilter( context); } std::unique_ptr Compaction::CreateCompactionFilterV2() const { if (!cfd_->ioptions()->compaction_filter_factory_v2) { return nullptr; } CompactionFilterContext context; context.is_full_compaction = is_full_compaction_; context.is_manual_compaction = is_manual_compaction_; return cfd_->ioptions()->compaction_filter_factory_v2->CreateCompactionFilterV2( context); } } // namespace rocksdb