diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index a7232246a..04d6d0e17 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -74,7 +74,7 @@ Status DBImpl::GetLiveFiles(std::vector& ret, // Make a set of all of the live *.sst files std::set live; - versions_->AddLiveFilesCurrentVersion(&live); + versions_->current()->AddLiveFiles(&live); ret.clear(); ret.reserve(live.size() + 2); //*.sst + CURRENT + MANIFEST diff --git a/db/db_impl.cc b/db/db_impl.cc index cffcbdfef..e84817b9b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1313,13 +1313,13 @@ void DBImpl::CompactRange(const Slice* begin, // return the same level if it cannot be moved int DBImpl::FindMinimumEmptyLevelFitting(int level) { mutex_.AssertHeld(); + Version* current = versions_->current(); int minimum_level = level; for (int i = level - 1; i > 0; --i) { // stop if level i is not empty - if (versions_->current()->NumLevelFiles(i) > 0) break; - + if (current->NumLevelFiles(i) > 0) break; // stop if level i is too small (cannot fit the level files) - if (versions_->MaxBytesForLevel(i) < versions_->NumLevelBytes(level)) break; + if (versions_->MaxBytesForLevel(i) < current->NumLevelBytes(level)) break; minimum_level = i; } @@ -1826,6 +1826,11 @@ void DBImpl::TEST_PurgeObsoleteteWAL() { PurgeObsoleteWALFiles(); } +uint64_t DBImpl::TEST_GetLevel0TotalSize() { + MutexLock l(&mutex_); + return versions_->current()->NumLevelBytes(0); +} + void DBImpl::BackgroundCallCompaction() { bool madeProgress = false; DeletionState deletion_state(options_.max_write_buffer_number, true); @@ -1939,13 +1944,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, f->smallest_seqno, f->largest_seqno); status = versions_->LogAndApply(c->edit(), &mutex_); InstallSuperVersion(deletion_state); - VersionSet::LevelSummaryStorage tmp; + Version::LevelSummaryStorage tmp; Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", - static_cast(f->number), - c->level() + 1, + static_cast(f->number), c->level() + 1, static_cast(f->file_size), - status.ToString().c_str(), - versions_->LevelSummary(&tmp)); + status.ToString().c_str(), versions_->current()->LevelSummary(&tmp)); versions_->ReleaseCompactionFiles(c.get(), status); *madeProgress = true; } else { @@ -2605,22 +2608,21 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, status = InstallCompactionResults(compact); InstallSuperVersion(deletion_state); } - VersionSet::LevelSummaryStorage tmp; + Version::LevelSummaryStorage tmp; Log(options_.info_log, "compacted to: %s, %.1f MB/sec, level %d, files in(%d, %d) out(%d) " "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) " "write-amplify(%.1f) %s\n", - versions_->LevelSummary(&tmp), + versions_->current()->LevelSummary(&tmp), (stats.bytes_readn + stats.bytes_readnp1 + stats.bytes_written) / - (double) stats.micros, - compact->compaction->output_level(), - stats.files_in_leveln, stats.files_in_levelnp1, stats.files_out_levelnp1, - stats.bytes_readn / 1048576.0, - stats.bytes_readnp1 / 1048576.0, + (double)stats.micros, + compact->compaction->output_level(), stats.files_in_leveln, + stats.files_in_levelnp1, stats.files_out_levelnp1, + stats.bytes_readn / 1048576.0, stats.bytes_readnp1 / 1048576.0, stats.bytes_written / 1048576.0, (stats.bytes_written + stats.bytes_readnp1 + stats.bytes_readn) / - (double) stats.bytes_readn, - stats.bytes_written / (double) stats.bytes_readn, + (double)stats.bytes_readn, + stats.bytes_written / (double)stats.bytes_readn, status.ToString().c_str()); return status; @@ -2701,7 +2703,7 @@ Iterator* DBImpl::TEST_NewInternalIterator() { int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { MutexLock l(&mutex_); - return versions_->MaxNextLevelOverlappingBytes(); + return versions_->current()->MaxNextLevelOverlappingBytes(); } Status DBImpl::Get(const ReadOptions& options, @@ -3193,9 +3195,7 @@ Status DBImpl::MakeRoomForWrite(bool force, // Yield previous error s = bg_error_; break; - } else if ( - allow_delay && - versions_->NeedSlowdownForNumLevel0Files()) { + } else if (allow_delay && versions_->NeedSlowdownForNumLevel0Files()) { // We are getting close to hitting a hard limit on the number of // L0 files. Rather than delaying a single write by several // seconds when we hit the hard limit, start delaying each @@ -3403,7 +3403,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { "%3d %8d %8.0f\n", level, current->NumLevelFiles(level), - versions_->NumLevelBytes(level) / 1048576.0); + current->NumLevelBytes(level) / 1048576.0); value->append(buf); } return true; @@ -3446,7 +3446,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { "--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n" ); value->append(buf); - for (int level = 0; level < NumberLevels(); level++) { + for (int level = 0; level < current->NumberLevels(); level++) { int files = current->NumLevelFiles(level); if (stats_[level].micros > 0 || files > 0) { int64_t bytes_read = stats_[level].bytes_readn + @@ -3468,8 +3468,8 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { "%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %9.1f %9lu\n", level, files, - versions_->NumLevelBytes(level) / 1048576.0, - versions_->NumLevelBytes(level) / + current->NumLevelBytes(level) / 1048576.0, + current->NumLevelBytes(level) / versions_->MaxBytesForLevel(level), stats_[level].micros / 1e6, bytes_read / 1048576.0, diff --git a/db/db_impl.h b/db/db_impl.h index 476b2bf54..214affac7 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -129,7 +129,7 @@ class DBImpl : public DB { void TEST_PurgeObsoleteteWAL(); // get total level0 file size. Only for testing. - uint64_t TEST_GetLevel0TotalSize() { return versions_->NumLevelBytes(0);} + uint64_t TEST_GetLevel0TotalSize(); void TEST_SetDefaultTimeToCheck(uint64_t default_interval_to_delete_obsolete_WAL) { diff --git a/db/db_stats_logger.cc b/db/db_stats_logger.cc index 0fd6dd805..db86865ca 100644 --- a/db/db_stats_logger.cc +++ b/db/db_stats_logger.cc @@ -68,11 +68,11 @@ void DBImpl::LogDBDeployStats() { Version* current = versions_->current(); for (int i = 0; i < current->NumberLevels(); i++) { file_total_num += current->NumLevelFiles(i); - file_total_size += versions_->NumLevelBytes(i); + file_total_size += current->NumLevelBytes(i); } - VersionSet::LevelSummaryStorage scratch; - const char* file_num_summary = versions_->LevelSummary(&scratch); + Version::LevelSummaryStorage scratch; + const char* file_num_summary = current->LevelSummary(&scratch); std::string file_num_per_level(file_num_summary); std::string data_size_per_level(file_num_summary); diff --git a/db/version_set.cc b/db/version_set.cc index b4c1b2233..eb20650ba 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -857,6 +857,67 @@ bool Version::HasOverlappingUserKey( return false; } +int64_t Version::NumLevelBytes(int level) const { + assert(level >= 0); + assert(level < NumberLevels()); + return TotalFileSize(files_[level]); +} + +const char* Version::LevelSummary(LevelSummaryStorage* scratch) const { + int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files["); + for (int i = 0; i < NumberLevels(); i++) { + int sz = sizeof(scratch->buffer) - len; + int ret = snprintf(scratch->buffer + len, sz, "%d ", int(files_[i].size())); + if (ret < 0 || ret >= sz) break; + len += ret; + } + snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]"); + return scratch->buffer; +} + +const char* Version::LevelFileSummary(FileSummaryStorage* scratch, + int level) const { + int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size["); + for (const auto& f : files_[level]) { + int sz = sizeof(scratch->buffer) - len; + int ret = snprintf(scratch->buffer + len, sz, + "#%lu(seq=%lu,sz=%lu,%lu) ", + (unsigned long)f->number, + (unsigned long)f->smallest_seqno, + (unsigned long)f->file_size, + (unsigned long)f->being_compacted); + if (ret < 0 || ret >= sz) + break; + len += ret; + } + snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]"); + return scratch->buffer; +} + +int64_t Version::MaxNextLevelOverlappingBytes() { + uint64_t result = 0; + std::vector overlaps; + for (int level = 1; level < NumberLevels() - 1; level++) { + for (const auto& f : files_[level]) { + GetOverlappingInputs(level + 1, &f->smallest, &f->largest, &overlaps); + const uint64_t sum = TotalFileSize(overlaps); + if (sum > result) { + result = sum; + } + } + } + return result; +} + +void Version::AddLiveFiles(std::set* live) { + for (int level = 0; level < NumberLevels(); level++) { + const std::vector& files = files_[level]; + for (const auto& file : files) { + live->insert(file->number); + } + } +} + std::string Version::DebugString(bool hex) const { std::string r; for (int level = 0; level < num_levels_; level++) { @@ -1145,7 +1206,7 @@ VersionSet::VersionSet(const std::string& dbname, const Options* options, num_levels_(options_->num_levels), dummy_versions_(this), current_(nullptr), - need_slowdown_for_num_level0_files(false), + need_slowdown_for_num_level0_files_(false), compactions_in_progress_(options_->num_levels), current_version_number_(0), manifest_file_size_(0), @@ -1197,7 +1258,7 @@ void VersionSet::AppendVersion(Version* v) { current_->Unref(); } current_ = v; - need_slowdown_for_num_level0_files = + need_slowdown_for_num_level0_files_ = (options_->level0_slowdown_writes_trigger >= 0 && current_ != nullptr && v->NumLevelFiles(0) >= options_->level0_slowdown_writes_trigger); v->Ref(); @@ -1861,55 +1922,6 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { return log->AddRecord(record); } -const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const { - int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files["); - for (int i = 0; i < current_->NumberLevels(); i++) { - int sz = sizeof(scratch->buffer) - len; - int ret = snprintf(scratch->buffer + len, sz, "%d ", - int(current_->files_[i].size())); - if (ret < 0 || ret >= sz) - break; - len += ret; - } - snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]"); - return scratch->buffer; -} - -const char* VersionSet::LevelDataSizeSummary(LevelSummaryStorage* scratch) - const { - int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size["); - for (int i = 0; i < current_->NumberLevels(); i++) { - int sz = sizeof(scratch->buffer) - len; - int ret = snprintf(scratch->buffer + len, sz, "%lu ", - (unsigned long)NumLevelBytes(i)); - if (ret < 0 || ret >= sz) - break; - len += ret; - } - snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]"); - return scratch->buffer; -} - -const char* VersionSet::LevelFileSummary( - FileSummaryStorage* scratch, int level) const { - int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size["); - for (unsigned int i = 0; i < current_->files_[level].size(); i++) { - FileMetaData* f = current_->files_[level][i]; - int sz = sizeof(scratch->buffer) - len; - int ret = snprintf(scratch->buffer + len, sz, - "#%lu(seq=%lu,sz=%lu,%lu) ", - (unsigned long)f->number, - (unsigned long)f->smallest_seqno, - (unsigned long)f->file_size, - (unsigned long)f->being_compacted); - if (ret < 0 || ret >= sz) - break; - len += ret; - } - snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]"); - return scratch->buffer; -} - // Opens the mainfest file and reads all records // till it finds the record we are looking for. bool VersionSet::ManifestContains(const std::string& record) const { @@ -1997,40 +2009,6 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { } } -void VersionSet::AddLiveFilesCurrentVersion(std::set* live) { - Version* v = current_; - for (int level = 0; level < v->NumberLevels(); level++) { - const std::vector& files = v->files_[level]; - for (size_t i = 0; i < files.size(); i++) { - live->insert(files[i]->number); - } - } -} - -int64_t VersionSet::NumLevelBytes(int level) const { - assert(level >= 0); - assert(level < current_->NumberLevels()); - assert(current_); - return TotalFileSize(current_->files_[level]); -} - -int64_t VersionSet::MaxNextLevelOverlappingBytes() { - uint64_t result = 0; - std::vector overlaps; - for (int level = 1; level < current_->NumberLevels() - 1; level++) { - for (size_t i = 0; i < current_->files_[level].size(); i++) { - const FileMetaData* f = current_->files_[level][i]; - current_->GetOverlappingInputs(level+1, &f->smallest, &f->largest, - &overlaps); - const uint64_t sum = TotalFileSize(overlaps); - if (sum > result) { - result = sum; - } - } - } - return result; -} - // Stores the minimal range that covers all entries in inputs in // *smallest, *largest. // REQUIRES: inputs is not empty @@ -2456,10 +2434,10 @@ Compaction* VersionSet::PickCompactionUniversal(int level, double score) { Log(options_->info_log, "Universal: nothing to do\n"); return nullptr; } - VersionSet::FileSummaryStorage tmp; + Version::FileSummaryStorage tmp; Log(options_->info_log, "Universal: candidate files(%lu): %s\n", current_->files_[level].size(), - LevelFileSummary(&tmp, 0)); + current_->LevelFileSummary(&tmp, 0)); // Check for size amplification first. Compaction* c = PickCompactionUniversalSizeAmp(level, score); diff --git a/db/version_set.h b/db/version_set.h index 68c41b160..51f6d9b6c 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -140,13 +140,34 @@ class Version { // REQUIRES: lock is held int NumLevelFiles(int level) const { return files_[level].size(); } + // Return the combined file size of all files at the specified level. + int64_t NumLevelBytes(int level) const; + + // Return a human-readable short (single-line) summary of the number + // of files per level. Uses *scratch as backing store. + struct LevelSummaryStorage { + char buffer[100]; + }; + struct FileSummaryStorage { + char buffer[1000]; + }; + const char* LevelSummary(LevelSummaryStorage* scratch) const; + // Return a human-readable short (single-line) summary of files + // in a specified level. Uses *scratch as backing store. + const char* LevelFileSummary(FileSummaryStorage* scratch, int level) const; + + // Return the maximum overlapping data (in bytes) at next level for any + // file at a level >= 1. + int64_t MaxNextLevelOverlappingBytes(); + + // Add all files listed in the current version to *live. + void AddLiveFiles(std::set* live); + // Return a human readable string that describes this version's contents. std::string DebugString(bool hex = false) const; // Returns the version nuber of this version - uint64_t GetVersionNumber() { - return version_number_; - } + uint64_t GetVersionNumber() const { return version_number_; } private: friend class Compaction; @@ -222,10 +243,8 @@ class Version { class VersionSet { public: - VersionSet(const std::string& dbname, - const Options* options, - const EnvOptions& storage_options, - TableCache* table_cache, + VersionSet(const std::string& dbname, const Options* options, + const EnvOptions& storage_options, TableCache* table_cache, const InternalKeyComparator*); ~VersionSet(); @@ -254,7 +273,7 @@ class VersionSet { // A Flag indicating whether write needs to slowdown because of there are // too many number of level0 files. bool NeedSlowdownForNumLevel0Files() const { - return need_slowdown_for_num_level0_files; + return need_slowdown_for_num_level0_files_; } // Return the current manifest file number @@ -272,9 +291,6 @@ class VersionSet { } } - // Return the combined file size of all files at the specified level. - int64_t NumLevelBytes(int level) const; - // Return the last sequence number. uint64_t LastSequence() const { return last_sequence_.load(std::memory_order_acquire); @@ -321,10 +337,6 @@ class VersionSet { const InternalKey* end, InternalKey** compaction_end); - // Return the maximum overlapping data (in bytes) at next level for any - // file at a level >= 1. - int64_t MaxNextLevelOverlappingBytes(); - // Create an iterator that reads over the compaction inputs for "*c". // The caller should delete the iterator when no longer needed. Iterator* MakeInputIterator(Compaction* c); @@ -368,35 +380,14 @@ class VersionSet { // Add all files listed in any live version to *live. void AddLiveFiles(std::vector* live_list); - // Add all files listed in the current version to *live. - void AddLiveFilesCurrentVersion(std::set* live); - // Return the approximate offset in the database of the data for // "key" as of version "v". uint64_t ApproximateOffsetOf(Version* v, const InternalKey& key); - // Return a human-readable short (single-line) summary of the number - // of files per level. Uses *scratch as backing store. - struct LevelSummaryStorage { - char buffer[100]; - }; - struct FileSummaryStorage { - char buffer[1000]; - }; - const char* LevelSummary(LevelSummaryStorage* scratch) const; - // printf contents (for debugging) Status DumpManifest(Options& options, std::string& manifestFileName, bool verbose, bool hex = false); - // Return a human-readable short (single-line) summary of the data size - // of files per level. Uses *scratch as backing store. - const char* LevelDataSizeSummary(LevelSummaryStorage* scratch) const; - - // Return a human-readable short (single-line) summary of files - // in a specified level. Uses *scratch as backing store. - const char* LevelFileSummary(FileSummaryStorage* scratch, int level) const; - // Return the size of the current manifest file uint64_t ManifestFileSize() const { return manifest_file_size_; } @@ -501,7 +492,9 @@ class VersionSet { Version dummy_versions_; // Head of circular doubly-linked list of versions. Version* current_; // == dummy_versions_.prev_ - bool need_slowdown_for_num_level0_files; + // A flag indicating whether we should delay writes because + // we have too many level 0 files + bool need_slowdown_for_num_level0_files_; // Per-level key at which the next compaction at that level should start. // Either an empty string, or a valid InternalKey.