diff --git a/db/column_family.cc b/db/column_family.cc index 9b7a5284d..234d5e50d 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -118,7 +118,7 @@ void ColumnFamilyData::CreateNewMemtable() { if (mem_ != nullptr) { delete mem_->Unref(); } - mem_ = new MemTable(current_->vset_->icmp_, options_); + mem_ = new MemTable(icmp_, options_); mem_->Ref(); } diff --git a/db/column_family.h b/db/column_family.h index 513eadd3e..7c1920308 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -89,6 +89,7 @@ class ColumnFamilyData { CompactionPicker* compaction_picker() const { return compaction_picker_.get(); } + const Comparator* user_comparator() const { return icmp_.user_comparator(); } const InternalKeyComparator& internal_comparator() const { return icmp_; } SuperVersion* GetSuperVersion() const { return super_version_; } diff --git a/db/compaction.cc b/db/compaction.cc index d045a83d1..279481d68 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -79,12 +79,11 @@ void Compaction::AddInputDeletions(VersionEdit* edit) { } bool Compaction::IsBaseLevelForKey(const Slice& user_key) { - if (input_version_->vset_->options_->compaction_style == - kCompactionStyleUniversal) { + if (cfd_->options()->compaction_style == kCompactionStyleUniversal) { return bottommost_level_; } // Maybe use binary search to find right entry instead of linear search? - const Comparator* user_cmp = input_version_->vset_->icmp_.user_comparator(); + const Comparator* user_cmp = cfd_->user_comparator(); for (int lvl = level_ + 2; lvl < number_levels_; lvl++) { const std::vector& files = input_version_->files_[lvl]; for (; level_ptrs_[lvl] < files.size(); ) { @@ -105,7 +104,7 @@ bool Compaction::IsBaseLevelForKey(const Slice& user_key) { bool Compaction::ShouldStopBefore(const Slice& internal_key) { // Scan to find earliest grandparent file that contains key. - const InternalKeyComparator* icmp = &input_version_->vset_->icmp_; + const InternalKeyComparator* icmp = &cfd_->internal_comparator(); while (grandparent_index_ < grandparents_.size() && icmp->Compare(internal_key, grandparents_[grandparent_index_]->largest.Encode()) > 0) { @@ -143,8 +142,7 @@ void Compaction::MarkFilesBeingCompacted(bool value) { // Is this compaction producing files at the bottommost level? void Compaction::SetupBottomMostLevel(bool isManual) { - if (input_version_->vset_->options_->compaction_style == - kCompactionStyleUniversal) { + if (cfd_->options()->compaction_style == kCompactionStyleUniversal) { // If universal compaction style is used and manual // compaction is occuring, then we are guaranteed that // all files will be picked in a single compaction @@ -157,8 +155,7 @@ void Compaction::SetupBottomMostLevel(bool isManual) { return; } bottommost_level_ = true; - int num_levels = input_version_->vset_->NumberLevels(); - for (int i = output_level() + 1; i < num_levels; i++) { + for (int i = output_level() + 1; i < number_levels_; i++) { if (input_version_->NumLevelFiles(i) > 0) { bottommost_level_ = false; break; diff --git a/db/db_impl.cc b/db/db_impl.cc index 63b7e63d6..a5b9f99ba 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -294,8 +294,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) table_cache_.reset(new TableCache(dbname_, &options_, storage_options_, table_cache_size)); - versions_.reset(new VersionSet(dbname_, &options_, storage_options_, - table_cache_.get(), &internal_comparator_)); + versions_.reset( + new VersionSet(dbname_, &options_, storage_options_, table_cache_.get())); column_family_memtables_.reset( new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet())); @@ -1127,8 +1127,8 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, (unsigned long)m->GetLogNumber()); list.push_back(m->NewIterator()); } - Iterator* iter = NewMergingIterator(&internal_comparator_, &list[0], - list.size()); + Iterator* iter = + NewMergingIterator(&cfd->internal_comparator(), &list[0], list.size()); Log(options_.info_log, "Level-0 flush table #%lu: started", (unsigned long)meta.number); @@ -1290,7 +1290,7 @@ Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family, for (int level = 0; level <= max_level_with_files; level++) { // in case the compaction is unversal or if we're compacting the // bottom-most level, the output level will be the same as input one - if (options_.compaction_style == kCompactionStyleUniversal || + if (cfd->options()->compaction_style == kCompactionStyleUniversal || level == max_level_with_files) { s = RunManualCompaction(cfd, level, level, begin, end); } else { @@ -1400,15 +1400,27 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { } int DBImpl::NumberLevels(const ColumnFamilyHandle& column_family) { - return options_.num_levels; + mutex_.Lock(); + auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id); + mutex_.Unlock(); + assert(cfd != nullptr); + return cfd->NumberLevels(); } int DBImpl::MaxMemCompactionLevel(const ColumnFamilyHandle& column_family) { - return options_.max_mem_compaction_level; + mutex_.Lock(); + auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id); + mutex_.Unlock(); + assert(cfd != nullptr); + return cfd->options()->max_mem_compaction_level; } int DBImpl::Level0StopWriteTrigger(const ColumnFamilyHandle& column_family) { - return options_.level0_stop_writes_trigger; + mutex_.Lock(); + auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id); + mutex_.Unlock(); + assert(cfd != nullptr); + return cfd->options()->level0_stop_writes_trigger; } uint64_t DBImpl::CurrentVersionNumber() const { @@ -1630,7 +1642,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, // For universal compaction, we enforce every manual compaction to compact // all files. if (begin == nullptr || - options_.compaction_style == kCompactionStyleUniversal) { + cfd->options()->compaction_style == kCompactionStyleUniversal) { manual.begin = nullptr; } else { begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek); @@ -2742,8 +2754,9 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, immutable_mems->AddIterators(options, &iterator_list); // Collect iterators for files in L0 - Ln version->AddIterators(options, storage_options_, &iterator_list); - Iterator* internal_iter = NewMergingIterator( - &internal_comparator_, &iterator_list[0], iterator_list.size()); + Iterator* internal_iter = + NewMergingIterator(&default_cfd_->internal_comparator(), + &iterator_list[0], iterator_list.size()); cleanup->version = version; cleanup->mu = &mutex_; cleanup->db = this; @@ -2799,8 +2812,8 @@ std::pair DBImpl::GetTailingIteratorPair( immutable_cleanup->db = this; immutable_cleanup->mu = &mutex_; - immutable_iter = - NewMergingIterator(&internal_comparator_, &list[0], list.size()); + immutable_iter = NewMergingIterator(&default_cfd_->internal_comparator(), + &list[0], list.size()); immutable_iter->RegisterCleanup(CleanupIteratorState, immutable_cleanup, nullptr); @@ -3500,7 +3513,7 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) { // (compression, etc) but err on the side of caution. lfile->SetPreallocationBlockSize(1.1 * cfd->options()->write_buffer_size); - memtmp = new MemTable(internal_comparator_, *cfd->options()); + memtmp = new MemTable(cfd->internal_comparator(), *cfd->options()); new_superversion = new SuperVersion(); } } @@ -3610,7 +3623,6 @@ Status DBImpl::DeleteFile(std::string name) { int level; FileMetaData metadata; - int maxlevel = NumberLevels(); ColumnFamilyData* cfd; VersionEdit edit; DeletionState deletion_state(0, true); @@ -3622,7 +3634,7 @@ Status DBImpl::DeleteFile(std::string name) { name.c_str()); return Status::InvalidArgument("File not found"); } - assert((level > 0) && (level < maxlevel)); + assert((level > 0) && (level < cfd->NumberLevels())); // If the file is being compacted no need to delete. if (metadata.being_compacted) { @@ -3634,7 +3646,7 @@ Status DBImpl::DeleteFile(std::string name) { // Only the files in the last level can be deleted externally. // This is to make sure that any deletion tombstones are not // lost. Check that the level passed is the last level. - for (int i = level + 1; i < maxlevel; i++) { + for (int i = level + 1; i < cfd->NumberLevels(); i++) { if (cfd->current()->NumLevelFiles(i) != 0) { Log(options_.info_log, "DeleteFile %s FAILED. File not in last level\n", name.c_str()); @@ -3820,13 +3832,20 @@ Status DB::OpenWithColumnFamilies( } } - if (s.ok() && impl->options_.compaction_style == kCompactionStyleUniversal) { - Version* current = impl->default_cfd_->current(); - for (int i = 1; i < impl->NumberLevels(); i++) { - int num_files = current->NumLevelFiles(i); - if (num_files > 0) { - s = Status::InvalidArgument("Not all files are at level 0. Cannot " - "open with universal compaction style."); + if (s.ok()) { + for (auto cfd : *impl->versions_->GetColumnFamilySet()) { + if (cfd->options()->compaction_style == kCompactionStyleUniversal) { + Version* current = cfd->current(); + for (int i = 1; i < current->NumberLevels(); ++i) { + int num_files = current->NumLevelFiles(i); + if (num_files > 0) { + s = Status::InvalidArgument("Not all files are at level 0. Cannot " + "open with universal compaction style."); + break; + } + } + } + if (!s.ok()) { break; } } diff --git a/db/db_test.cc b/db/db_test.cc index acf46478c..f0bf78e19 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -5039,10 +5039,9 @@ void BM_LogAndApply(int iters, int num_base_files) { port::Mutex mu; MutexLock l(&mu); - InternalKeyComparator cmp(BytewiseComparator()); Options options; EnvOptions sopt; - VersionSet vset(dbname, &options, sopt, nullptr, &cmp); + VersionSet vset(dbname, &options, sopt, nullptr); std::vector dummy; dummy.push_back(ColumnFamilyDescriptor()); ASSERT_OK(vset.Recover(dummy)); diff --git a/db/version_set.cc b/db/version_set.cc index 46f528538..56e285015 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -417,7 +417,8 @@ Version::Version(ColumnFamilyData* cfd, VersionSet* vset, next_(this), prev_(this), refs_(0), - num_levels_(vset->num_levels_), + // cfd is nullptr if Version is dummy + num_levels_(cfd == nullptr ? 0 : cfd->NumberLevels()), files_(new std::vector[num_levels_]), files_by_size_(num_levels_), next_file_to_compact_by_size_(num_levels_), @@ -1372,19 +1373,16 @@ class VersionSet::Builder { VersionSet::VersionSet(const std::string& dbname, const Options* options, const EnvOptions& storage_options, - TableCache* table_cache, - const InternalKeyComparator* cmp) + TableCache* table_cache) : column_family_set_(new ColumnFamilySet()), env_(options->env), dbname_(dbname), options_(options), table_cache_(table_cache), - icmp_(*cmp), next_file_number_(2), manifest_file_number_(0), // Filled by Recover() last_sequence_(0), prev_log_number_(0), - num_levels_(options_->num_levels), current_version_number_(0), manifest_file_size_(0), storage_options_(storage_options), @@ -1698,17 +1696,17 @@ Status VersionSet::Recover( if (!s.ok()) { break; } - if (edit.has_comparator_ && - edit.comparator_ != icmp_.user_comparator()->Name()) { - s = Status::InvalidArgument( - icmp_.user_comparator()->Name(), - "does not match existing comparator " + edit.comparator_); - break; - } + // Not found means that user didn't supply that column + // family option AND we encountered column family add + // record. Once we encounter column family drop record, + // we will delete the column family from + // column_families_not_found. bool cf_in_not_found = column_families_not_found.find(edit.column_family_) != column_families_not_found.end(); + // in builders means that user supplied that column family + // option AND that we encountered column family add record bool cf_in_builders = builders.find(edit.column_family_) != builders.end(); @@ -1729,6 +1727,13 @@ Status VersionSet::Recover( CreateColumnFamily(cf_options->second, &edit); builders.insert( {edit.column_family_, new Builder(new_cfd, new_cfd->current())}); + if (edit.has_comparator_ && + edit.comparator_ != new_cfd->user_comparator()->Name()) { + s = Status::InvalidArgument( + new_cfd->user_comparator()->Name(), + "does not match existing comparator " + edit.comparator_); + break; + } } } else if (edit.is_column_family_drop_) { if (cf_in_builders) { @@ -1764,6 +1769,13 @@ Status VersionSet::Recover( cfd->SetLogNumber(edit.log_number_); have_log_number = true; } + if (edit.has_comparator_ && + edit.comparator_ != cfd->user_comparator()->Name()) { + s = Status::InvalidArgument( + cfd->user_comparator()->Name(), + "does not match existing comparator " + edit.comparator_); + break; + } // if it is not column family add or column family drop, // then it's a file add/delete, which should be forwarded @@ -1924,9 +1936,8 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, "Number of levels needs to be bigger than 1"); } - const InternalKeyComparator cmp(options->comparator); TableCache tc(dbname, options, storage_options, 10); - VersionSet versions(dbname, options, storage_options, &tc, &cmp); + VersionSet versions(dbname, options, storage_options, &tc); Status status; std::vector dummy; @@ -2011,8 +2022,8 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, uint64_t prev_log_number = 0; int count = 0; // TODO works only for default column family currently - VersionSet::Builder builder(column_family_set_->GetDefault(), - column_family_set_->GetDefault()->current()); + ColumnFamilyData* default_cfd = column_family_set_->GetDefault(); + VersionSet::Builder builder(default_cfd, default_cfd->current()); { VersionSet::LogReporter reporter; @@ -2025,11 +2036,11 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, VersionEdit edit; s = edit.DecodeFrom(record); if (s.ok()) { - if (edit.has_comparator_ && - edit.comparator_ != icmp_.user_comparator()->Name()) { - s = Status::InvalidArgument(icmp_.user_comparator()->Name(), - "does not match existing comparator " + - edit.comparator_); + if (edit.column_family_ == 0 && edit.has_comparator_ && + edit.comparator_ != default_cfd->user_comparator()->Name()) { + s = Status::InvalidArgument( + default_cfd->user_comparator()->Name(), + "does not match existing comparator " + edit.comparator_); } } @@ -2127,12 +2138,14 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { // no need to explicitly write it edit.AddColumnFamily(cfd->GetName()); edit.SetColumnFamily(cfd->GetID()); - std::string record; - edit.EncodeTo(&record); - Status s = log->AddRecord(record); - if (!s.ok()) { - return s; - } + } + edit.SetComparatorName( + cfd->internal_comparator().user_comparator()->Name()); + std::string record; + edit.EncodeTo(&record); + Status s = log->AddRecord(record); + if (!s.ok()) { + return s; } } @@ -2141,7 +2154,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { VersionEdit edit; edit.SetColumnFamily(cfd->GetID()); - for (int level = 0; level < NumberLevels(); level++) { + for (int level = 0; level < cfd->NumberLevels(); level++) { for (const auto& f : cfd->current()->files_[level]) { edit.AddFile(level, f->number, @@ -2162,13 +2175,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { } } - // Save metadata - VersionEdit edit; - edit.SetComparatorName(icmp_.user_comparator()->Name()); - - std::string record; - edit.EncodeTo(&record); - return log->AddRecord(record); + return Status::OK(); } // Opens the mainfest file and reads all records @@ -2205,10 +2212,12 @@ uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) { for (int level = 0; level < v->NumberLevels(); level++) { const std::vector& files = v->files_[level]; for (size_t i = 0; i < files.size(); i++) { - if (icmp_.Compare(files[i]->largest, ikey) <= 0) { + if (v->cfd_->internal_comparator().Compare(files[i]->largest, ikey) <= + 0) { // Entire file is before "ikey", so just add the file size result += files[i]->file_size; - } else if (icmp_.Compare(files[i]->smallest, ikey) > 0) { + } else if (v->cfd_->internal_comparator().Compare(files[i]->smallest, + ikey) > 0) { // Entire file is after "ikey", so ignore if (level > 0) { // Files other than level 0 are sorted by meta->smallest, so @@ -2368,7 +2377,7 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel, void VersionSet::GetLiveFilesMetaData(std::vector* metadata) { for (auto cfd : *column_family_set_) { - for (int level = 0; level < NumberLevels(); level++) { + for (int level = 0; level < cfd->NumberLevels(); level++) { for (const auto& file : cfd->current()->files_[level]) { LiveFileMetaData filemetadata; filemetadata.name = TableFileName("", file->number); diff --git a/db/version_set.h b/db/version_set.h index 178787dcb..206370dc2 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -201,7 +201,7 @@ class Version { friend class Compaction; friend class VersionSet; friend class DBImpl; - friend struct ColumnFamilyData; + friend class ColumnFamilyData; friend class CompactionPicker; friend class LevelCompactionPicker; friend class UniversalCompactionPicker; @@ -232,7 +232,7 @@ class Version { // but files in each level are now sorted based on file // size. The file with the largest size is at the front. // This vector stores the index of the file from files_. - std::vector< std::vector > files_by_size_; + std::vector> files_by_size_; // An index into files_by_size_ that specifies the first // file that is not yet compacted @@ -281,8 +281,7 @@ class Version { class VersionSet { public: VersionSet(const std::string& dbname, const Options* options, - const EnvOptions& storage_options, TableCache* table_cache, - const InternalKeyComparator*); + const EnvOptions& storage_options, TableCache* table_cache); ~VersionSet(); // Apply *edit to the current version to form a new descriptor that @@ -361,8 +360,6 @@ class VersionSet { return min_log_num; } - int NumberLevels() const { return num_levels_; } - // Create an iterator that reads over the compaction inputs for "*c". // The caller should delete the iterator when no longer needed. Iterator* MakeInputIterator(Compaction* c); @@ -406,10 +403,7 @@ class VersionSet { class Builder; struct ManifestWriter; - friend class Compaction; friend class Version; - // TODO(icanadi) temporarily until we have what ColumnFamilyData needs (icmp_) - friend class ColumnFamilyData; struct LogReporter : public log::Reader::Reporter { Status* status; @@ -431,14 +425,11 @@ class VersionSet { const std::string dbname_; const Options* const options_; TableCache* const table_cache_; - const InternalKeyComparator icmp_; uint64_t next_file_number_; uint64_t manifest_file_number_; std::atomic last_sequence_; uint64_t prev_log_number_; // 0 or backing store for memtable being compacted - int num_levels_; - // Opened lazily unique_ptr descriptor_log_; diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index 149b02c0d..5cc57f76b 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -536,10 +536,8 @@ void ManifestDumpCommand::DoCommand() { std::string file(manifestfile); std::string dbname("dummy"); TableCache* tc = new TableCache(dbname, &options, sopt, 10); - const InternalKeyComparator* cmp = - new InternalKeyComparator(options.comparator); - VersionSet* versions = new VersionSet(dbname, &options, sopt, tc, cmp); + VersionSet* versions = new VersionSet(dbname, &options, sopt, tc); Status s = versions->DumpManifest(options, file, verbose_, is_key_hex_); if (!s.ok()) { printf("Error in processing file %s %s\n", manifestfile.c_str(), @@ -1015,7 +1013,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt, EnvOptions soptions; TableCache tc(db_path_, &opt, soptions, 10); const InternalKeyComparator cmp(opt.comparator); - VersionSet versions(db_path_, &opt, soptions, &tc, &cmp); + VersionSet versions(db_path_, &opt, soptions, &tc); std::vector dummy; ColumnFamilyDescriptor dummy_descriptor(default_column_family_name, ColumnFamilyOptions(opt)); @@ -1029,7 +1027,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt, } int max = -1; auto default_cfd = versions.GetColumnFamilySet()->GetDefault(); - for (int i = 0; i < versions.NumberLevels(); i++) { + for (int i = 0; i < default_cfd->NumberLevels(); i++) { if (default_cfd->current()->NumLevelFiles(i)) { max = i; }