From 055e6df45b24204feb34461754a482ef7ffc14b6 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Tue, 14 Jan 2014 15:27:09 -0800 Subject: [PATCH] VersionEdit not to take NumLevels() Summary: I will submit a sequence of diffs that prepare the master branch for column families. There are a lot of implicit assumptions in the code that make the column family implementation hard. If I make these changes only in the column family branch, merging back to master will become impossible. Most of the diffs will be simple code refactorings, so I hope we can have a fast turnaround time. Feel free to grab me in person to discuss any of them. This diff removes the number-of-levels check from VersionEdit. The number of levels is used only when a VersionEdit is read, not when it is written, but it still has to be set when the edit is written. I believe the right thing is to make VersionEdit dumb and check consistency on the caller side. This will also make it much easier to implement column families, since different column families can have different numbers of levels. 
Test Plan: make check Reviewers: dhruba, haobo, sdong, kailiu Reviewed By: kailiu CC: leveldb Differential Revision: https://reviews.facebook.net/D15159 --- db/db_impl.cc | 30 ++++++++---------- db/db_impl_readonly.cc | 2 +- db/db_test.cc | 9 +++--- db/memtable.cc | 16 ++++------ db/memtable.h | 7 ++--- db/repair.cc | 5 ++- db/version_edit.cc | 10 +++--- db/version_edit.h | 11 +++---- db/version_edit_test.cc | 4 +-- db/version_set.cc | 47 ++++++++++++++++------------- db/version_set_reduce_num_levels.cc | 4 +-- 11 files changed, 66 insertions(+), 79 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 37e8d7582..4781ad85d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -252,8 +252,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) : env_(options.env), dbname_(dbname), internal_comparator_(options.comparator), - options_(SanitizeOptions( - dbname, &internal_comparator_, &internal_filter_policy_, options)), + options_(SanitizeOptions(dbname, &internal_comparator_, + &internal_filter_policy_, options)), internal_filter_policy_(options.filter_policy), owns_info_log_(options_.info_log != options.info_log), db_lock_(nullptr), @@ -261,8 +261,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) shutting_down_(nullptr), bg_cv_(&mutex_), mem_rep_factory_(options_.memtable_factory.get()), - mem_(new MemTable(internal_comparator_, mem_rep_factory_, - NumberLevels(), options_)), + mem_(new MemTable(internal_comparator_, options_)), logfile_number_(0), super_version_(nullptr), tmp_batch_(), @@ -408,7 +407,7 @@ uint64_t DBImpl::TEST_Current_Manifest_FileNo() { } Status DBImpl::NewDB() { - VersionEdit new_db(NumberLevels()); + VersionEdit new_db; new_db.SetComparatorName(user_comparator()->Name()); new_db.SetLogNumber(0); new_db.SetNextFile(2); @@ -864,7 +863,7 @@ void DBImpl::PurgeObsoleteWALFiles() { // If externalTable is set, then apply recovered transactions // to that table. This is used for readonly mode. 
Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table, - bool error_if_log_file_exist) { + bool error_if_log_file_exist) { mutex_.AssertHeld(); assert(db_lock_ == nullptr); @@ -1031,8 +1030,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, WriteBatchInternal::SetContents(&batch, record); if (mem == nullptr) { - mem = new MemTable(internal_comparator_, mem_rep_factory_, - NumberLevels(), options_); + mem = new MemTable(internal_comparator_, options_); mem->Ref(); } status = WriteBatchInternal::InsertInto(&batch, mem, &options_); @@ -1358,7 +1356,7 @@ void DBImpl::ReFitLevel(int level, int target_level) { Log(options_.info_log, "Before refitting:\n%s", versions_->current()->DebugString().data()); - VersionEdit edit(NumberLevels()); + VersionEdit edit; for (const auto& f : versions_->current()->files_[level]) { edit.DeleteFile(level, f->number); edit.AddFile(to_level, f->number, f->file_size, f->smallest, f->largest, @@ -3289,17 +3287,13 @@ Status DBImpl::MakeRoomForWrite(bool force, EnvOptions soptions(storage_options_); soptions.use_mmap_writes = false; DelayLoggingAndReset(); - s = env_->NewWritableFile( - LogFileName(options_.wal_dir, new_log_number), - &lfile, - soptions - ); + s = env_->NewWritableFile(LogFileName(options_.wal_dir, new_log_number), + &lfile, soptions); if (s.ok()) { // Our final size should be less than write_buffer_size // (compression, etc) but err on the side of caution. 
lfile->SetPreallocationBlockSize(1.1 * options_.write_buffer_size); - memtmp = new MemTable( - internal_comparator_, mem_rep_factory_, NumberLevels(), options_); + memtmp = new MemTable(internal_comparator_, options_); new_superversion = new SuperVersion(options_.max_write_buffer_number); } } @@ -3680,7 +3674,7 @@ Status DBImpl::DeleteFile(std::string name) { int level; FileMetaData metadata; int maxlevel = NumberLevels(); - VersionEdit edit(maxlevel); + VersionEdit edit; DeletionState deletion_state(0, true); { MutexLock l(&mutex_); @@ -3802,7 +3796,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { return s; } impl->mutex_.Lock(); - VersionEdit edit(impl->NumberLevels()); + VersionEdit edit; s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists if (s.ok()) { uint64_t new_log_number = impl->versions_->NewFileNumber(); diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index dbb297e93..04033b2fa 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -86,7 +86,7 @@ Status DB::OpenForReadOnly(const Options& options, const std::string& dbname, DBImplReadOnly* impl = new DBImplReadOnly(options, dbname); impl->mutex_.Lock(); - VersionEdit edit(impl->NumberLevels()); + VersionEdit edit; Status s = impl->Recover(&edit, impl->GetMemTable(), error_if_log_file_exist); impl->mutex_.Unlock(); diff --git a/db/db_test.cc b/db/db_test.cc index 560311ae3..2ff47320a 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -765,10 +765,9 @@ TEST(DBTest, LevelLimitReopen) { options.num_levels = 1; options.max_bytes_for_level_multiplier_additional.resize(1, 1); Status s = TryReopen(&options); - ASSERT_EQ(s.IsCorruption(), true); + ASSERT_EQ(s.IsInvalidArgument(), true); ASSERT_EQ(s.ToString(), - "Corruption: VersionEdit: db already has " - "more levels than options.num_levels"); + "Invalid argument: db has more levels than options.num_levels"); options.num_levels = 10; 
options.max_bytes_for_level_multiplier_additional.resize(10, 1); @@ -4936,7 +4935,7 @@ void BM_LogAndApply(int iters, int num_base_files) { EnvOptions sopt; VersionSet vset(dbname, &options, sopt, nullptr, &cmp); ASSERT_OK(vset.Recover()); - VersionEdit vbase(vset.NumberLevels()); + VersionEdit vbase; uint64_t fnum = 1; for (int i = 0; i < num_base_files; i++) { InternalKey start(MakeKey(2*fnum), 1, kTypeValue); @@ -4948,7 +4947,7 @@ void BM_LogAndApply(int iters, int num_base_files) { uint64_t start_micros = env->NowMicros(); for (int i = 0; i < iters; i++) { - VersionEdit vedit(vset.NumberLevels()); + VersionEdit vedit; vedit.DeleteFile(2, fnum); InternalKey start(MakeKey(2*fnum), 1, kTypeValue); InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion); diff --git a/db/memtable.cc b/db/memtable.cc index 7881ce5bd..baff4fb34 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -33,24 +33,20 @@ struct hash { namespace rocksdb { -MemTable::MemTable(const InternalKeyComparator& cmp, - MemTableRepFactory* table_factory, - int numlevel, - const Options& options) +MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options) : comparator_(cmp), refs_(0), arena_impl_(options.arena_block_size), - table_(table_factory->CreateMemTableRep(comparator_, &arena_impl_)), + table_(options.memtable_factory->CreateMemTableRep(comparator_, + &arena_impl_)), flush_in_progress_(false), flush_completed_(false), file_number_(0), - edit_(numlevel), first_seqno_(0), mem_next_logfile_number_(0), mem_logfile_number_(0), - locks_(options.inplace_update_support - ? options.inplace_update_num_locks - : 0) { } + locks_(options.inplace_update_support ? 
options.inplace_update_num_locks + : 0) {} MemTable::~MemTable() { assert(refs_ == 0); @@ -58,7 +54,7 @@ MemTable::~MemTable() { size_t MemTable::ApproximateMemoryUsage() { return arena_impl_.ApproximateMemoryUsage() + - table_->ApproximateMemoryUsage(); + table_->ApproximateMemoryUsage(); } int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr) diff --git a/db/memtable.h b/db/memtable.h index 12ccf3d37..24a2c852b 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -34,11 +34,8 @@ class MemTable { // MemTables are reference counted. The initial reference count // is zero and the caller must call Ref() at least once. - explicit MemTable( - const InternalKeyComparator& comparator, - MemTableRepFactory* table_factory, - int numlevel = 7, - const Options& options = Options()); + explicit MemTable(const InternalKeyComparator& comparator, + const Options& options = Options()); ~MemTable(); diff --git a/db/repair.cc b/db/repair.cc index 6db90c865..29524233f 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -58,7 +58,7 @@ class Repairer { next_file_number_(1) { // TableCache can be small since we expect each table to be opened once. 
table_cache_ = new TableCache(dbname_, &options_, storage_options_, 10); - edit_ = new VersionEdit(options.num_levels); + edit_ = new VersionEdit(); } ~Repairer() { @@ -196,8 +196,7 @@ class Repairer { std::string scratch; Slice record; WriteBatch batch; - MemTable* mem = new MemTable(icmp_, options_.memtable_factory.get(), - options_.num_levels); + MemTable* mem = new MemTable(icmp_, options_); mem->Ref(); int counter = 0; while (reader.ReadRecord(&record, &scratch)) { diff --git a/db/version_edit.cc b/db/version_edit.cc index 9f23faba7..42c07e7b0 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -33,6 +33,7 @@ enum Tag { void VersionEdit::Clear() { comparator_.clear(); + max_level_ = 0; log_number_ = 0; prev_log_number_ = 0; last_sequence_ = 0; @@ -107,14 +108,13 @@ static bool GetInternalKey(Slice* input, InternalKey* dst) { bool VersionEdit::GetLevel(Slice* input, int* level, const char** msg) { uint32_t v; - if (GetVarint32(input, &v) && - (int)v < number_levels_) { + if (GetVarint32(input, &v)) { *level = v; + if (max_level_ < *level) { + max_level_ = *level; + } return true; } else { - if ((int)v >= number_levels_) { - *msg = "db already has more levels than options.num_levels"; - } return false; } } diff --git a/db/version_edit.h b/db/version_edit.h index 196914e2b..a0546c983 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -34,10 +34,7 @@ struct FileMetaData { class VersionEdit { public: - explicit VersionEdit(int number_levels) : - number_levels_(number_levels) { - Clear(); - } + VersionEdit() { Clear(); } ~VersionEdit() { } void Clear(); @@ -108,7 +105,7 @@ class VersionEdit { bool GetLevel(Slice* input, int* level, const char** msg); - int number_levels_; + int max_level_; std::string comparator_; uint64_t log_number_; uint64_t prev_log_number_; @@ -120,9 +117,9 @@ class VersionEdit { bool has_next_file_number_; bool has_last_sequence_; - std::vector< std::pair > compact_pointers_; + std::vector > compact_pointers_; DeletedFileSet 
deleted_files_; - std::vector< std::pair > new_files_; + std::vector > new_files_; }; } // namespace rocksdb diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index 4a00822f7..745ea90d0 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -15,7 +15,7 @@ namespace rocksdb { static void TestEncodeDecode(const VersionEdit& edit) { std::string encoded, encoded2; edit.EncodeTo(&encoded); - VersionEdit parsed(7); + VersionEdit parsed; Status s = parsed.DecodeFrom(encoded); ASSERT_TRUE(s.ok()) << s.ToString(); parsed.EncodeTo(&encoded2); @@ -27,7 +27,7 @@ class VersionEditTest { }; TEST(VersionEditTest, EncodeDecode) { static const uint64_t kBig = 1ull << 50; - VersionEdit edit(7); + VersionEdit edit; for (int i = 0; i < 4; i++) { TestEncodeDecode(edit); edit.AddFile(3, kBig + 300 + i, kBig + 400 + i, diff --git a/db/version_set.cc b/db/version_set.cc index 7a1f5cbf8..91b3dcd3f 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -980,14 +980,12 @@ class VersionSet::Builder { #endif } - void CheckConsistencyForDeletes( - VersionEdit* edit, - unsigned int number, - int level) { + void CheckConsistencyForDeletes(VersionEdit* edit, unsigned int number, + int level) { #ifndef NDEBUG // a file to be deleted better exist in the previous version bool found = false; - for (int l = 0; !found && l < edit->number_levels_; l++) { + for (int l = 0; !found && l < vset_->NumberLevels(); l++) { const std::vector& base_files = base_->files_[l]; for (unsigned int i = 0; i < base_files.size(); i++) { FileMetaData* f = base_files[i]; @@ -1000,7 +998,7 @@ class VersionSet::Builder { // if the file did not exist in the previous version, then it // is possibly moved from lower level to higher level in current // version - for (int l = level+1; !found && l < edit->number_levels_; l++) { + for (int l = level+1; !found && l < vset_->NumberLevels(); l++) { const FileSet* added = levels_[l].added_files; for (FileSet::const_iterator added_iter = added->begin(); 
added_iter != added->end(); ++added_iter) { @@ -1213,7 +1211,7 @@ void VersionSet::AppendVersion(Version* v) { } Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, - bool new_descriptor_log) { + bool new_descriptor_log) { mu->AssertHeld(); // queue our request @@ -1383,7 +1381,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, } void VersionSet::LogAndApplyHelper(Builder* builder, Version* v, - VersionEdit* edit, port::Mutex* mu) { + VersionEdit* edit, port::Mutex* mu) { mu->AssertHeld(); if (edit->has_log_number_) { @@ -1455,21 +1453,28 @@ Status VersionSet::Recover() { Slice record; std::string scratch; while (reader.ReadRecord(&record, &scratch) && s.ok()) { - VersionEdit edit(NumberLevels()); + VersionEdit edit; s = edit.DecodeFrom(record); - if (s.ok()) { - if (edit.has_comparator_ && - edit.comparator_ != icmp_.user_comparator()->Name()) { - s = Status::InvalidArgument(icmp_.user_comparator()->Name(), - "does not match existing comparator " + - edit.comparator_); - } + if (!s.ok()) { + break; } - if (s.ok()) { - builder.Apply(&edit); + if (edit.max_level_ >= NumberLevels()) { + s = Status::InvalidArgument( + "db has more levels than options.num_levels"); + break; } + if (edit.has_comparator_ && + edit.comparator_ != icmp_.user_comparator()->Name()) { + s = Status::InvalidArgument(icmp_.user_comparator()->Name(), + "does not match existing comparator " + + edit.comparator_); + break; + } + + builder.Apply(&edit); + if (edit.has_log_number_) { log_number = edit.log_number_; have_log_number = true; @@ -1577,7 +1582,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, Slice record; std::string scratch; while (reader.ReadRecord(&record, &scratch) && s.ok()) { - VersionEdit edit(NumberLevels()); + VersionEdit edit; s = edit.DecodeFrom(record); if (s.ok()) { if (edit.has_comparator_ && @@ -1832,7 +1837,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { // TODO: Break up into multiple records to 
reduce memory usage on recovery? // Save metadata - VersionEdit edit(NumberLevels()); + VersionEdit edit; edit.SetComparatorName(icmp_.user_comparator()->Name()); // Save compaction pointers @@ -2994,7 +2999,7 @@ Compaction::Compaction(int level, int out_level, uint64_t target_file_size, bottommost_level_(false), is_full_compaction_(false), level_ptrs_(std::vector(number_levels)) { - edit_ = new VersionEdit(number_levels_); + edit_ = new VersionEdit(); for (int i = 0; i < number_levels_; i++) { level_ptrs_[i] = 0; } diff --git a/db/version_set_reduce_num_levels.cc b/db/version_set_reduce_num_levels.cc index d13a4aed9..07062399b 100644 --- a/db/version_set_reduce_num_levels.cc +++ b/db/version_set_reduce_num_levels.cc @@ -72,8 +72,8 @@ Status VersionSet::ReduceNumberOfLevels(int new_levels, port::Mutex* mu) { num_levels_ = new_levels; compact_pointer_ = new std::string[new_levels]; Init(new_levels); - VersionEdit ve(new_levels); - st = LogAndApply(&ve , mu, true); + VersionEdit ve; + st = LogAndApply(&ve, mu, true); return st; }