diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 70be388c9..f74e63436 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -23,6 +23,7 @@ namespace rocksdb { +namespace { uint64_t TotalCompensatedFileSize(const std::vector& files) { uint64_t sum = 0; for (size_t i = 0; i < files.size() && files[i]; i++) { @@ -31,7 +32,6 @@ uint64_t TotalCompensatedFileSize(const std::vector& files) { return sum; } -namespace { // Determine compression type, based on user options, level of the output // file and whether compression is disabled. // If enable_compression is false, then compression is always disabled no @@ -71,19 +71,6 @@ CompactionPicker::CompactionPicker(const ImmutableCFOptions& ioptions, CompactionPicker::~CompactionPicker() {} -void CompactionPicker::SizeBeingCompacted(std::vector& sizes) { - for (int level = 0; level < NumberLevels() - 1; level++) { - uint64_t total = 0; - for (auto c : compactions_in_progress_[level]) { - assert(c->level() == level); - for (size_t i = 0; i < c->num_input_files(0); i++) { - total += c->input(0, i)->compensated_file_size; - } - } - sizes[level] = total; - } -} - // Clear all files to indicate that they are not being compacted // Delete this compaction from the list of running compactions. void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) { @@ -763,13 +750,9 @@ Compaction* LevelCompactionPicker::PickCompaction( // being compacted). Since we just changed compaction score, we recalculate it // here { // this piece of code recomputes compaction score - std::vector size_being_compacted(NumberLevels() - 1); - SizeBeingCompacted(size_being_compacted); - CompactionOptionsFIFO dummy_compaction_options_fifo; vstorage->ComputeCompactionScore(mutable_cf_options, - dummy_compaction_options_fifo, - size_being_compacted); + dummy_compaction_options_fifo); } return c; diff --git a/db/compaction_picker.h b/db/compaction_picker.h index ad72e609a..7cc58d66b 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -91,10 +91,6 @@ class CompactionPicker { // Free up the files that participated in a compaction void ReleaseCompactionFiles(Compaction* c, Status status); - // Return the total amount of data that is undergoing - // compactions per level - void SizeBeingCompacted(std::vector& sizes); - // Returns true if any one of the specified files are being compacted bool FilesInCompaction(const std::vector& files); @@ -314,7 +310,4 @@ class NullCompactionPicker : public CompactionPicker { }; #endif // !ROCKSDB_LITE -// Utility function -extern uint64_t TotalCompensatedFileSize(const std::vector& files); - } // namespace rocksdb diff --git a/db/compaction_picker_test.cc b/db/compaction_picker_test.cc index 5ffc74f0d..dd5c06a54 100644 --- a/db/compaction_picker_test.cc +++ b/db/compaction_picker_test.cc @@ -83,8 +83,6 @@ class CompactionPickerTest { } void UpdateVersionStorageInfo() { - vstorage_->ComputeCompactionScore(mutable_cf_options_, fifo_options_, - size_being_compacted_); vstorage_->UpdateFilesBySize(); vstorage_->UpdateNumNonEmptyLevels(); vstorage_->GenerateFileIndexer(); diff --git a/db/version_builder_test.cc b/db/version_builder_test.cc index 5da73cbc3..a48b4e3a2 100644 --- a/db/version_builder_test.cc +++ b/db/version_builder_test.cc @@ -74,8 +74,6 @@ class VersionBuilderTest { } void UpdateVersionStorageInfo() { - vstorage_.ComputeCompactionScore(mutable_cf_options_, fifo_options_, - size_being_compacted_); vstorage_.UpdateFilesBySize(); vstorage_.UpdateNumNonEmptyLevels(); vstorage_.GenerateFileIndexer(); diff --git a/db/version_edit.h b/db/version_edit.h index 35b894954..6f7a692f3 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -76,8 +76,10 @@ struct FileMetaData { // File size compensated by deletion entry. // This is updated in Version::UpdateAccumulatedStats() first time when the - // file is created or loaded. After it is updated, it is immutable. + // file is created or loaded. After it is updated (!= 0), it is immutable. uint64_t compensated_file_size; + // These values can mutate, but they can only be read or written from + // single-threaded LogAndApply thread uint64_t num_entries; // the number of entries. uint64_t num_deletions; // the number of deletion entries. uint64_t raw_key_size; // total uncompressed key size. diff --git a/db/version_set.cc b/db/version_set.cc index a0af2decc..3b6d57f55 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -850,12 +850,8 @@ void VersionStorageInfo::GenerateLevelFilesBrief() { } } -void Version::PrepareApply(const MutableCFOptions& mutable_cf_options, - std::vector& size_being_compacted) { +void Version::PrepareApply() { UpdateAccumulatedStats(); - storage_info_.ComputeCompactionScore( - mutable_cf_options, cfd_->ioptions()->compaction_options_fifo, - size_being_compacted); storage_info_.UpdateFilesBySize(); storage_info_.UpdateNumNonEmptyLevels(); storage_info_.GenerateFileIndexer(); @@ -947,7 +943,9 @@ void VersionStorageInfo::ComputeCompensatedSizes() { for (int level = 0; level < num_levels_; level++) { for (auto* file_meta : files_[level]) { // Here we only compute compensated_file_size for those file_meta - // which compensated_file_size is uninitialized (== 0). + // which compensated_file_size is uninitialized (== 0). This is true only + // for files that have been created right now and no other thread has + // access to them. That's why we can safely mutate compensated_file_size. if (file_meta->compensated_file_size == 0) { file_meta->compensated_file_size = file_meta->fd.GetFileSize() + file_meta->num_deletions * average_value_size * @@ -966,8 +964,7 @@ int VersionStorageInfo::MaxInputLevel() const { void VersionStorageInfo::ComputeCompactionScore( const MutableCFOptions& mutable_cf_options, - const CompactionOptionsFIFO& compaction_options_fifo, - std::vector& size_being_compacted) { + const CompactionOptionsFIFO& compaction_options_fifo) { double max_score = 0; int max_score_level = 0; @@ -1008,9 +1005,13 @@ void VersionStorageInfo::ComputeCompactionScore( } } else { // Compute the ratio of current size to size limit. - const uint64_t level_bytes = - TotalCompensatedFileSize(files_[level]) - size_being_compacted[level]; - score = static_cast(level_bytes) / + uint64_t level_bytes_no_compacting = 0; + for (auto f : files_[level]) { + if (f && f->being_compacted == false) { + level_bytes_no_compacting += f->compensated_file_size; + } + } + score = static_cast(level_bytes_no_compacting) / mutable_cf_options.MaxBytesForLevel(level); if (max_score < score) { max_score = score; @@ -1527,6 +1528,11 @@ VersionSet::~VersionSet() { void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, Version* v) { + // compute new compaction score + v->storage_info()->ComputeCompactionScore( + *column_family_data->GetLatestMutableCFOptions(), + column_family_data->ioptions()->compaction_options_fifo); + // Mark v finalized v->storage_info_.SetFinalized(); @@ -1637,13 +1643,6 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, // Unlock during expensive operations. New writes cannot get here // because &w is ensuring that all new writes get queued. { - std::vector size_being_compacted; - if (!edit->IsColumnFamilyManipulation()) { - size_being_compacted.resize(v->storage_info()->num_levels() - 1); - // calculate the amount of data being compacted at every level - column_family_data->compaction_picker()->SizeBeingCompacted( - size_being_compacted); - } mu->Unlock(); @@ -1674,7 +1673,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, if (!edit->IsColumnFamilyManipulation()) { // This is cpu-heavy operations, which should be called outside mutex. - v->PrepareApply(mutable_cf_options, size_being_compacted); + v->PrepareApply(); } // Write new record to MANIFEST log @@ -2097,10 +2096,7 @@ Status VersionSet::Recover( builder->SaveTo(v->storage_info()); // Install recovered version - std::vector size_being_compacted( - v->storage_info()->num_levels() - 1); - cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted); - v->PrepareApply(*cfd->GetLatestMutableCFOptions(), size_being_compacted); + v->PrepareApply(); AppendVersion(cfd, v); } @@ -2434,10 +2430,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, Version* v = new Version(cfd, this, current_version_number_++); builder->SaveTo(v->storage_info()); - std::vector size_being_compacted( - v->storage_info()->num_levels() - 1); - cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted); - v->PrepareApply(*cfd->GetLatestMutableCFOptions(), size_being_compacted); + v->PrepareApply(); printf("--------------- Column family \"%s\" (ID %u) --------------\n", cfd->GetName().c_str(), (unsigned int)cfd->GetID()); diff --git a/db/version_set.h b/db/version_set.h index 6e645680b..83801e1da 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -113,13 +113,11 @@ class VersionStorageInfo { // Updates internal structures that keep track of compaction scores // We use compaction scores to figure out which compaction to do next - // REQUIRES: If Version is not yet saved to current_, it can be called without - // a lock. Once a version is saved to current_, call only with mutex held + // REQUIRES: db_mutex held!! // TODO find a better way to pass compaction_options_fifo. void ComputeCompactionScore( const MutableCFOptions& mutable_cf_options, - const CompactionOptionsFIFO& compaction_options_fifo, - std::vector& size_being_compacted); + const CompactionOptionsFIFO& compaction_options_fifo); // Generate level_files_brief_ from files_ void GenerateLevelFilesBrief(); @@ -365,10 +363,9 @@ class Version { Status* status, MergeContext* merge_context, bool* value_found = nullptr); - // Update scores, pre-calculated variables. It needs to be called before - // applying the version to the version set. - void PrepareApply(const MutableCFOptions& mutable_cf_options, - std::vector& size_being_compacted); + // Loads some stats information from files. Call without mutex held. It needs + // to be called before applying the version to the version set. + void PrepareApply(); // Reference count management (so Versions do not disappear out from // under live iterators)