diff --git a/db/db_impl.cc b/db/db_impl.cc index 07ac5c9d0..cffcbdfef 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1316,7 +1316,7 @@ int DBImpl::FindMinimumEmptyLevelFitting(int level) { int minimum_level = level; for (int i = level - 1; i > 0; --i) { // stop if level i is not empty - if (versions_->NumLevelFiles(i) > 0) break; + if (versions_->current()->NumLevelFiles(i) > 0) break; // stop if level i is too small (cannot fit the level files) if (versions_->MaxBytesForLevel(i) < versions_->NumLevelBytes(level)) break; @@ -2233,7 +2233,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, compact->compaction->Summary(scratch, sizeof(scratch)); Log(options_.info_log, "Compaction start summary: %s\n", scratch); - assert(versions_->NumLevelFiles(compact->compaction->level()) > 0); + assert(versions_->current()->NumLevelFiles(compact->compaction->level()) > 0); assert(compact->builder == nullptr); assert(!compact->outfile); @@ -3207,7 +3207,7 @@ Status DBImpl::MakeRoomForWrite(bool force, { StopWatch sw(env_, options_.statistics.get(), STALL_L0_SLOWDOWN_COUNT); env_->SleepForMicroseconds( - SlowdownAmount(versions_->NumLevelFiles(0), + SlowdownAmount(versions_->current()->NumLevelFiles(0), options_.level0_slowdown_writes_trigger, options_.level0_stop_writes_trigger) ); @@ -3242,7 +3242,7 @@ Status DBImpl::MakeRoomForWrite(bool force, STALL_MEMTABLE_COMPACTION_MICROS, stall); stall_memtable_compaction_ += stall; stall_memtable_compaction_count_++; - } else if (versions_->NumLevelFiles(0) >= + } else if (versions_->current()->NumLevelFiles(0) >= options_.level0_stop_writes_trigger) { // There are too many level-0 files. DelayLoggingAndReset(); @@ -3372,6 +3372,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { value->clear(); MutexLock l(&mutex_); + Version* current = versions_->current(); Slice in = property; Slice prefix("rocksdb."); if (!in.starts_with(prefix)) return false; @@ -3386,7 +3387,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { } else { char buf[100]; snprintf(buf, sizeof(buf), "%d", - versions_->NumLevelFiles(static_cast(level))); + current->NumLevelFiles(static_cast(level))); *value = buf; return true; } @@ -3401,7 +3402,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { snprintf(buf, sizeof(buf), "%3d %8d %8.0f\n", level, - versions_->NumLevelFiles(level), + current->NumLevelFiles(level), versions_->NumLevelBytes(level) / 1048576.0); value->append(buf); } @@ -3446,7 +3447,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { ); value->append(buf); for (int level = 0; level < NumberLevels(); level++) { - int files = versions_->NumLevelFiles(level); + int files = current->NumLevelFiles(level); if (stats_[level].micros > 0 || files > 0) { int64_t bytes_read = stats_[level].bytes_readn + stats_[level].bytes_readnp1; @@ -3728,7 +3729,7 @@ Status DBImpl::DeleteFile(std::string name) { // This is to make sure that any deletion tombstones are not // lost. Check that the level passed is the last level. for (int i = level + 1; i < maxlevel; i++) { - if (versions_->NumLevelFiles(i) != 0) { + if (versions_->current()->NumLevelFiles(i) != 0) { Log(options_.info_log, "DeleteFile %s FAILED. File not in last level\n", name.c_str()); return Status::InvalidArgument("File not in last level"); @@ -3853,12 +3854,11 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { impl->MaybeScheduleLogDBDeployStats(); } } - impl->mutex_.Unlock(); - if (impl->options_.compaction_style == kCompactionStyleUniversal) { - int num_files; + if (s.ok() && impl->options_.compaction_style == kCompactionStyleUniversal) { + Version* current = impl->versions_->current(); for (int i = 1; i < impl->NumberLevels(); i++) { - num_files = impl->versions_->NumLevelFiles(i); + int num_files = current->NumLevelFiles(i); if (num_files > 0) { s = Status::InvalidArgument("Not all files are at level 0. Cannot " "open with universal compaction style."); @@ -3867,6 +3867,8 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { } } + impl->mutex_.Unlock(); + if (s.ok()) { *dbptr = impl; } else { diff --git a/db/db_stats_logger.cc b/db/db_stats_logger.cc index 91810abe3..0fd6dd805 100644 --- a/db/db_stats_logger.cc +++ b/db/db_stats_logger.cc @@ -65,8 +65,9 @@ void DBImpl::LogDBDeployStats() { uint64_t file_total_size = 0; uint32_t file_total_num = 0; - for (int i = 0; i < versions_->NumberLevels(); i++) { - file_total_num += versions_->NumLevelFiles(i); + Version* current = versions_->current(); + for (int i = 0; i < current->NumberLevels(); i++) { + file_total_num += current->NumLevelFiles(i); file_total_size += versions_->NumLevelBytes(i); } diff --git a/db/version_set.cc b/db/version_set.cc index a411ea210..b4c1b2233 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -45,7 +45,7 @@ Version::~Version() { next_->prev_ = prev_; // Drop references to files - for (int level = 0; level < vset_->NumberLevels(); level++) { + for (int level = 0; level < num_levels_; level++) { for (size_t i = 0; i < files_[level].size(); i++) { FileMetaData* f = files_[level][i]; assert(f->refs > 0); @@ -265,7 +265,7 @@ void Version::AddIterators(const ReadOptions& options, // For levels > 0, we can use a concatenating iterator that sequentially // walks through the non-overlapping files in the level, opening them // lazily. - for (int level = 1; level < vset_->NumberLevels(); level++) { + for (int level = 1; level < num_levels_; level++) { if (!files_[level].empty()) { iters->push_back(NewConcatenatingIterator(options, soptions, level)); } @@ -404,17 +404,19 @@ static bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) { } Version::Version(VersionSet* vset, uint64_t version_number) - : vset_(vset), next_(this), prev_(this), refs_(0), - files_(new std::vector[vset->NumberLevels()]), - files_by_size_(vset->NumberLevels()), - next_file_to_compact_by_size_(vset->NumberLevels()), + : vset_(vset), + next_(this), + prev_(this), + refs_(0), + num_levels_(vset->num_levels_), + files_(new std::vector[num_levels_]), + files_by_size_(num_levels_), + next_file_to_compact_by_size_(num_levels_), file_to_compact_(nullptr), file_to_compact_level_(-1), - compaction_score_(vset->NumberLevels()), - compaction_level_(vset->NumberLevels()), - offset_manifest_file_(0), - version_number_(version_number) { -} + compaction_score_(num_levels_), + compaction_level_(num_levels_), + version_number_(version_number) {} void Version::Get(const ReadOptions& options, const LookupKey& k, @@ -453,7 +455,7 @@ void Version::Get(const ReadOptions& options, // levels. Therefore we are guaranteed that if we find data // in an smaller level, later levels are irrelevant (unless we // are MergeInProgress). - for (int level = 0; level < vset_->NumberLevels(); level++) { + for (int level = 0; level < num_levels_; level++) { size_t num_files = files_[level].size(); if (num_files == 0) continue; @@ -622,7 +624,7 @@ int Version::PickLevelForMemTableOutput( if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) { break; } - if (level + 2 >= vset_->NumberLevels()) { + if (level + 2 >= num_levels_) { level++; break; } @@ -857,7 +859,7 @@ bool Version::HasOverlappingUserKey( std::string Version::DebugString(bool hex) const { std::string r; - for (int level = 0; level < vset_->NumberLevels(); level++) { + for (int level = 0; level < num_levels_; level++) { // E.g., // --- level 1 --- // 17:123['a' .. 'd'] @@ -926,20 +928,18 @@ class VersionSet::Builder { public: // Initialize a builder with the files from *base and other info from *vset - Builder(VersionSet* vset, Version* base) - : vset_(vset), - base_(base) { + Builder(VersionSet* vset, Version* base) : vset_(vset), base_(base) { base_->Ref(); - levels_ = new LevelState[vset_->NumberLevels()]; + levels_ = new LevelState[base->NumberLevels()]; BySmallestKey cmp; cmp.internal_comparator = &vset_->icmp_; - for (int level = 0; level < vset_->NumberLevels(); level++) { + for (int level = 0; level < base->NumberLevels(); level++) { levels_[level].added_files = new FileSet(cmp); } } ~Builder() { - for (int level = 0; level < vset_->NumberLevels(); level++) { + for (int level = 0; level < base_->NumberLevels(); level++) { const FileSet* added = levels_[level].added_files; std::vector to_unref; to_unref.reserve(added->size()); @@ -962,7 +962,7 @@ class VersionSet::Builder { void CheckConsistency(Version* v) { #ifndef NDEBUG - for (int level = 0; level < vset_->NumberLevels(); level++) { + for (int level = 0; level < v->NumberLevels(); level++) { // Make sure there is no overlap in levels > 0 if (level > 0) { for (uint32_t i = 1; i < v->files_[level].size(); i++) { @@ -985,7 +985,7 @@ class VersionSet::Builder { #ifndef NDEBUG // a file to be deleted better exist in the previous version bool found = false; - for (int l = 0; !found && l < vset_->NumberLevels(); l++) { + for (int l = 0; !found && l < base_->NumberLevels(); l++) { const std::vector& base_files = base_->files_[l]; for (unsigned int i = 0; i < base_files.size(); i++) { FileMetaData* f = base_files[i]; @@ -998,7 +998,7 @@ class VersionSet::Builder { // if the file did not exist in the previous version, then it // is possibly moved from lower level to higher level in current // version - for (int l = level+1; !found && l < vset_->NumberLevels(); l++) { + for (int l = level+1; !found && l < base_->NumberLevels(); l++) { const FileSet* added = levels_[l].added_files; for (FileSet::const_iterator added_iter = added->begin(); added_iter != added->end(); ++added_iter) { @@ -1081,7 +1081,7 @@ class VersionSet::Builder { CheckConsistency(v); BySmallestKey cmp; cmp.internal_comparator = &vset_->icmp_; - for (int level = 0; level < vset_->NumberLevels(); level++) { + for (int level = 0; level < base_->NumberLevels(); level++) { // Merge the set of added files with the set of pre-existing files. // Drop any deleted files. Store the result in *v. const std::vector& base_files = base_->files_[level]; @@ -1128,8 +1128,7 @@ class VersionSet::Builder { } }; -VersionSet::VersionSet(const std::string& dbname, - const Options* options, +VersionSet::VersionSet(const std::string& dbname, const Options* options, const EnvOptions& storage_options, TableCache* table_cache, const InternalKeyComparator* cmp) @@ -1149,9 +1148,9 @@ VersionSet::VersionSet(const std::string& dbname, need_slowdown_for_num_level0_files(false), compactions_in_progress_(options_->num_levels), current_version_number_(0), - last_observed_manifest_size_(0), + manifest_file_size_(0), storage_options_(storage_options), - storage_options_compactions_(storage_options_) { + storage_options_compactions_(storage_options_) { compact_pointer_ = new std::string[options_->num_levels]; Init(options_->num_levels); AppendVersion(new Version(this, current_version_number_++)); @@ -1200,7 +1199,7 @@ void VersionSet::AppendVersion(Version* v) { current_ = v; need_slowdown_for_num_level0_files = (options_->level0_slowdown_writes_trigger >= 0 && current_ != nullptr && - NumLevelFiles(0) >= options_->level0_slowdown_writes_trigger); + v->NumLevelFiles(0) >= options_->level0_slowdown_writes_trigger); v->Ref(); // Append to linked list @@ -1250,7 +1249,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, // No need to perform this check if a new Manifest is being created anyways. if (!descriptor_log_ || - last_observed_manifest_size_ > options_->max_manifest_file_size) { + manifest_file_size_ > options_->max_manifest_file_size) { new_descriptor_log = true; manifest_file_number_ = NewFileNumber(); // Change manifest file no. } @@ -1264,7 +1263,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, // because &w is ensuring that all new writes get queued. { // calculate the amount of data being compacted at every level - std::vector size_being_compacted(NumberLevels()-1); + std::vector size_being_compacted(v->NumberLevels() - 1); SizeBeingCompacted(size_being_compacted); mu->Unlock(); @@ -1340,14 +1339,11 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, LogFlush(options_->info_log); mu->Lock(); - // cache the manifest_file_size so that it can be used to rollover in the - // next call to LogAndApply - last_observed_manifest_size_ = new_manifest_file_size; } // Install the new version if (s.ok()) { - v->offset_manifest_file_ = new_manifest_file_size; + manifest_file_size_ = new_manifest_file_size; AppendVersion(v); log_number_ = edit->log_number_; prev_log_number_ = edit->prev_log_number_; @@ -1459,7 +1455,7 @@ Status VersionSet::Recover() { break; } - if (edit.max_level_ >= NumberLevels()) { + if (edit.max_level_ >= current_->NumberLevels()) { s = Status::InvalidArgument( "db has more levels than options.num_levels"); break; @@ -1520,11 +1516,11 @@ Status VersionSet::Recover() { builder.SaveTo(v); // Install recovered version - std::vector size_being_compacted(NumberLevels()-1); + std::vector size_being_compacted(v->NumberLevels() - 1); SizeBeingCompacted(size_being_compacted); Finalize(v, size_being_compacted); - v->offset_manifest_file_ = manifest_file_size; + manifest_file_size_ = manifest_file_size; AppendVersion(v); manifest_file_number_ = next_file; next_file_number_ = next_file + 1; @@ -1548,7 +1544,7 @@ Status VersionSet::Recover() { } Status VersionSet::DumpManifest(Options& options, std::string& dscname, - bool verbose, bool hex) { + bool verbose, bool hex) { struct LogReporter : public log::Reader::Reporter { Status* status; virtual void Corruption(size_t bytes, const Status& s) { @@ -1652,7 +1648,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, builder.SaveTo(v); // Install recovered version - std::vector size_being_compacted(NumberLevels()-1); + std::vector size_being_compacted(v->NumberLevels() - 1); SizeBeingCompacted(size_being_compacted); Finalize(v, size_being_compacted); @@ -1683,7 +1679,7 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) { } void VersionSet::Finalize(Version* v, - std::vector& size_being_compacted) { + std::vector& size_being_compacted) { // Pre-sort level0 for Get() if (options_->compaction_style == kCompactionStyleUniversal) { std::sort(v->files_[0].begin(), v->files_[0].end(), NewestFirstBySeqNo); @@ -1696,7 +1692,7 @@ void VersionSet::Finalize(Version* v, int num_levels_to_check = (options_->compaction_style != kCompactionStyleUniversal) ? - NumberLevels() - 1 : 1; + v->NumberLevels() - 1 : 1; for (int level = 0; level < num_levels_to_check; level++) { @@ -1757,8 +1753,8 @@ void VersionSet::Finalize(Version* v, // sort all the levels based on their score. Higher scores get listed // first. Use bubble sort because the number of entries are small. - for (int i = 0; i < NumberLevels()-2; i++) { - for (int j = i+1; j < NumberLevels()-1; j++) { + for (int i = 0; i < v->NumberLevels() - 2; i++) { + for (int j = i + 1; j < v->NumberLevels() - 1; j++) { if (v->compaction_score_[i] < v->compaction_score_[j]) { double score = v->compaction_score_[i]; int level = v->compaction_level_[i]; @@ -1793,8 +1789,9 @@ static bool compareSeqnoDescending(const VersionSet::Fsize& first, void VersionSet::UpdateFilesBySize(Version* v) { // No need to sort the highest level because it is never compacted. - int max_level = (options_->compaction_style == kCompactionStyleUniversal) ? - NumberLevels() : NumberLevels() - 1; + int max_level = (options_->compaction_style == kCompactionStyleUniversal) + ? v->NumberLevels() + : v->NumberLevels() - 1; for (int level = 0; level < max_level; level++) { @@ -1850,7 +1847,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { } // Save files - for (int level = 0; level < NumberLevels(); level++) { + for (int level = 0; level < current_->NumberLevels(); level++) { const std::vector& files = current_->files_[level]; for (size_t i = 0; i < files.size(); i++) { const FileMetaData* f = files[i]; @@ -1864,15 +1861,9 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { return log->AddRecord(record); } -int VersionSet::NumLevelFiles(int level) const { - assert(level >= 0); - assert(level < NumberLevels()); - return current_->files_[level].size(); -} - const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const { int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files["); - for (int i = 0; i < NumberLevels(); i++) { + for (int i = 0; i < current_->NumberLevels(); i++) { int sz = sizeof(scratch->buffer) - len; int ret = snprintf(scratch->buffer + len, sz, "%d ", int(current_->files_[i].size())); @@ -1884,10 +1875,10 @@ const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const { return scratch->buffer; } -const char* VersionSet::LevelDataSizeSummary( - LevelSummaryStorage* scratch) const { +const char* VersionSet::LevelDataSizeSummary(LevelSummaryStorage* scratch) + const { int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size["); - for (int i = 0; i < NumberLevels(); i++) { + for (int i = 0; i < current_->NumberLevels(); i++) { int sz = sizeof(scratch->buffer) - len; int ret = snprintf(scratch->buffer + len, sz, "%lu ", (unsigned long)NumLevelBytes(i)); @@ -1950,7 +1941,7 @@ bool VersionSet::ManifestContains(const std::string& record) const { uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) { uint64_t result = 0; - for (int level = 0; level < NumberLevels(); level++) { + for (int level = 0; level < v->NumberLevels(); level++) { const std::vector& files = v->files_[level]; for (size_t i = 0; i < files.size(); i++) { if (icmp_.Compare(files[i]->largest, ikey) <= 0) { @@ -1987,7 +1978,7 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { for (Version* v = dummy_versions_.next_; v != &dummy_versions_; v = v->next_) { - for (int level = 0; level < NumberLevels(); level++) { + for (int level = 0; level < v->NumberLevels(); level++) { total_files += v->files_[level].size(); } } @@ -1998,7 +1989,7 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { for (Version* v = dummy_versions_.next_; v != &dummy_versions_; v = v->next_) { - for (int level = 0; level < NumberLevels(); level++) { + for (int level = 0; level < v->NumberLevels(); level++) { for (const auto& f : v->files_[level]) { live_list->push_back(f->number); } @@ -2008,7 +1999,7 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { void VersionSet::AddLiveFilesCurrentVersion(std::set* live) { Version* v = current_; - for (int level = 0; level < NumberLevels(); level++) { + for (int level = 0; level < v->NumberLevels(); level++) { const std::vector& files = v->files_[level]; for (size_t i = 0; i < files.size(); i++) { live->insert(files[i]->number); @@ -2018,7 +2009,7 @@ void VersionSet::AddLiveFilesCurrentVersion(std::set* live) { int64_t VersionSet::NumLevelBytes(int level) const { assert(level >= 0); - assert(level < NumberLevels()); + assert(level < current_->NumberLevels()); assert(current_); return TotalFileSize(current_->files_[level]); } @@ -2026,7 +2017,7 @@ int64_t VersionSet::NumLevelBytes(int level) const { int64_t VersionSet::MaxNextLevelOverlappingBytes() { uint64_t result = 0; std::vector overlaps; - for (int level = 1; level < NumberLevels() - 1; level++) { + for (int level = 1; level < current_->NumberLevels() - 1; level++) { for (size_t i = 0; i < current_->files_[level].size(); i++) { const FileMetaData* f = current_->files_[level][i]; current_->GetOverlappingInputs(level+1, &f->smallest, &f->largest, @@ -2200,7 +2191,7 @@ void VersionSet::ReleaseCompactionFiles(Compaction* c, Status status) { // The total size of files that are currently being compacted // at at every level upto the penultimate level. void VersionSet::SizeBeingCompacted(std::vector& sizes) { - for (int level = 0; level < NumberLevels()-1; level++) { + for (int level = 0; level < NumberLevels() - 1; level++) { uint64_t total = 0; for (std::set::iterator it = compactions_in_progress_[level].begin(); @@ -2223,8 +2214,8 @@ void VersionSet::SizeBeingCompacted(std::vector& sizes) { // base file (overrides configured values of file-size ratios, // min_merge_width and max_merge_width). // -Compaction* VersionSet::PickCompactionUniversalSizeAmp( - int level, double score) { +Compaction* VersionSet::PickCompactionUniversalSizeAmp(int level, + double score) { assert (level == 0); // percentage flexibilty while reducing size amplification @@ -2306,13 +2297,13 @@ Compaction* VersionSet::PickCompactionUniversalSizeAmp( // create a compaction request // We always compact all the files, so always compress. - Compaction* c = new Compaction(level, level, MaxFileSizeForLevel(level), - LLONG_MAX, NumberLevels(), false, - true); + Compaction* c = + new Compaction(current_, level, level, MaxFileSizeForLevel(level), + LLONG_MAX, false, true); c->score_ = score; for (unsigned int loop = start_index; loop < file_by_time.size(); loop++) { int index = file_by_time[loop]; - f = current_->files_[level][index]; + f = c->input_version_->files_[level][index]; c->inputs_[0].push_back(f); Log(options_->info_log, "Universal: size amp picking file %lu[%d] with size %lu", @@ -2436,14 +2427,14 @@ Compaction* VersionSet::PickCompactionUniversalReadAmp( } } } - Compaction* c = new Compaction(level, level, MaxFileSizeForLevel(level), - LLONG_MAX, NumberLevels(), false, - enable_compression); + Compaction* c = + new Compaction(current_, level, level, MaxFileSizeForLevel(level), + LLONG_MAX, false, enable_compression); c->score_ = score; for (unsigned int i = start_index; i < first_index_after; i++) { int index = file_by_time[i]; - FileMetaData* f = current_->files_[level][index]; + FileMetaData* f = c->input_version_->files_[level][index]; c->inputs_[0].push_back(f); Log(options_->info_log, "Universal: Picking file %lu[%d] with size %lu\n", (unsigned long)f->number, @@ -2505,11 +2496,11 @@ Compaction* VersionSet::PickCompactionUniversal(int level, double score) { } // The files are sorted from newest first to oldest last. - std::vector& file_by_time = current_->files_by_size_[level]; + std::vector& file_by_time = c->input_version_->files_by_size_[level]; // Is the earliest file part of this compaction? int last_index = file_by_time[file_by_time.size()-1]; - FileMetaData* last_file = current_->files_[level][last_index]; + FileMetaData* last_file = c->input_version_->files_[level][last_index]; if (c->inputs_[0][c->inputs_[0].size()-1] == last_file) { c->bottommost_level_ = true; } @@ -2520,9 +2511,6 @@ Compaction* VersionSet::PickCompactionUniversal(int level, double score) { c->inputs_[0].size()); } - c->input_version_ = current_; - c->input_version_->Ref(); - // mark all the files that are being compacted c->MarkFilesBeingCompacted(true); @@ -2531,7 +2519,8 @@ Compaction* VersionSet::PickCompactionUniversal(int level, double score) { // Record whether this compaction includes all sst files. // For now, it is only relevant in universal compaction mode. - c->is_full_compaction_ = (c->inputs_[0].size() == current_->files_[0].size()); + c->is_full_compaction_ = + (c->inputs_[0].size() == c->input_version_->files_[0].size()); return c; } @@ -2548,27 +2537,28 @@ Compaction* VersionSet::PickCompactionBySize(int level, double score) { } assert(level >= 0); - assert(level+1 < NumberLevels()); - c = new Compaction(level, level+1, MaxFileSizeForLevel(level+1), - MaxGrandParentOverlapBytes(level), NumberLevels()); + assert(level + 1 < current_->NumberLevels()); + c = new Compaction(current_, level, level + 1, MaxFileSizeForLevel(level + 1), + MaxGrandParentOverlapBytes(level)); c->score_ = score; // Pick the largest file in this level that is not already // being compacted - std::vector& file_size = current_->files_by_size_[level]; + std::vector& file_size = c->input_version_->files_by_size_[level]; // record the first file that is not yet compacted int nextIndex = -1; - for (unsigned int i = current_->next_file_to_compact_by_size_[level]; + for (unsigned int i = c->input_version_->next_file_to_compact_by_size_[level]; i < file_size.size(); i++) { int index = file_size[i]; - FileMetaData* f = current_->files_[level][index]; + FileMetaData* f = c->input_version_->files_[level][index]; // check to verify files are arranged in descending size assert((i == file_size.size() - 1) || - (i >= Version::number_of_files_to_sort_-1) || - (f->file_size >= current_->files_[level][file_size[i+1]]->file_size)); + (i >= Version::number_of_files_to_sort_ - 1) || + (f->file_size >= + c->input_version_->files_[level][file_size[i + 1]]->file_size)); // do not pick a file to compact if it is being compacted // from n-1 level. @@ -2604,7 +2594,7 @@ Compaction* VersionSet::PickCompactionBySize(int level, double score) { } // store where to start the iteration in the next call to PickCompaction - current_->next_file_to_compact_by_size_[level] = nextIndex; + c->input_version_->next_file_to_compact_by_size_[level] = nextIndex; return c; } @@ -2655,11 +2645,12 @@ Compaction* VersionSet::PickCompaction() { if (level != 0 || compactions_in_progress_[0].empty()) { if(!ParentRangeInCompaction(&f->smallest, &f->largest, level, &parent_index)) { - c = new Compaction(level, level+1, MaxFileSizeForLevel(level+1), - MaxGrandParentOverlapBytes(level), NumberLevels(), true); + c = new Compaction(current_, level, level + 1, + MaxFileSizeForLevel(level + 1), + MaxGrandParentOverlapBytes(level), true); c->inputs_[0].push_back(f); c->parent_index_ = parent_index; - current_->file_to_compact_ = nullptr; + c->input_version_->file_to_compact_ = nullptr; ExpandWhileOverlapping(c); } } @@ -2669,9 +2660,6 @@ Compaction* VersionSet::PickCompaction() { return nullptr; } - c->input_version_ = current_; - c->input_version_->Ref(); - // Two level 0 compaction won't run at the same time, so don't need to worry // about files on level 0 being compacted. if (level == 0) { @@ -2682,7 +2670,8 @@ Compaction* VersionSet::PickCompaction() { // c->inputs_[0] earlier and replace it with an overlapping set // which will include the picked file. c->inputs_[0].clear(); - current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]); + c->input_version_->GetOverlappingInputs(0, &smallest, &largest, + &c->inputs_[0]); // If we include more L0 files in the same compaction run it can // cause the 'smallest' and 'largest' key to get extended to a @@ -2713,12 +2702,13 @@ Compaction* VersionSet::PickCompaction() { // Returns true if any one of the parent files are being compacted bool VersionSet::ParentRangeInCompaction(const InternalKey* smallest, - const InternalKey* largest, int level, int* parent_index) { + const InternalKey* largest, int level, + int* parent_index) { std::vector inputs; - assert(level + 1 < NumberLevels()); + assert(level + 1 < current_->NumberLevels()); - current_->GetOverlappingInputs(level+1, smallest, largest, - &inputs, *parent_index, parent_index); + current_->GetOverlappingInputs(level + 1, smallest, largest, &inputs, + *parent_index, parent_index); return FilesInCompaction(inputs); } @@ -2766,8 +2756,8 @@ void VersionSet::ExpandWhileOverlapping(Compaction* c) { old_size = c->inputs_[0].size(); GetRange(c->inputs_[0], &smallest, &largest); c->inputs_[0].clear(); - current_->GetOverlappingInputs(level, &smallest, &largest, &c->inputs_[0], - hint_index, &hint_index); + c->input_version_->GetOverlappingInputs( + level, &smallest, &largest, &c->inputs_[0], hint_index, &hint_index); } while(c->inputs_[0].size() > old_size); // Get the new range @@ -2805,8 +2795,9 @@ void VersionSet::SetupOtherInputs(Compaction* c) { GetRange(c->inputs_[0], &smallest, &largest); // Populate the set of next-level files (inputs_[1]) to include in compaction - current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1], - c->parent_index_, &c->parent_index_); + c->input_version_->GetOverlappingInputs(level + 1, &smallest, &largest, + &c->inputs_[1], c->parent_index_, + &c->parent_index_); // Get entire range covered by compaction InternalKey all_start, all_limit; @@ -2819,8 +2810,8 @@ void VersionSet::SetupOtherInputs(Compaction* c) { // can happen when one user key spans multiple files. if (!c->inputs_[1].empty()) { std::vector expanded0; - current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0, - c->base_index_, nullptr); + c->input_version_->GetOverlappingInputs( + level, &all_start, &all_limit, &expanded0, c->base_index_, nullptr); const uint64_t inputs0_size = TotalFileSize(c->inputs_[0]); const uint64_t inputs1_size = TotalFileSize(c->inputs_[1]); const uint64_t expanded0_size = TotalFileSize(expanded0); @@ -2828,13 +2819,13 @@ void VersionSet::SetupOtherInputs(Compaction* c) { if (expanded0.size() > c->inputs_[0].size() && inputs1_size + expanded0_size < limit && !FilesInCompaction(expanded0) && - !current_->HasOverlappingUserKey(&expanded0, level)) { + !c->input_version_->HasOverlappingUserKey(&expanded0, level)) { InternalKey new_start, new_limit; GetRange(expanded0, &new_start, &new_limit); std::vector expanded1; - current_->GetOverlappingInputs(level+1, &new_start, &new_limit, - &expanded1, c->parent_index_, - &c->parent_index_); + c->input_version_->GetOverlappingInputs(level + 1, &new_start, &new_limit, + &expanded1, c->parent_index_, + &c->parent_index_); if (expanded1.size() == c->inputs_[1].size() && !FilesInCompaction(expanded1)) { Log(options_->info_log, @@ -2861,8 +2852,8 @@ void VersionSet::SetupOtherInputs(Compaction* c) { // Compute the set of grandparent files that overlap this compaction // (parent == level+1; grandparent == level+2) if (level + 2 < NumberLevels()) { - current_->GetOverlappingInputs(level + 2, &all_start, &all_limit, - &c->grandparents_); + c->input_version_->GetOverlappingInputs(level + 2, &all_start, &all_limit, + &c->grandparents_); } if (false) { @@ -2880,10 +2871,8 @@ void VersionSet::SetupOtherInputs(Compaction* c) { c->edit_->SetCompactPointer(level, largest); } -Status VersionSet::GetMetadataForFile( - uint64_t number, - int *filelevel, - FileMetaData *meta) { +Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel, + FileMetaData* meta) { for (int level = 0; level < NumberLevels(); level++) { const std::vector& files = current_->files_[level]; for (size_t i = 0; i < files.size(); i++) { @@ -2897,8 +2886,7 @@ Status VersionSet::GetMetadataForFile( return Status::NotFound("File not present in any level"); } -void VersionSet::GetLiveFilesMetaData( - std::vector * metadata) { +void VersionSet::GetLiveFilesMetaData(std::vector* metadata) { for (int level = 0; level < NumberLevels(); level++) { const std::vector& files = current_->files_[level]; for (size_t i = 0; i < files.size(); i++) { @@ -2960,11 +2948,9 @@ Compaction* VersionSet::CompactRange(int input_level, } } } - Compaction* c = new Compaction(input_level, - output_level, + Compaction* c = new Compaction(current_, input_level, output_level, MaxFileSizeForLevel(output_level), - MaxGrandParentOverlapBytes(input_level), - NumberLevels()); + MaxGrandParentOverlapBytes(input_level)); c->inputs_[0] = inputs; ExpandWhileOverlapping(c); @@ -2973,8 +2959,6 @@ Compaction* VersionSet::CompactRange(int input_level, return nullptr; } - c->input_version_ = current_; - c->input_version_->Ref(); SetupOtherInputs(c); if (covering_the_whole_range) { @@ -2991,15 +2975,16 @@ Compaction* VersionSet::CompactRange(int input_level, return c; } -Compaction::Compaction(int level, int out_level, uint64_t target_file_size, - uint64_t max_grandparent_overlap_bytes, int number_levels, - bool seek_compaction, bool enable_compression) +Compaction::Compaction(Version* input_version, int level, int out_level, + uint64_t target_file_size, + uint64_t max_grandparent_overlap_bytes, + bool seek_compaction, bool enable_compression) : level_(level), out_level_(out_level), max_output_file_size_(target_file_size), maxGrandParentOverlapBytes_(max_grandparent_overlap_bytes), - input_version_(nullptr), - number_levels_(number_levels), + input_version_(input_version), + number_levels_(input_version_->NumberLevels()), seek_compaction_(seek_compaction), enable_compression_(enable_compression), grandparent_index_(0), @@ -3010,7 +2995,9 @@ Compaction::Compaction(int level, int out_level, uint64_t target_file_size, score_(0), bottommost_level_(false), is_full_compaction_(false), - level_ptrs_(std::vector(number_levels)) { + level_ptrs_(std::vector(number_levels_)) { + + input_version_->Ref(); edit_ = new VersionEdit(); for (int i = 0; i < number_levels_; i++) { level_ptrs_[i] = 0; @@ -3125,7 +3112,7 @@ void Compaction::SetupBottomMostLevel(bool isManual) { bottommost_level_ = true; int num_levels = input_version_->vset_->NumberLevels(); for (int i = output_level() + 1; i < num_levels; i++) { - if (input_version_->vset_->NumLevelFiles(i) > 0) { + if (input_version_->NumLevelFiles(i) > 0) { bottommost_level_ = false; break; } @@ -3143,9 +3130,8 @@ void Compaction::ResetNextCompactionIndex() { input_version_->ResetNextCompactionIndex(level_); } -static void InputSummary(std::vector& files, - char* output, - int len) { +static void InputSummary(std::vector& files, char* output, + int len) { int write = 0; for (unsigned int i = 0; i < files.size(); i++) { int sz = len - write; diff --git a/db/version_set.h b/db/version_set.h index 2c91532b5..68c41b160 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -135,7 +135,10 @@ class Version { int PickLevelForMemTableOutput(const Slice& smallest_user_key, const Slice& largest_user_key); - int NumFiles(int level) const { return files_[level].size(); } + int NumberLevels() const { return num_levels_; } + + // REQUIRES: lock is held + int NumLevelFiles(int level) const { return files_[level].size(); } // Return a human readable string that describes this version's contents. std::string DebugString(bool hex = false) const; @@ -161,6 +164,7 @@ class Version { Version* next_; // Next version in linked list Version* prev_; // Previous version in linked list int refs_; // Number of live refs to this version + int num_levels_; // Number of levels // List of files per level, files in each level are arranged // in increasing order of keys @@ -197,9 +201,6 @@ class Version { double max_compaction_score_; // max score in l1 to ln-1 int max_compaction_score_level_; // level on which max score occurs - // The offset in the manifest file where this version is stored. - uint64_t offset_manifest_file_; - // A version number that uniquely represents this version. This is // used for debugging and logging purposes only. uint64_t version_number_; @@ -234,7 +235,7 @@ class VersionSet { // REQUIRES: *mu is held on entry. // REQUIRES: no other thread concurrently calls LogAndApply() Status LogAndApply(VersionEdit* edit, port::Mutex* mu, - bool new_descriptor_log = false); + bool new_descriptor_log = false); // Recover the last saved descriptor from persistent storage. Status Recover(); @@ -271,9 +272,6 @@ class VersionSet { } } - // Return the number of Table files at the specified level. - int NumLevelFiles(int level) const; - // Return the combined file size of all files at the specified level. int64_t NumLevelBytes(int level) const; @@ -400,7 +398,7 @@ class VersionSet { const char* LevelFileSummary(FileSummaryStorage* scratch, int level) const; // Return the size of the current manifest file - const uint64_t ManifestFileSize() { return current_->offset_manifest_file_; } + uint64_t ManifestFileSize() const { return manifest_file_size_; } // For the specfied level, pick a compaction. // Returns nullptr if there is no compaction to be done. @@ -524,9 +522,8 @@ class VersionSet { // Queue of writers to the manifest file std::deque manifest_writers_; - // Store the manifest file size when it is checked. - // Save us the cost of checking file size twice in LogAndApply - uint64_t last_observed_manifest_size_; + // Current size of manifest file + uint64_t manifest_file_size_; std::vector obsolete_files_; @@ -619,9 +616,9 @@ class Compaction { friend class Version; friend class VersionSet; - explicit Compaction(int level, int out_level, uint64_t target_file_size, - uint64_t max_grandparent_overlap_bytes, int number_levels, - bool seek_compaction = false, bool enable_compression = true); + Compaction(Version* input_version, int level, int out_level, + uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, + bool seek_compaction = false, bool enable_compression = true); int level_; int out_level_; // levels to which output files are stored diff --git a/db/version_set_reduce_num_levels.cc b/db/version_set_reduce_num_levels.cc index 07062399b..2ca689809 100644 --- a/db/version_set_reduce_num_levels.cc +++ b/db/version_set_reduce_num_levels.cc @@ -25,7 +25,7 @@ Status VersionSet::ReduceNumberOfLevels(int new_levels, port::Mutex* mu) { } Version* current_version = current_; - int current_levels = NumberLevels(); + int current_levels = current_version->NumberLevels(); if (current_levels <= new_levels) { return Status::OK(); @@ -36,7 +36,7 @@ Status VersionSet::ReduceNumberOfLevels(int new_levels, port::Mutex* mu) { int first_nonempty_level = -1; int first_nonempty_level_filenum = 0; for (int i = new_levels - 1; i < current_levels; i++) { - int file_num = NumLevelFiles(i); + int file_num = current_version->NumLevelFiles(i); if (file_num != 0) { if (first_nonempty_level < 0) { first_nonempty_level = i; @@ -65,6 +65,7 @@ Status VersionSet::ReduceNumberOfLevels(int new_levels, port::Mutex* mu) { delete[] current_version->files_; current_version->files_ = new_files_list; + current_version->num_levels_ = new_levels; delete[] compact_pointer_; delete[] max_file_size_; diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index 58d81460e..65ecd61a2 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -1024,7 +1024,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt, } int max = -1; for (int i = 0; i < versions.NumberLevels(); i++) { - if (versions.NumLevelFiles(i)) { + if (versions.current()->NumLevelFiles(i)) { max = i; } }