diff --git a/db/column_family.cc b/db/column_family.cc index afbf69e9b..a3f936e91 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -62,45 +62,59 @@ void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm, ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name, Version* dummy_versions, const ColumnFamilyOptions& options) - : id(id), - name(name), - dummy_versions(dummy_versions), - current(nullptr), - options(options), - mem(nullptr), - imm(options.min_write_buffer_number_to_merge), - super_version(nullptr), - log_number(0) {} + : id_(id), + name_(name), + dummy_versions_(dummy_versions), + current_(nullptr), + options_(options), + mem_(nullptr), + imm_(options.min_write_buffer_number_to_merge), + super_version_(nullptr), + super_version_number_(0), + log_number_(0) {} ColumnFamilyData::~ColumnFamilyData() { - if (super_version != nullptr) { + if (super_version_ != nullptr) { bool is_last_reference __attribute__((unused)); - is_last_reference = super_version->Unref(); + is_last_reference = super_version_->Unref(); assert(is_last_reference); - super_version->Cleanup(); - delete super_version; + super_version_->Cleanup(); + delete super_version_; } // List must be empty - assert(dummy_versions->next_ == dummy_versions); - delete dummy_versions; + assert(dummy_versions_->next_ == dummy_versions_); + delete dummy_versions_; - if (mem != nullptr) { - delete mem->Unref(); + if (mem_ != nullptr) { + delete mem_->Unref(); } std::vector to_delete; - imm.current()->Unref(&to_delete); + imm_.current()->Unref(&to_delete); for (MemTable* m : to_delete) { delete m; } } void ColumnFamilyData::CreateNewMemtable() { - assert(current != nullptr); - if (mem != nullptr) { - delete mem->Unref(); + assert(current_ != nullptr); + if (mem_ != nullptr) { + delete mem_->Unref(); } - mem = new MemTable(current->vset_->icmp_, options); - mem->Ref(); + mem_ = new MemTable(current_->vset_->icmp_, options_); + mem_->Ref(); +} + +SuperVersion* ColumnFamilyData::InstallSuperVersion( + SuperVersion* new_superversion) { + new_superversion->Init(mem_, imm_.current(), current_); + SuperVersion* old_superversion = super_version_; + super_version_ = new_superversion; + ++super_version_number_; + if (old_superversion != nullptr && old_superversion->Unref()) { + old_superversion->Cleanup(); + return old_superversion; // will let caller delete outside of mutex + } + return nullptr; } ColumnFamilySet::ColumnFamilySet() : max_column_family_(0) {} @@ -162,8 +176,8 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily( void ColumnFamilySet::DropColumnFamily(uint32_t id) { auto cfd = column_family_data_.find(id); assert(cfd != column_family_data_.end()); - column_families_.erase(cfd->second->name); - cfd->second->current->Unref(); + column_families_.erase(cfd->second->GetName()); + cfd->second->current()->Unref(); droppped_column_families_.push_back(cfd->second); column_family_data_.erase(cfd); } @@ -175,8 +189,8 @@ MemTable* ColumnFamilyMemTablesImpl::GetMemTable(uint32_t column_family_id) { // API change in WriteBatch::Handler, which is a public API assert(cfd != nullptr); - if (log_number_ == 0 || log_number_ >= cfd->log_number) { - return cfd->mem; + if (log_number_ == 0 || log_number_ >= cfd->GetLogNumber()) { + return cfd->mem(); } else { return nullptr; } diff --git a/db/column_family.h b/db/column_family.h index 44e459e36..6f20daf17 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -52,30 +52,63 @@ struct SuperVersion { Version* new_current); }; -// column family metadata -struct ColumnFamilyData { - uint32_t id; - std::string name; - Version* dummy_versions; // Head of circular doubly-linked list of versions. - Version* current; // == dummy_versions->prev_ - ColumnFamilyOptions options; - - MemTable* mem; - MemTableList imm; - SuperVersion* super_version; - - // This is the earliest log file number that contains data from this - // Column Family. All earlier log files must be ignored and not - // recovered from - uint64_t log_number; - +// column family metadata. not thread-safe. should be protected by db_mutex +class ColumnFamilyData { + public: ColumnFamilyData(uint32_t id, const std::string& name, Version* dummy_versions, const ColumnFamilyOptions& options); ~ColumnFamilyData(); + uint32_t GetID() const { return id_; } + const std::string& GetName() { return name_; } + + void SetLogNumber(uint64_t log_number) { log_number_ = log_number; } + uint64_t GetLogNumber() const { return log_number_; } + + ColumnFamilyOptions* options() { return &options_; } + + MemTableList* imm() { return &imm_; } + MemTable* mem() { return mem_; } + Version* current() { return current_; } + Version* dummy_versions() { return dummy_versions_; } + void SetMemtable(MemTable* new_mem) { mem_ = new_mem; } + void SetCurrent(Version* current) { current_ = current; } void CreateNewMemtable(); + + SuperVersion* GetSuperVersion() const { return super_version_; } + uint64_t GetSuperVersionNumber() const { + return super_version_number_.load(); + } + // will return a pointer to SuperVersion* if previous SuperVersion + // if its reference count is zero and needs deletion or nullptr if not + // As argument takes a pointer to allocated SuperVersion to enable + // the clients to allocate SuperVersion outside of mutex. + SuperVersion* InstallSuperVersion(SuperVersion* new_superversion); + + private: + uint32_t id_; + const std::string name_; + Version* dummy_versions_; // Head of circular doubly-linked list of versions. + Version* current_; // == dummy_versions->prev_ + ColumnFamilyOptions options_; + + MemTable* mem_; + MemTableList imm_; + SuperVersion* super_version_; + + // An ordinal representing the current SuperVersion. Updated by + // InstallSuperVersion(), i.e. incremented every time super_version_ + // changes. + std::atomic super_version_number_; + + // This is the earliest log file number that contains data from this + // Column Family. All earlier log files must be ignored and not + // recovered from + uint64_t log_number_; }; +// Thread safe only for reading without a writer. All access should be +// locked when adding or dropping column family class ColumnFamilySet { public: class iterator { diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index a68cbbbef..c0284f153 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -74,7 +74,7 @@ Status DBImpl::GetLiveFiles(std::vector& ret, // Make a set of all of the live *.sst files std::set live; - default_cfd_->current->AddLiveFiles(&live); + default_cfd_->current()->AddLiveFiles(&live); ret.clear(); ret.reserve(live.size() + 2); //*.sst + CURRENT + MANIFEST diff --git a/db/db_impl.cc b/db/db_impl.cc index 3cc14102c..32823072a 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -267,7 +267,6 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) shutting_down_(nullptr), bg_cv_(&mutex_), logfile_number_(0), - super_version_number_(0), tmp_batch_(), bg_compaction_scheduled_(0), bg_manual_only_(0), @@ -331,7 +330,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) DBImpl::~DBImpl() { // Wait for background work to finish - if (flush_on_destroy_ && default_cfd_->mem->GetFirstSequenceNumber() != 0) { + if (flush_on_destroy_ && default_cfd_->mem()->GetFirstSequenceNumber() != 0) { FlushMemTable(FlushOptions()); } mutex_.Lock(); @@ -930,8 +929,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, std::unordered_map version_edits; for (auto cfd : *versions_->GetColumnFamilySet()) { VersionEdit edit; - edit.SetColumnFamily(cfd->id); - version_edits.insert({cfd->id, edit}); + edit.SetColumnFamily(cfd->GetID()); + version_edits.insert({cfd->GetID(), edit}); } // Open the log file @@ -991,12 +990,12 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, if (!read_only) { for (auto cfd : *versions_->GetColumnFamilySet()) { - if (cfd->mem->ApproximateMemoryUsage() > - cfd->options.write_buffer_size) { - auto iter = version_edits.find(cfd->id); + if (cfd->mem()->ApproximateMemoryUsage() > + cfd->options()->write_buffer_size) { + auto iter = version_edits.find(cfd->GetID()); assert(iter != version_edits.end()); VersionEdit* edit = &iter->second; - status = WriteLevel0TableForRecovery(cfd->mem, edit); + status = WriteLevel0TableForRecovery(cfd->mem(), edit); // we still want to clear the memtable, even if the recovery failed cfd->CreateNewMemtable(); if (!status.ok()) { @@ -1011,12 +1010,12 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, if (!read_only) { for (auto cfd : *versions_->GetColumnFamilySet()) { - auto iter = version_edits.find(cfd->id); + auto iter = version_edits.find(cfd->GetID()); assert(iter != version_edits.end()); VersionEdit* edit = &iter->second; // flush the final memtable - status = WriteLevel0TableForRecovery(cfd->mem, edit); + status = WriteLevel0TableForRecovery(cfd->mem(), edit); // we still want to clear the memtable, even if the recovery failed cfd->CreateNewMemtable(); if (!status.ok()) { @@ -1104,7 +1103,7 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, const SequenceNumber newest_snapshot = snapshots_.GetNewest(); const SequenceNumber earliest_seqno_in_memtable = mems[0]->GetFirstSequenceNumber(); - Version* base = default_cfd_->current; + Version* base = default_cfd_->current(); base->Ref(); // it is likely that we do not need this reference Status s; { @@ -1141,7 +1140,7 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, // re-acquire the most current version - base = default_cfd_->current; + base = default_cfd_->current(); // There could be multiple threads writing to its own level-0 file. // The pending_outputs cannot be cleared here, otherwise this newly @@ -1182,9 +1181,9 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, DeletionState& deletion_state) { mutex_.AssertHeld(); - assert(default_cfd_->imm.size() != 0); + assert(default_cfd_->imm()->size() != 0); - if (!default_cfd_->imm.IsFlushPending()) { + if (!default_cfd_->imm()->IsFlushPending()) { Log(options_.info_log, "FlushMemTableToOutputFile already in progress"); Status s = Status::IOError("FlushMemTableToOutputFile already in progress"); return s; @@ -1193,7 +1192,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, // Save the contents of the earliest memtable as a new Table uint64_t file_number; std::vector mems; - default_cfd_->imm.PickMemtablesToFlush(&mems); + default_cfd_->imm()->PickMemtablesToFlush(&mems); if (mems.empty()) { Log(options_.info_log, "Nothing in memstore to flush"); Status s = Status::IOError("Nothing in memstore to flush"); @@ -1228,7 +1227,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, } // Replace immutable memtable with the generated Table - s = default_cfd_->imm.InstallMemtableFlushResults( + s = default_cfd_->imm()->InstallMemtableFlushResults( default_cfd_, mems, versions_.get(), s, &mutex_, options_.info_log.get(), file_number, pending_outputs_, &deletion_state.memtables_to_free, db_directory_.get()); @@ -1264,7 +1263,7 @@ Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family, int max_level_with_files = 1; { MutexLock l(&mutex_); - Version* base = default_cfd_->current; + Version* base = default_cfd_->current(); for (int level = 1; level < NumberLevels(); level++) { if (base->OverlapInLevel(level, begin, end)) { max_level_with_files = level; @@ -1297,7 +1296,7 @@ Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family, // return the same level if it cannot be moved int DBImpl::FindMinimumEmptyLevelFitting(int level) { mutex_.AssertHeld(); - Version* current = default_cfd_->current; + Version* current = default_cfd_->current(); int minimum_level = level; for (int i = level - 1; i > 0; --i) { // stop if level i is not empty @@ -1348,10 +1347,10 @@ Status DBImpl::ReFitLevel(int level, int target_level) { Status status; if (to_level < level) { Log(options_.info_log, "Before refitting:\n%s", - default_cfd_->current->DebugString().data()); + default_cfd_->current()->DebugString().data()); VersionEdit edit; - for (const auto& f : default_cfd_->current->files_[level]) { + for (const auto& f : default_cfd_->current()->files_[level]) { edit.DeleteFile(level, f->number); edit.AddFile(to_level, f->number, f->file_size, f->smallest, f->largest, f->smallest_seqno, f->largest_seqno); @@ -1361,14 +1360,14 @@ Status DBImpl::ReFitLevel(int level, int target_level) { status = versions_->LogAndApply(default_cfd_, &edit, &mutex_, db_directory_.get()); - superversion_to_free = InstallSuperVersion(default_cfd_, new_superversion); + superversion_to_free = default_cfd_->InstallSuperVersion(new_superversion); new_superversion = nullptr; Log(options_.info_log, "LogAndApply: %s\n", status.ToString().data()); if (status.ok()) { Log(options_.info_log, "After refitting:\n%s", - default_cfd_->current->DebugString().data()); + default_cfd_->current()->DebugString().data()); } } @@ -1394,7 +1393,7 @@ int DBImpl::Level0StopWriteTrigger(const ColumnFamilyHandle& column_family) { } uint64_t DBImpl::CurrentVersionNumber() const { - return super_version_number_.load(); + return default_cfd_->GetSuperVersionNumber(); } Status DBImpl::Flush(const FlushOptions& options, @@ -1688,10 +1687,10 @@ Status DBImpl::WaitForFlushMemTable() { Status s; // Wait until the compaction completes MutexLock l(&mutex_); - while (default_cfd_->imm.size() > 0 && bg_error_.ok()) { + while (default_cfd_->imm()->size() > 0 && bg_error_.ok()) { bg_cv_.Wait(); } - if (default_cfd_->imm.size() != 0) { + if (default_cfd_->imm()->size() != 0) { s = bg_error_; } return s; @@ -1727,7 +1726,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions } else { - bool is_flush_pending = default_cfd_->imm.IsFlushPending(); + bool is_flush_pending = default_cfd_->imm()->IsFlushPending(); if (is_flush_pending && (bg_flush_scheduled_ < options_.max_background_flushes)) { // memtable flush needed @@ -1739,7 +1738,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { // flush, but the HIGH pool is not enabled). Do it only if // max_background_compactions hasn't been reached and, in case // bg_manual_only_ > 0, if it's a manual compaction. - if ((manual_compaction_ || default_cfd_->current->NeedsCompaction() || + if ((manual_compaction_ || default_cfd_->current()->NeedsCompaction() || (is_flush_pending && (options_.max_background_flushes <= 0))) && bg_compaction_scheduled_ < options_.max_background_compactions && (!bg_manual_only_ || manual_compaction_)) { @@ -1761,7 +1760,7 @@ void DBImpl::BGWorkCompaction(void* db) { Status DBImpl::BackgroundFlush(bool* madeProgress, DeletionState& deletion_state) { Status stat; - while (stat.ok() && default_cfd_->imm.IsFlushPending()) { + while (stat.ok() && default_cfd_->imm()->IsFlushPending()) { Log(options_.info_log, "BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d", options_.max_background_flushes - bg_flush_scheduled_); @@ -1818,7 +1817,7 @@ void DBImpl::TEST_PurgeObsoleteteWAL() { uint64_t DBImpl::TEST_GetLevel0TotalSize() { MutexLock l(&mutex_); - return default_cfd_->current->NumLevelBytes(0); + return default_cfd_->current()->NumLevelBytes(0); } void DBImpl::BackgroundCallCompaction() { @@ -1881,7 +1880,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, mutex_.AssertHeld(); // TODO: remove memtable flush from formal compaction - while (default_cfd_->imm.IsFlushPending()) { + while (default_cfd_->imm()->IsFlushPending()) { Log(options_.info_log, "BackgroundCompaction doing FlushMemTableToOutputFile, compaction slots " "available %d", @@ -1940,7 +1939,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", static_cast(f->number), c->level() + 1, static_cast(f->file_size), - status.ToString().c_str(), default_cfd_->current->LevelSummary(&tmp)); + status.ToString().c_str(), default_cfd_->current()->LevelSummary(&tmp)); versions_->ReleaseCompactionFiles(c.get(), status); *madeProgress = true; } else { @@ -2287,11 +2286,11 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { // Prioritize immutable compaction work // TODO: remove memtable flush from normal compaction work - if (default_cfd_->imm.imm_flush_needed.NoBarrier_Load() != nullptr) { + if (default_cfd_->imm()->imm_flush_needed.NoBarrier_Load() != nullptr) { const uint64_t imm_start = env_->NowMicros(); LogFlush(options_.info_log); mutex_.Lock(); - if (default_cfd_->imm.IsFlushPending()) { + if (default_cfd_->imm()->IsFlushPending()) { FlushMemTableToOutputFile(nullptr, deletion_state); bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary } @@ -2669,11 +2668,11 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, // Collect together all needed child iterators for mem mutex_.Lock(); *latest_snapshot = versions_->LastSequence(); - mutable_mem = default_cfd_->mem; + mutable_mem = default_cfd_->mem(); mutable_mem->Ref(); - immutable_mems = default_cfd_->imm.current(); + immutable_mems = default_cfd_->imm()->current(); immutable_mems->Ref(); - version = default_cfd_->current; + version = default_cfd_->current(); version->Ref(); mutex_.Unlock(); @@ -2710,11 +2709,11 @@ std::pair DBImpl::GetTailingIteratorPair( // get all child iterators and bump their refcounts under lock mutex_.Lock(); - mutable_mem = default_cfd_->mem; + mutable_mem = default_cfd_->mem(); mutable_mem->Ref(); - immutable_mems = default_cfd_->imm.current(); + immutable_mems = default_cfd_->imm()->current(); immutable_mems->Ref(); - version = default_cfd_->current; + version = default_cfd_->current(); version->Ref(); if (superversion_number != nullptr) { *superversion_number = CurrentVersionNumber(); @@ -2756,7 +2755,7 @@ std::pair DBImpl::GetTailingIteratorPair( int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { MutexLock l(&mutex_); - return default_cfd_->current->MaxNextLevelOverlappingBytes(); + return default_cfd_->current()->MaxNextLevelOverlappingBytes(); } Status DBImpl::Get(const ReadOptions& options, @@ -2777,11 +2776,12 @@ Status DBImpl::Get(const ReadOptions& options, // for superversion_to_free void DBImpl::InstallSuperVersion(ColumnFamilyData* cfd, DeletionState& deletion_state) { + mutex_.AssertHeld(); // if new_superversion == nullptr, it means somebody already used it SuperVersion* new_superversion = (deletion_state.new_superversion != nullptr) ? deletion_state.new_superversion : new SuperVersion(); - SuperVersion* old_superversion = InstallSuperVersion(cfd, new_superversion); + SuperVersion* old_superversion = cfd->InstallSuperVersion(new_superversion); deletion_state.new_superversion = nullptr; if (deletion_state.superversion_to_free != nullptr) { // somebody already put it there @@ -2791,23 +2791,6 @@ void DBImpl::InstallSuperVersion(ColumnFamilyData* cfd, } } -SuperVersion* DBImpl::InstallSuperVersion(ColumnFamilyData* cfd, - SuperVersion* new_superversion) { - mutex_.AssertHeld(); - new_superversion->Init(cfd->mem, cfd->imm.current(), cfd->current); - SuperVersion* old_superversion = cfd->super_version; - cfd->super_version = new_superversion; - if (cfd->id == 0) { - // TODO this is only for default column family for now - ++super_version_number_; - } - if (old_superversion != nullptr && old_superversion->Unref()) { - old_superversion->Cleanup(); - return old_superversion; // will let caller delete outside of mutex - } - return nullptr; -} - Status DBImpl::GetImpl(const ReadOptions& options, const ColumnFamilyHandle& column_family, const Slice& key, std::string* value, @@ -2819,7 +2802,7 @@ Status DBImpl::GetImpl(const ReadOptions& options, // this is asserting because client calling Get() with undefined // ColumnFamilyHandle is undefined behavior. assert(cfd != nullptr); - SuperVersion* get_version = cfd->super_version->Ref(); + SuperVersion* get_version = cfd->GetSuperVersion()->Ref(); mutex_.Unlock(); SequenceNumber snapshot; @@ -2899,9 +2882,9 @@ std::vector DBImpl::MultiGet( } // TODO only works for default column family - MemTable* mem = default_cfd_->mem; - MemTableListVersion* imm = default_cfd_->imm.current(); - Version* current = default_cfd_->current; + MemTable* mem = default_cfd_->mem(); + MemTableListVersion* imm = default_cfd_->imm()->current(); + Version* current = default_cfd_->current(); mem->Ref(); imm->Ref(); current->Ref(); @@ -3340,7 +3323,7 @@ Status DBImpl::MakeRoomForWrite(bool force, // this delay hands over some CPU to the compaction thread in // case it is sharing the same core as the writer. uint64_t slowdown = - SlowdownAmount(default_cfd_->current->NumLevelFiles(0), + SlowdownAmount(default_cfd_->current()->NumLevelFiles(0), options_.level0_slowdown_writes_trigger, options_.level0_stop_writes_trigger); mutex_.Unlock(); @@ -3356,14 +3339,14 @@ Status DBImpl::MakeRoomForWrite(bool force, allow_delay = false; // Do not delay a single write more than once mutex_.Lock(); delayed_writes_++; - } else if (!force && (default_cfd_->mem->ApproximateMemoryUsage() <= + } else if (!force && (default_cfd_->mem()->ApproximateMemoryUsage() <= options_.write_buffer_size)) { // There is room in current memtable if (allow_delay) { DelayLoggingAndReset(); } break; - } else if (default_cfd_->imm.size() == + } else if (default_cfd_->imm()->size() == options_.max_write_buffer_number - 1) { // We have filled up the current memtable, but the previous // ones are still being compacted, so we wait. @@ -3380,7 +3363,7 @@ Status DBImpl::MakeRoomForWrite(bool force, STALL_MEMTABLE_COMPACTION_MICROS, stall); stall_memtable_compaction_ += stall; stall_memtable_compaction_count_++; - } else if (default_cfd_->current->NumLevelFiles(0) >= + } else if (default_cfd_->current()->NumLevelFiles(0) >= options_.level0_stop_writes_trigger) { // There are too many level-0 files. DelayLoggingAndReset(); @@ -3396,10 +3379,10 @@ Status DBImpl::MakeRoomForWrite(bool force, stall_level0_num_files_ += stall; stall_level0_num_files_count_++; } else if (allow_hard_rate_limit_delay && options_.hard_rate_limit > 1.0 && - (score = default_cfd_->current->MaxCompactionScore()) > + (score = default_cfd_->current()->MaxCompactionScore()) > options_.hard_rate_limit) { // Delay a write when the compaction score for any level is too large. - int max_level = default_cfd_->current->MaxCompactionScoreLevel(); + int max_level = default_cfd_->current()->MaxCompactionScoreLevel(); mutex_.Unlock(); uint64_t delayed; { @@ -3422,7 +3405,7 @@ Status DBImpl::MakeRoomForWrite(bool force, } mutex_.Lock(); } else if (allow_soft_rate_limit_delay && options_.soft_rate_limit > 0.0 && - (score = default_cfd_->current->MaxCompactionScore()) > + (score = default_cfd_->current()->MaxCompactionScore()) > options_.soft_rate_limit) { // Delay a write when the compaction score for any level is too large. // TODO: add statistics @@ -3473,21 +3456,20 @@ Status DBImpl::MakeRoomForWrite(bool force, } logfile_number_ = new_log_number; log_.reset(new log::Writer(std::move(lfile))); - default_cfd_->mem->SetNextLogNumber(logfile_number_); - default_cfd_->imm.Add(default_cfd_->mem); + default_cfd_->mem()->SetNextLogNumber(logfile_number_); + default_cfd_->imm()->Add(default_cfd_->mem()); if (force) { - default_cfd_->imm.FlushRequested(); + default_cfd_->imm()->FlushRequested(); } - default_cfd_->mem = memtmp; - default_cfd_->mem->Ref(); - Log(options_.info_log, - "New memtable created with log file: #%lu\n", + memtmp->Ref(); + memtmp->SetLogNumber(logfile_number_); + default_cfd_->SetMemtable(memtmp); + Log(options_.info_log, "New memtable created with log file: #%lu\n", (unsigned long)logfile_number_); - default_cfd_->mem->SetLogNumber(logfile_number_); force = false; // Do not force another compaction if have room MaybeScheduleFlushOrCompaction(); *superversion_to_free = - InstallSuperVersion(default_cfd_, new_superversion); + default_cfd_->InstallSuperVersion(new_superversion); } } return s; @@ -3511,7 +3493,7 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family, value->clear(); MutexLock l(&mutex_); - Version* current = default_cfd_->current; + Version* current = default_cfd_->current(); Slice in = property; Slice prefix("rocksdb."); if (!in.starts_with(prefix)) return false; @@ -3792,10 +3774,10 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family, return true; } else if (in == "sstables") { - *value = default_cfd_->current->DebugString(); + *value = default_cfd_->current()->DebugString(); return true; } else if (in == "num-immutable-mem-table") { - *value = std::to_string(default_cfd_->imm.size()); + *value = std::to_string(default_cfd_->imm()->size()); return true; } @@ -3808,7 +3790,7 @@ void DBImpl::GetApproximateSizes(const ColumnFamilyHandle& column_family, Version* v; { MutexLock l(&mutex_); - v = default_cfd_->current; + v = default_cfd_->current(); v->Ref(); } @@ -3885,7 +3867,7 @@ Status DBImpl::DeleteFile(std::string name) { // This is to make sure that any deletion tombstones are not // lost. Check that the level passed is the last level. for (int i = level + 1; i < maxlevel; i++) { - if (cfd->current->NumLevelFiles(i) != 0) { + if (cfd->current()->NumLevelFiles(i) != 0) { Log(options_.info_log, "DeleteFile %s FAILED. File not in last level\n", name.c_str()); return Status::InvalidArgument("File not in last level"); @@ -4060,8 +4042,8 @@ Status DB::OpenWithColumnFamilies( } if (s.ok()) { for (auto cfd : *impl->versions_->GetColumnFamilySet()) { - delete impl->InstallSuperVersion(cfd, new SuperVersion()); - cfd->mem->SetLogNumber(impl->logfile_number_); + delete cfd->InstallSuperVersion(new SuperVersion()); + cfd->mem()->SetLogNumber(impl->logfile_number_); } impl->DeleteObsoleteFiles(); impl->MaybeScheduleFlushOrCompaction(); @@ -4071,7 +4053,7 @@ Status DB::OpenWithColumnFamilies( } if (s.ok() && impl->options_.compaction_style == kCompactionStyleUniversal) { - Version* current = impl->default_cfd_->current; + Version* current = impl->default_cfd_->current(); for (int i = 1; i < impl->NumberLevels(); i++) { int num_files = current->NumLevelFiles(i); if (num_files > 0) { diff --git a/db/db_impl.h b/db/db_impl.h index 1355fa498..b237a4e89 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -401,11 +401,6 @@ class DBImpl : public DB { ColumnFamilyData* default_cfd_; unique_ptr column_family_memtables_; - // An ordinal representing the current SuperVersion. Updated by - // InstallSuperVersion(), i.e. incremented every time super_version_ - // changes. - std::atomic super_version_number_; - std::string host_name_; std::unique_ptr db_directory_; @@ -587,16 +582,8 @@ class DBImpl : public DB { std::vector& snapshots, SequenceNumber* prev_snapshot); - // will return a pointer to SuperVersion* if previous SuperVersion - // if its reference count is zero and needs deletion or nullptr if not - // As argument takes a pointer to allocated SuperVersion - // Foreground threads call this function directly (they don't carry - // deletion state and have to handle their own creation and deletion - // of SuperVersion) - SuperVersion* InstallSuperVersion(ColumnFamilyData* cfd, - SuperVersion* new_superversion); // Background threads call this function, which is just a wrapper around - // the InstallSuperVersion() function above. Background threads carry + // the cfd->InstallSuperVersion() function. Background threads carry // deletion_state which can have new_superversion already allocated. void InstallSuperVersion(ColumnFamilyData* cfd, DeletionState& deletion_state); diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index c176259d5..cb4952dd8 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -56,8 +56,8 @@ Status DBImplReadOnly::Get(const ReadOptions& options, const ColumnFamilyHandle& column_family, const Slice& key, std::string* value) { Status s; - MemTable* mem = GetDefaultColumnFamily()->mem; - Version* current = GetDefaultColumnFamily()->current; + MemTable* mem = GetDefaultColumnFamily()->mem(); + Version* current = GetDefaultColumnFamily()->current(); SequenceNumber snapshot = versions_->LastSequence(); MergeContext merge_context; LookupKey lkey(key, snapshot); diff --git a/db/db_stats_logger.cc b/db/db_stats_logger.cc index be6e469cb..30c2a6384 100644 --- a/db/db_stats_logger.cc +++ b/db/db_stats_logger.cc @@ -65,7 +65,7 @@ void DBImpl::LogDBDeployStats() { uint64_t file_total_size = 0; uint32_t file_total_num = 0; - Version* current = default_cfd_->current; + Version* current = default_cfd_->current(); for (int i = 0; i < current->NumberLevels(); i++) { file_total_num += current->NumLevelFiles(i); file_total_size += current->NumLevelBytes(i); diff --git a/db/version_set.cc b/db/version_set.cc index e7e479d73..d3ba5a64b 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1390,7 +1390,7 @@ VersionSet::VersionSet(const std::string& dbname, const Options* options, VersionSet::~VersionSet() { for (auto cfd : *column_family_set_) { - cfd->current->Unref(); + cfd->current()->Unref(); } // we need to delete column_family_set_ because its destructor depends on // VersionSet @@ -1405,20 +1405,21 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, Version* v) { // Make "v" current assert(v->refs_ == 0); - assert(v != column_family_data->current); - if (column_family_data->current != nullptr) { - assert(column_family_data->current->refs_ > 0); - column_family_data->current->Unref(); + Version* current = column_family_data->current(); + assert(v != current); + if (current != nullptr) { + assert(current->refs_ > 0); + current->Unref(); } - column_family_data->current = v; + column_family_data->SetCurrent(v); need_slowdown_for_num_level0_files_ = (options_->level0_slowdown_writes_trigger >= 0 && v->NumLevelFiles(0) >= options_->level0_slowdown_writes_trigger); v->Ref(); // Append to linked list - v->prev_ = column_family_data->dummy_versions->prev_; - v->next_ = column_family_data->dummy_versions; + v->prev_ = column_family_data->dummy_versions()->prev_; + v->next_ = column_family_data->dummy_versions(); v->prev_->next_ = v; v->next_->prev_ = v; } @@ -1441,7 +1442,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, std::vector batch_edits; Version* v = new Version(this, current_version_number_++); - Builder builder(this, column_family_data->current); + Builder builder(this, column_family_data->current()); // process all requests in the queue ManifestWriter* last_writer = &w; @@ -1567,7 +1568,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, manifest_file_size_ = new_manifest_file_size; AppendVersion(column_family_data, v); log_number_ = edit->log_number_; - column_family_data->log_number = edit->log_number_; + column_family_data->SetLogNumber(edit->log_number_); prev_log_number_ = edit->prev_log_number_; } else { @@ -1676,7 +1677,7 @@ Status VersionSet::Recover( } else { ColumnFamilyData* default_cfd = CreateColumnFamily(default_cf_iter->second, &default_cf_edit); - builders.insert({0, new Builder(this, default_cfd->current)}); + builders.insert({0, new Builder(this, default_cfd->current())}); } { @@ -1722,7 +1723,7 @@ Status VersionSet::Recover( ColumnFamilyData* new_cfd = CreateColumnFamily(cf_options->second, &edit); builders.insert( - {edit.column_family_, new Builder(this, new_cfd->current)}); + {edit.column_family_, new Builder(this, new_cfd->current())}); } } else if (edit.is_column_family_drop_) { if (cf_in_builders) { @@ -1748,14 +1749,14 @@ Status VersionSet::Recover( auto cfd = column_family_set_->GetColumnFamily(edit.column_family_); // this should never happen since cf_in_builders is true assert(cfd != nullptr); - if (edit.max_level_ >= cfd->current->NumberLevels()) { + if (edit.max_level_ >= cfd->current()->NumberLevels()) { s = Status::InvalidArgument( "db has more levels than options.num_levels"); break; } if (edit.has_log_number_) { - cfd->log_number = edit.log_number_; + cfd->SetLogNumber(edit.log_number_); } // if it is not column family add or column family drop, @@ -1817,7 +1818,7 @@ Status VersionSet::Recover( if (s.ok()) { for (auto cfd : *column_family_set_) { Version* v = new Version(this, current_version_number_++); - builders[cfd->id]->SaveTo(v); + builders[cfd->GetID()]->SaveTo(v); // Install recovered version std::vector size_being_compacted(v->NumberLevels() - 1); @@ -1846,7 +1847,7 @@ Status VersionSet::Recover( for (auto cfd : *column_family_set_) { Log(options_->info_log, "Column family \"%s\", log number is %lu\n", - cfd->name.c_str(), cfd->log_number); + cfd->GetName().c_str(), cfd->GetLogNumber()); } } @@ -1936,7 +1937,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, } Version* current_version = - versions.GetColumnFamilySet()->GetDefault()->current; + versions.GetColumnFamilySet()->GetDefault()->current(); int current_levels = current_version->NumberLevels(); if (current_levels <= new_levels) { @@ -2009,7 +2010,8 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, uint64_t prev_log_number = 0; int count = 0; // TODO works only for default column family currently - VersionSet::Builder builder(this, column_family_set_->GetDefault()->current); + VersionSet::Builder builder(this, + column_family_set_->GetDefault()->current()); { VersionSet::LogReporter reporter; @@ -2120,11 +2122,11 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { { // Store column family info VersionEdit edit; - if (cfd->id != 0) { + if (cfd->GetID() != 0) { // default column family is always there, // no need to explicitly write it - edit.AddColumnFamily(cfd->name); - edit.SetColumnFamily(cfd->id); + edit.AddColumnFamily(cfd->GetName()); + edit.SetColumnFamily(cfd->GetID()); std::string record; edit.EncodeTo(&record); Status s = log->AddRecord(record); @@ -2137,10 +2139,10 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { { // Save files VersionEdit edit; - edit.SetColumnFamily(cfd->id); + edit.SetColumnFamily(cfd->GetID()); for (int level = 0; level < NumberLevels(); level++) { - for (const auto& f : cfd->current->files_[level]) { + for (const auto& f : cfd->current()->files_[level]) { edit.AddFile(level, f->number, f->file_size, @@ -2150,7 +2152,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { f->largest_seqno); } } - edit.SetLogNumber(cfd->log_number); + edit.SetLogNumber(cfd->GetLogNumber()); std::string record; edit.EncodeTo(&record); Status s = log->AddRecord(record); @@ -2235,7 +2237,8 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { // pre-calculate space requirement int64_t total_files = 0; for (auto cfd : *column_family_set_) { - for (Version* v = cfd->dummy_versions->next_; v != cfd->dummy_versions; + Version* dummy_versions = cfd->dummy_versions(); + for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) { for (int level = 0; level < v->NumberLevels(); level++) { total_files += v->files_[level].size(); @@ -2247,7 +2250,8 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { live_list->reserve(live_list->size() + total_files); for (auto cfd : *column_family_set_) { - for (Version* v = cfd->dummy_versions->next_; v != cfd->dummy_versions; + Version* dummy_versions = cfd->dummy_versions(); + for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) { for (int level = 0; level < v->NumberLevels(); level++) { for (const auto& f : v->files_[level]) { @@ -2260,7 +2264,7 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { Compaction* VersionSet::PickCompaction() { // TODO this only works for default column family now - Version* version = column_family_set_->GetDefault()->current; + Version* version = column_family_set_->GetDefault()->current(); return compaction_picker_->PickCompaction(version); } @@ -2269,7 +2273,7 @@ Compaction* VersionSet::CompactRange(int input_level, int output_level, const InternalKey* end, InternalKey** compaction_end) { // TODO this only works for default column family now - Version* version = column_family_set_->GetDefault()->current; + Version* version = column_family_set_->GetDefault()->current(); return compaction_picker_->CompactRange(version, input_level, output_level, begin, end, compaction_end); } @@ -2321,7 +2325,7 @@ uint64_t VersionSet::MaxFileSizeForLevel(int level) { bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) { #ifndef NDEBUG // TODO this only works for default column family now - Version* version = column_family_set_->GetDefault()->current; + Version* version = column_family_set_->GetDefault()->current(); if (c->input_version() != version) { Log(options_->info_log, "VerifyCompactionFileConsistency version mismatch"); } @@ -2374,7 +2378,7 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel, FileMetaData* meta, ColumnFamilyData** cfd) { for (auto cfd_iter : *column_family_set_) { - Version* version = cfd_iter->current; + Version* version = cfd_iter->current(); for (int level = 0; level < version->NumberLevels(); level++) { for (const auto& file : version->files_[level]) { if (file->number == number) { @@ -2392,7 +2396,7 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel, void VersionSet::GetLiveFilesMetaData(std::vector* metadata) { for (auto cfd : *column_family_set_) { for (int level = 0; level < NumberLevels(); level++) { - for (const auto& file : cfd->current->files_[level]) { + for (const auto& file : cfd->current()->files_[level]) { LiveFileMetaData filemetadata; filemetadata.name = TableFileName("", file->number); filemetadata.level = level; diff --git a/db/version_set.h b/db/version_set.h index 2a21565a0..38f4639d5 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -363,8 +363,8 @@ class VersionSet { uint64_t MinLogNumber() const { uint64_t min_log_num = 0; for (auto cfd : *column_family_set_) { - if (min_log_num == 0 || min_log_num > cfd->log_number) { - min_log_num = cfd->log_number; + if (min_log_num == 0 || min_log_num > cfd->GetLogNumber()) { + min_log_num = cfd->GetLogNumber(); } } return min_log_num; @@ -448,7 +448,7 @@ class VersionSet { friend class Compaction; friend class Version; // TODO(icanadi) temporarily until we have what ColumnFamilyData needs (icmp_) - friend struct ColumnFamilyData; + friend class ColumnFamilyData; struct LogReporter : public log::Reader::Reporter { Status* status; diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index 169495619..157126de6 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -1028,7 +1028,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt, int max = -1; auto default_cfd = versions.GetColumnFamilySet()->GetDefault(); for (int i = 0; i < versions.NumberLevels(); i++) { - if (default_cfd->current->NumLevelFiles(i)) { + if (default_cfd->current()->NumLevelFiles(i)) { max = i; } }