diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index 04d6d0e17..a68cbbbef 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -74,7 +74,7 @@ Status DBImpl::GetLiveFiles(std::vector& ret, // Make a set of all of the live *.sst files std::set live; - versions_->current()->AddLiveFiles(&live); + default_cfd_->current->AddLiveFiles(&live); ret.clear(); ret.reserve(live.size() + 2); //*.sst + CURRENT + MANIFEST diff --git a/db/db_impl.cc b/db/db_impl.cc index 680a07ff0..004661bc0 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1079,7 +1079,7 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, const SequenceNumber newest_snapshot = snapshots_.GetNewest(); const SequenceNumber earliest_seqno_in_memtable = mems[0]->GetFirstSequenceNumber(); - Version* base = versions_->current(); + Version* base = default_cfd_->current; base->Ref(); // it is likely that we do not need this reference Status s; { @@ -1116,7 +1116,7 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, // re-acquire the most current version - base = versions_->current(); + base = default_cfd_->current; // There could be multiple threads writing to its own level-0 file. // The pending_outputs cannot be cleared here, otherwise this newly @@ -1239,7 +1239,7 @@ Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family, int max_level_with_files = 1; { MutexLock l(&mutex_); - Version* base = versions_->current(); + Version* base = default_cfd_->current; for (int level = 1; level < NumberLevels(); level++) { if (base->OverlapInLevel(level, begin, end)) { max_level_with_files = level; @@ -1272,7 +1272,7 @@ Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family, // return the same level if it cannot be moved int DBImpl::FindMinimumEmptyLevelFitting(int level) { mutex_.AssertHeld(); - Version* current = versions_->current(); + Version* current = default_cfd_->current; int minimum_level = level; for (int i = level - 1; i > 0; --i) { // stop if level i is not empty @@ -1323,10 +1323,10 @@ Status DBImpl::ReFitLevel(int level, int target_level) { Status status; if (to_level < level) { Log(options_.info_log, "Before refitting:\n%s", - versions_->current()->DebugString().data()); + default_cfd_->current->DebugString().data()); VersionEdit edit; - for (const auto& f : versions_->current()->files_[level]) { + for (const auto& f : default_cfd_->current->files_[level]) { edit.DeleteFile(level, f->number); edit.AddFile(to_level, f->number, f->file_size, f->smallest, f->largest, f->smallest_seqno, f->largest_seqno); @@ -1343,7 +1343,7 @@ Status DBImpl::ReFitLevel(int level, int target_level) { if (status.ok()) { Log(options_.info_log, "After refitting:\n%s", - versions_->current()->DebugString().data()); + default_cfd_->current->DebugString().data()); } } @@ -1714,8 +1714,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { // flush, but the HIGH pool is not enabled). Do it only if // max_background_compactions hasn't been reached and, in case // bg_manual_only_ > 0, if it's a manual compaction. - if ((manual_compaction_ || - versions_->current()->NeedsCompaction() || + if ((manual_compaction_ || default_cfd_->current->NeedsCompaction() || (is_flush_pending && (options_.max_background_flushes <= 0))) && bg_compaction_scheduled_ < options_.max_background_compactions && (!bg_manual_only_ || manual_compaction_)) { @@ -1794,7 +1793,7 @@ void DBImpl::TEST_PurgeObsoleteteWAL() { uint64_t DBImpl::TEST_GetLevel0TotalSize() { MutexLock l(&mutex_); - return versions_->current()->NumLevelBytes(0); + return default_cfd_->current->NumLevelBytes(0); } void DBImpl::BackgroundCallCompaction() { @@ -1916,7 +1915,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", static_cast(f->number), c->level() + 1, static_cast(f->file_size), - status.ToString().c_str(), versions_->current()->LevelSummary(&tmp)); + status.ToString().c_str(), default_cfd_->current->LevelSummary(&tmp)); versions_->ReleaseCompactionFiles(c.get(), status); *madeProgress = true; } else { @@ -2206,7 +2205,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, compact->compaction->Summary(scratch, sizeof(scratch)); Log(options_.info_log, "Compaction start summary: %s\n", scratch); - assert(versions_->current()->NumLevelFiles(compact->compaction->level()) > 0); + assert(compact->compaction->input_version()->NumLevelFiles( + compact->compaction->level()) > 0); assert(compact->builder == nullptr); assert(!compact->outfile); @@ -2584,7 +2584,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, "compacted to: %s, %.1f MB/sec, level %d, files in(%d, %d) out(%d) " "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) " "write-amplify(%.1f) %s\n", - versions_->current()->LevelSummary(&tmp), + compact->compaction->input_version()->LevelSummary(&tmp), (stats.bytes_readn + stats.bytes_readnp1 + stats.bytes_written) / (double)stats.micros, compact->compaction->output_level(), stats.files_in_leveln, @@ -2648,8 +2648,8 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, mutable_mem->Ref(); immutable_mems = default_cfd_->imm.current(); immutable_mems->Ref(); - versions_->current()->Ref(); - version = versions_->current(); + version = default_cfd_->current; + version->Ref(); mutex_.Unlock(); std::vector iterator_list; @@ -2689,7 +2689,7 @@ std::pair DBImpl::GetTailingIteratorPair( mutable_mem->Ref(); immutable_mems = default_cfd_->imm.current(); immutable_mems->Ref(); - version = versions_->current(); + version = default_cfd_->current; version->Ref(); if (superversion_number != nullptr) { *superversion_number = CurrentVersionNumber(); @@ -2731,7 +2731,7 @@ std::pair DBImpl::GetTailingIteratorPair( int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { MutexLock l(&mutex_); - return versions_->current()->MaxNextLevelOverlappingBytes(); + return default_cfd_->current->MaxNextLevelOverlappingBytes(); } Status DBImpl::Get(const ReadOptions& options, @@ -3299,7 +3299,7 @@ Status DBImpl::MakeRoomForWrite(bool force, // this delay hands over some CPU to the compaction thread in // case it is sharing the same core as the writer. uint64_t slowdown = - SlowdownAmount(versions_->current()->NumLevelFiles(0), + SlowdownAmount(default_cfd_->current->NumLevelFiles(0), options_.level0_slowdown_writes_trigger, options_.level0_stop_writes_trigger); mutex_.Unlock(); @@ -3339,7 +3339,7 @@ Status DBImpl::MakeRoomForWrite(bool force, STALL_MEMTABLE_COMPACTION_MICROS, stall); stall_memtable_compaction_ += stall; stall_memtable_compaction_count_++; - } else if (versions_->current()->NumLevelFiles(0) >= + } else if (default_cfd_->current->NumLevelFiles(0) >= options_.level0_stop_writes_trigger) { // There are too many level-0 files. DelayLoggingAndReset(); @@ -3355,10 +3355,10 @@ Status DBImpl::MakeRoomForWrite(bool force, stall_level0_num_files_ += stall; stall_level0_num_files_count_++; } else if (allow_hard_rate_limit_delay && options_.hard_rate_limit > 1.0 && - (score = versions_->current()->MaxCompactionScore()) > + (score = default_cfd_->current->MaxCompactionScore()) > options_.hard_rate_limit) { // Delay a write when the compaction score for any level is too large. - int max_level = versions_->current()->MaxCompactionScoreLevel(); + int max_level = default_cfd_->current->MaxCompactionScoreLevel(); mutex_.Unlock(); uint64_t delayed; { @@ -3381,7 +3381,7 @@ Status DBImpl::MakeRoomForWrite(bool force, } mutex_.Lock(); } else if (allow_soft_rate_limit_delay && options_.soft_rate_limit > 0.0 && - (score = versions_->current()->MaxCompactionScore()) > + (score = default_cfd_->current->MaxCompactionScore()) > options_.soft_rate_limit) { // Delay a write when the compaction score for any level is too large. // TODO: add statistics @@ -3470,7 +3470,7 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family, value->clear(); MutexLock l(&mutex_); - Version* current = versions_->current(); + Version* current = default_cfd_->current; Slice in = property; Slice prefix("rocksdb."); if (!in.starts_with(prefix)) return false; @@ -3734,7 +3734,7 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family, return true; } else if (in == "sstables") { - *value = versions_->current()->DebugString(); + *value = default_cfd_->current->DebugString(); return true; } else if (in == "num-immutable-mem-table") { *value = std::to_string(default_cfd_->imm.size()); @@ -3750,8 +3750,8 @@ void DBImpl::GetApproximateSizes(const ColumnFamilyHandle& column_family, Version* v; { MutexLock l(&mutex_); - versions_->current()->Ref(); - v = versions_->current(); + v = default_cfd_->current; + v->Ref(); } for (int i = 0; i < n; i++) { @@ -3803,11 +3803,12 @@ Status DBImpl::DeleteFile(std::string name) { int level; FileMetaData metadata; int maxlevel = NumberLevels(); + ColumnFamilyData* cfd; VersionEdit edit; DeletionState deletion_state(0, true); { MutexLock l(&mutex_); - status = versions_->GetMetadataForFile(number, &level, &metadata); + status = versions_->GetMetadataForFile(number, &level, &metadata, &cfd); if (!status.ok()) { Log(options_.info_log, "DeleteFile %s failed. File not found\n", name.c_str()); @@ -3826,17 +3827,16 @@ Status DBImpl::DeleteFile(std::string name) { // This is to make sure that any deletion tombstones are not // lost. Check that the level passed is the last level. for (int i = level + 1; i < maxlevel; i++) { - if (versions_->current()->NumLevelFiles(i) != 0) { + if (cfd->current->NumLevelFiles(i) != 0) { Log(options_.info_log, "DeleteFile %s FAILED. File not in last level\n", name.c_str()); return Status::InvalidArgument("File not in last level"); } } edit.DeleteFile(level, number); - status = versions_->LogAndApply(default_cfd_, &edit, &mutex_, - db_directory_.get()); + status = versions_->LogAndApply(cfd, &edit, &mutex_, db_directory_.get()); if (status.ok()) { - InstallSuperVersion(default_cfd_, deletion_state); + InstallSuperVersion(cfd, deletion_state); } FindObsoleteFiles(deletion_state, false); } // lock released here @@ -4010,7 +4010,7 @@ Status DB::OpenWithColumnFamilies( } if (s.ok() && impl->options_.compaction_style == kCompactionStyleUniversal) { - Version* current = impl->versions_->current(); + Version* current = impl->default_cfd_->current; for (int i = 1; i < impl->NumberLevels(); i++) { int num_files = current->NumLevelFiles(i); if (num_files > 0) { diff --git a/db/db_impl.h b/db/db_impl.h index 046cb28c7..f16d4a65c 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -258,10 +258,7 @@ class DBImpl : public DB { return internal_comparator_.user_comparator(); } - MemTable* GetMemTable() { - // TODO currently only works for default column family - return default_cfd_->mem; - } + ColumnFamilyData* GetDefaultColumnFamily() { return default_cfd_; } Iterator* NewInternalIterator(const ReadOptions&, SequenceNumber* latest_snapshot); diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index 1810a9620..c176259d5 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -56,8 +56,8 @@ Status DBImplReadOnly::Get(const ReadOptions& options, const ColumnFamilyHandle& column_family, const Slice& key, std::string* value) { Status s; - MemTable* mem = GetMemTable(); - Version* current = versions_->current(); + MemTable* mem = GetDefaultColumnFamily()->mem; + Version* current = GetDefaultColumnFamily()->current; SequenceNumber snapshot = versions_->LastSequence(); MergeContext merge_context; LookupKey lkey(key, snapshot); diff --git a/db/db_stats_logger.cc b/db/db_stats_logger.cc index db86865ca..be6e469cb 100644 --- a/db/db_stats_logger.cc +++ b/db/db_stats_logger.cc @@ -65,7 +65,7 @@ void DBImpl::LogDBDeployStats() { uint64_t file_total_size = 0; uint32_t file_total_num = 0; - Version* current = versions_->current(); + Version* current = default_cfd_->current; for (int i = 0; i < current->NumberLevels(); i++) { file_total_num += current->NumLevelFiles(i); file_total_size += current->NumLevelBytes(i); diff --git a/db/version_set.cc b/db/version_set.cc index 2580233e3..f6e9e6d5a 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1923,7 +1923,8 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, return status; } - Version* current_version = versions.current(); + Version* current_version = + versions.GetColumnFamilySet()->GetDefault()->current; int current_levels = current_version->NumberLevels(); if (current_levels <= new_levels) { @@ -2357,14 +2358,16 @@ void VersionSet::ReleaseCompactionFiles(Compaction* c, Status status) { } Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel, - FileMetaData* meta) { - for (auto cfd : *column_family_set_) { - Version* version = cfd->current; + FileMetaData* meta, + ColumnFamilyData** cfd) { + for (auto cfd_iter : *column_family_set_) { + Version* version = cfd_iter->current; for (int level = 0; level < version->NumberLevels(); level++) { for (const auto& file : version->files_[level]) { if (file->number == number) { *meta = *file; *filelevel = level; + *cfd = cfd_iter; return Status::OK(); } } diff --git a/db/version_set.h b/db/version_set.h index 308228d7b..1cda55ca1 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -314,12 +314,6 @@ class VersionSet { const EnvOptions& storage_options, int new_levels); - // Return the current version. - Version* current() const { - // TODO this only works for default column family now - return column_family_set_->GetDefault()->current; - } - // A Flag indicating whether write needs to slowdown because of there are // too many number of level0 files. bool NeedSlowdownForNumLevel0Files() const { @@ -418,8 +412,8 @@ class VersionSet { void ReleaseCompactionFiles(Compaction* c, Status status); - Status GetMetadataForFile( - uint64_t number, int *filelevel, FileMetaData *metadata); + Status GetMetadataForFile(uint64_t number, int* filelevel, + FileMetaData* metadata, ColumnFamilyData** cfd); void GetLiveFilesMetaData( std::vector *metadata); diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index a792f0b1c..169495619 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -1026,8 +1026,9 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt, return st; } int max = -1; + auto default_cfd = versions.GetColumnFamilySet()->GetDefault(); for (int i = 0; i < versions.NumberLevels(); i++) { - if (versions.current()->NumLevelFiles(i)) { + if (default_cfd->current->NumLevelFiles(i)) { max = i; } }