From fb2346fc1f700a255f31427ddbbbf2e5759a9ad4 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Tue, 11 Mar 2014 14:52:17 -0700 Subject: [PATCH] [CF] Code cleanup part 1 Summary: I'm cleaning up some code preparing for the big diff review tomorrow. This is the first part of the cleanup. Changes are mostly cosmetic. The goal is to decrease amount of code difference between columnfamilies and master branch. This diff also fixes race condition when dropping column family. Test Plan: Ran db_stress with variety of parameters Reviewers: dhruba, haobo Differential Revision: https://reviews.facebook.net/D16833 --- HISTORY.md | 3 + db/column_family.cc | 124 +++++++++++++++----------------- db/column_family.h | 131 +++++++++++++++++++++------------- db/column_family_test.cc | 2 +- db/compaction_picker.cc | 16 +++-- db/compaction_picker.h | 18 +++-- db/db_impl.cc | 146 +++++++++++++++----------------------- db/db_impl_readonly.cc | 7 +- db/memtable.cc | 2 +- db/memtable.h | 2 +- db/repair.cc | 1 - db/version_set.cc | 37 +++++----- db/write_batch.cc | 6 +- db/write_batch_internal.h | 4 +- db/write_batch_test.cc | 2 +- table/table_test.cc | 3 +- 16 files changed, 249 insertions(+), 255 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 3d6384d6e..b35ef0127 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -4,6 +4,9 @@ * By default, max_background_flushes is 1 and flush process is removed from background compaction process. Flush process is now always executed in high priority thread pool. +* Column family support +* If you write something to the non-default column family with disableWAL = true, + you need to Flush the column family before exiting if you want your data to be persistent ## Unreleased (will be relased in 2.8) diff --git a/db/column_family.cc b/db/column_family.cc index 3a5fa067b..17539c695 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -185,9 +185,8 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id, dropped_(false), internal_comparator_(options.comparator), internal_filter_policy_(options.filter_policy), - options_(SanitizeOptions(&internal_comparator_, &internal_filter_policy_, - options)), - full_options_(*db_options, options_), + options_(*db_options, SanitizeOptions(&internal_comparator_, + &internal_filter_policy_, options)), mem_(nullptr), imm_(options.min_write_buffer_number_to_merge), super_version_(nullptr), @@ -205,18 +204,19 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id, internal_stats_.reset(new InternalStats(options.num_levels, db_options->env, db_options->statistics.get())); table_cache_.reset( - new TableCache(dbname, &full_options_, storage_options, table_cache)); + new TableCache(dbname, &options_, storage_options, table_cache)); if (options_.compaction_style == kCompactionStyleUniversal) { - compaction_picker_.reset(new UniversalCompactionPicker( - &options_, &internal_comparator_, db_options->info_log.get())); + compaction_picker_.reset( + new UniversalCompactionPicker(&options_, &internal_comparator_)); } else { - compaction_picker_.reset(new LevelCompactionPicker( - &options_, &internal_comparator_, db_options->info_log.get())); + compaction_picker_.reset( + new LevelCompactionPicker(&options_, &internal_comparator_)); } - Log(full_options_.info_log, "Options for column family \"%s\":\n", + Log(options_.info_log, "Options for column family \"%s\":\n", name.c_str()); - options_.Dump(full_options_.info_log.get()); + const ColumnFamilyOptions* cf_options = &options_; + cf_options->Dump(options_.info_log.get()); } } @@ -232,14 +232,27 @@ ColumnFamilyData::~ColumnFamilyData() { // it's nullptr for dummy CFD if (column_family_set_ != nullptr) { // remove from column_family_set - column_family_set_->DropColumnFamily(this); + column_family_set_->RemoveColumnFamily(this); } if (current_ != nullptr) { current_->Unref(); } - DeleteSuperVersion(); + if (super_version_ != nullptr) { + // Release SuperVersion reference kept in ThreadLocalPtr. + // This must be done outside of mutex_ since unref handler can lock mutex. + super_version_->db_mutex->Unlock(); + local_sv_.reset(); + super_version_->db_mutex->Lock(); + + bool is_last_reference __attribute__((unused)); + is_last_reference = super_version_->Unref(); + assert(is_last_reference); + super_version_->Cleanup(); + delete super_version_; + super_version_ = nullptr; + } if (dummy_versions_ != nullptr) { // List must be empty @@ -257,10 +270,6 @@ ColumnFamilyData::~ColumnFamilyData() { } } -InternalStats* ColumnFamilyData::internal_stats() { - return internal_stats_.get(); -} - void ColumnFamilyData::SetCurrent(Version* current) { current_ = current; need_slowdown_for_num_level0_files_ = @@ -320,23 +329,6 @@ void ColumnFamilyData::ResetThreadLocalSuperVersions() { } } -void ColumnFamilyData::DeleteSuperVersion() { - if (super_version_ != nullptr) { - // Release SuperVersion reference kept in ThreadLocalPtr. - // This must be done outside of mutex_ since unref handler can lock mutex. - super_version_->db_mutex->Unlock(); - local_sv_.reset(); - super_version_->db_mutex->Lock(); - - bool is_last_reference __attribute__((unused)); - is_last_reference = super_version_->Unref(); - assert(is_last_reference); - super_version_->Cleanup(); - delete super_version_; - super_version_ = nullptr; - } -} - ColumnFamilySet::ColumnFamilySet(const std::string& dbname, const DBOptions* db_options, const EnvOptions& storage_options, @@ -345,6 +337,7 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname, dummy_cfd_(new ColumnFamilyData(dbname, 0, "", nullptr, nullptr, ColumnFamilyOptions(), db_options, storage_options_, nullptr)), + default_cfd_cache_(nullptr), db_name_(dbname), db_options_(db_options), storage_options_(storage_options), @@ -367,10 +360,8 @@ ColumnFamilySet::~ColumnFamilySet() { } ColumnFamilyData* ColumnFamilySet::GetDefault() const { - auto cfd = GetColumnFamily(0); - // default column family should always exist - assert(cfd != nullptr); - return cfd; + assert(default_cfd_cache_ != nullptr); + return default_cfd_cache_; } ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const { @@ -385,24 +376,13 @@ ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const { ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name) const { auto cfd_iter = column_families_.find(name); - if (cfd_iter == column_families_.end()) { + if (cfd_iter != column_families_.end()) { + auto cfd = GetColumnFamily(cfd_iter->second); + assert(cfd != nullptr); + return cfd; + } else { return nullptr; } - return GetColumnFamily(cfd_iter->second); -} - -bool ColumnFamilySet::Exists(uint32_t id) { - return column_family_data_.find(id) != column_family_data_.end(); -} - -bool ColumnFamilySet::Exists(const std::string& name) { - return column_families_.find(name) != column_families_.end(); -} - -uint32_t ColumnFamilySet::GetID(const std::string& name) { - auto cfd_iter = column_families_.find(name); - assert(cfd_iter != column_families_.end()); - return cfd_iter->second; } uint32_t ColumnFamilySet::GetNextColumnFamilyID() { @@ -434,11 +414,22 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily( new_cfd->prev_ = prev; prev->next_ = new_cfd; dummy_cfd_->prev_ = new_cfd; + if (id == 0) { + default_cfd_cache_ = new_cfd; + } return new_cfd; } +void ColumnFamilySet::Lock() { + // spin lock + while (spin_lock_.test_and_set(std::memory_order_acquire)) { + } +} + +void ColumnFamilySet::Unlock() { spin_lock_.clear(std::memory_order_release); } + // under a DB mutex -void ColumnFamilySet::DropColumnFamily(ColumnFamilyData* cfd) { +void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) { auto cfd_iter = column_family_data_.find(cfd->GetID()); assert(cfd_iter != column_family_data_.end()); Lock(); @@ -447,19 +438,16 @@ void ColumnFamilySet::DropColumnFamily(ColumnFamilyData* cfd) { Unlock(); } -void ColumnFamilySet::Lock() { - // spin lock - while (spin_lock_.test_and_set(std::memory_order_acquire)) { - } -} - -void ColumnFamilySet::Unlock() { spin_lock_.clear(std::memory_order_release); } - bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) { - // maybe outside of db mutex, should lock - column_family_set_->Lock(); - current_ = column_family_set_->GetColumnFamily(column_family_id); - column_family_set_->Unlock(); + if (column_family_id == 0) { + // optimization for common case + current_ = column_family_set_->GetDefault(); + } else { + // maybe outside of db mutex, should lock + column_family_set_->Lock(); + current_ = column_family_set_->GetColumnFamily(column_family_id); + column_family_set_->Unlock(); + } handle_.SetCFD(current_); return current_ != nullptr; } @@ -474,9 +462,9 @@ MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const { return current_->mem(); } -const Options* ColumnFamilyMemTablesImpl::GetFullOptions() const { +const Options* ColumnFamilyMemTablesImpl::GetOptions() const { assert(current_ != nullptr); - return current_->full_options(); + return current_->options(); } ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() { diff --git a/db/column_family.h b/db/column_family.h index 2ff2e02ab..50047be23 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -15,6 +15,7 @@ #include #include "rocksdb/options.h" +#include "rocksdb/db.h" #include "rocksdb/env.h" #include "db/memtable_list.h" #include "db/write_batch_internal.h" @@ -35,6 +36,9 @@ class ColumnFamilyData; class DBImpl; class LogBuffer; +// ColumnFamilyHandleImpl is the class that clients use to access different +// column families. It has non-trivial destructor, which gets called when client +// is done using the column family class ColumnFamilyHandleImpl : public ColumnFamilyHandle { public: // create while holding the mutex @@ -51,7 +55,12 @@ class ColumnFamilyHandleImpl : public ColumnFamilyHandle { port::Mutex* mutex_; }; -// does not ref-count cfd_ +// Does not ref-count ColumnFamilyData +// We use this dummy ColumnFamilyHandleImpl because sometimes MemTableInserter +// calls DBImpl methods. When this happens, MemTableInserter need access to +// ColumnFamilyHandle (same as the client would need). In that case, we feed +// MemTableInserter dummy ColumnFamilyHandle and enable it to call DBImpl +// methods class ColumnFamilyHandleInternal : public ColumnFamilyHandleImpl { public: ColumnFamilyHandleInternal() @@ -110,15 +119,18 @@ extern ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp, class ColumnFamilySet; -// column family metadata. not thread-safe. should be protected by db_mutex +// This class keeps all the data that a column family needs. It's mosly dumb and +// used just to provide access to metadata. +// Most methods require DB mutex held, unless otherwise noted class ColumnFamilyData { public: ~ColumnFamilyData(); + // thread-safe uint32_t GetID() const { return id_; } - const std::string& GetName() { return name_; } + // thread-safe + const std::string& GetName() const { return name_; } - // DB mutex held for all these void Ref() { ++refs_; } // will just decrease reference count to 0, but will not delete it. returns // true if the ref count was decreased to zero and needs to be cleaned up by @@ -127,24 +139,37 @@ class ColumnFamilyData { assert(refs_ > 0); return --refs_ == 0; } - bool Dead() { return refs_ == 0; } - // SetDropped() and IsDropped() are thread-safe + // This can only be called from single-threaded VersionSet::LogAndApply() + // After dropping column family no other operation on that column family + // will be executed. All the files and memory will be, however, kept around + // until client drops the column family handle. That way, client can still + // access data from dropped column family. + // Column family can be dropped and still alive. In that state: + // *) Column family is not included in the iteration. + // *) Compaction and flush is not executed on the dropped column family. + // *) Client can continue writing and reading from column family. However, all + // writes stay in the current memtable. + // When the dropped column family is unreferenced, then we: + // *) delete all memory associated with that column family + // *) delete all the files associated with that column family void SetDropped() { // can't drop default CF assert(id_ != 0); - dropped_.store(true); + dropped_ = true; } - bool IsDropped() const { return dropped_.load(); } + bool IsDropped() const { return dropped_; } + // thread-safe int NumberLevels() const { return options_.num_levels; } void SetLogNumber(uint64_t log_number) { log_number_ = log_number; } uint64_t GetLogNumber() const { return log_number_; } - const ColumnFamilyOptions* options() const { return &options_; } - const Options* full_options() const { return &full_options_; } - InternalStats* internal_stats(); + // thread-safe + const Options* options() const { return &options_; } + + InternalStats* internal_stats() { return internal_stats_.get(); } MemTableList* imm() { return &imm_; } MemTable* mem() { return mem_; } @@ -154,7 +179,7 @@ class ColumnFamilyData { void SetCurrent(Version* current); void CreateNewMemtable(); - TableCache* table_cache() const { return table_cache_.get(); } + TableCache* table_cache() { return table_cache_.get(); } // See documentation in compaction_picker.h Compaction* PickCompaction(LogBuffer* log_buffer); @@ -162,18 +187,20 @@ class ColumnFamilyData { const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end); - CompactionPicker* compaction_picker() const { - return compaction_picker_.get(); - } + CompactionPicker* compaction_picker() { return compaction_picker_.get(); } + // thread-safe const Comparator* user_comparator() const { return internal_comparator_.user_comparator(); } + // thread-safe const InternalKeyComparator& internal_comparator() const { return internal_comparator_; } - SuperVersion* GetSuperVersion() const { return super_version_; } + SuperVersion* GetSuperVersion() { return super_version_; } + // thread-safe ThreadLocalPtr* GetThreadLocalSuperVersion() const { return local_sv_.get(); } + // thread-safe uint64_t GetSuperVersionNumber() const { return super_version_number_.load(); } @@ -185,9 +212,6 @@ class ColumnFamilyData { port::Mutex* db_mutex); void ResetThreadLocalSuperVersions(); - // REQUIRED: db mutex held - // Do not access column family after calling this method - void DeleteSuperVersion(); // A Flag indicating whether write needs to slowdown because of there are // too many number of level0 files. @@ -204,21 +228,18 @@ class ColumnFamilyData { const EnvOptions& storage_options, ColumnFamilySet* column_family_set); - ColumnFamilyData* next() { return next_; } - uint32_t id_; const std::string name_; Version* dummy_versions_; // Head of circular doubly-linked list of versions. Version* current_; // == dummy_versions->prev_ int refs_; // outstanding references to ColumnFamilyData - std::atomic dropped_; // true if client dropped it + bool dropped_; // true if client dropped it const InternalKeyComparator internal_comparator_; const InternalFilterPolicy internal_filter_policy_; - ColumnFamilyOptions const options_; - Options const full_options_; + Options const options_; std::unique_ptr table_cache_; @@ -258,19 +279,33 @@ class ColumnFamilyData { ColumnFamilySet* column_family_set_; }; -// Thread safe only for reading without a writer. All access should be -// locked when adding or dropping column family +// ColumnFamilySet has interesting thread-safety requirements +// * CreateColumnFamily() or RemoveColumnFamily() -- need to protect by DB +// mutex. Inside, column_family_data_ and column_families_ will be protected +// by Lock() and Unlock(). CreateColumnFamily() should ONLY be called from +// VersionSet::LogAndApply() in the normal runtime. It is also called +// during Recovery and in DumpManifest(). RemoveColumnFamily() is called +// from ColumnFamilyData destructor +// * Iteration -- hold DB mutex, but you can release it in the body of +// iteration. If you release DB mutex in body, reference the column +// family before the mutex and unreference after you unlock, since the column +// family might get dropped when the DB mutex is released +// * GetDefault() -- thread safe +// * GetColumnFamily() -- either inside of DB mutex or call Lock() <-> Unlock() +// * GetNextColumnFamilyID(), GetMaxColumnFamily(), UpdateMaxColumnFamily() -- +// inside of DB mutex class ColumnFamilySet { public: + // ColumnFamilySet supports iteration class iterator { public: explicit iterator(ColumnFamilyData* cfd) : current_(cfd) {} iterator& operator++() { - // dummy is never dead, so this will never be infinite + // dummy is never dead or dropped, so this will never be infinite do { - current_ = current_->next(); - } while (current_->Dead()); + current_ = current_->next_; + } while (current_->refs_ == 0 || current_->IsDropped()); return *this; } bool operator!=(const iterator& other) { @@ -290,9 +325,6 @@ class ColumnFamilySet { // GetColumnFamily() calls return nullptr if column family is not found ColumnFamilyData* GetColumnFamily(uint32_t id) const; ColumnFamilyData* GetColumnFamily(const std::string& name) const; - bool Exists(uint32_t id); - bool Exists(const std::string& name); - uint32_t GetID(const std::string& name); // this call will return the next available column family ID. it guarantees // that there is no column family with id greater than or equal to the // returned value in the current running instance or anytime in RocksDB @@ -304,34 +336,33 @@ class ColumnFamilySet { ColumnFamilyData* CreateColumnFamily(const std::string& name, uint32_t id, Version* dummy_version, const ColumnFamilyOptions& options); - void DropColumnFamily(ColumnFamilyData* cfd); - iterator begin() { return iterator(dummy_cfd_->next()); } + iterator begin() { return iterator(dummy_cfd_->next_); } iterator end() { return iterator(dummy_cfd_); } - // ColumnFamilySet has interesting thread-safety requirements - // * CreateColumnFamily() or DropColumnFamily() -- need to protect by DB - // mutex. Inside, column_family_data_ and column_families_ will be protected - // by Lock() and Unlock() - // * Iterate -- hold DB mutex, but you can release it in the body of - // iteration. If you release DB mutex in body, reference the column - // family before the mutex and unreference after you unlock, since the column - // family might get dropped when you release the DB mutex. - // * GetDefault(), GetColumnFamily(), Exists(), GetID() -- either inside of DB - // mutex or call Lock() - // * GetNextColumnFamilyID(), GetMaxColumnFamily(), UpdateMaxColumnFamily() -- - // inside of DB mutex void Lock(); void Unlock(); private: - // when mutating: 1. DB mutex locked first, 2. spinlock locked second - // when reading, either: 1. lock DB mutex, or 2. lock spinlock + friend class ColumnFamilyData; + // helper function that gets called from cfd destructor + // REQUIRES: DB mutex held + void RemoveColumnFamily(ColumnFamilyData* cfd); + + // column_families_ and column_family_data_ need to be protected: + // * when mutating: 1. DB mutex locked first, 2. spinlock locked second + // * when reading, either: 1. lock DB mutex, or 2. lock spinlock // (if both, respect the ordering to avoid deadlock!) std::unordered_map column_families_; std::unordered_map column_family_data_; + uint32_t max_column_family_; ColumnFamilyData* dummy_cfd_; + // We don't hold the refcount here, since default column family always exists + // We are also not responsible for cleaning up default_cfd_cache_. This is + // just a cache that makes common case (accessing default column family) + // faster + ColumnFamilyData* default_cfd_cache_; const std::string db_name_; const DBOptions* const db_options_; @@ -340,6 +371,8 @@ class ColumnFamilySet { std::atomic_flag spin_lock_; }; +// We use ColumnFamilyMemTablesImpl to provide WriteBatch a way to access +// memtables of different column families (specified by ID in the write batch) class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables { public: explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set) @@ -357,7 +390,7 @@ class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables { // Returns options for selected column family // REQUIRES: Seek() called first - virtual const Options* GetFullOptions() const override; + virtual const Options* GetOptions() const override; // Returns column family handle for the selected column family virtual ColumnFamilyHandle* GetColumnFamilyHandle() override; diff --git a/db/column_family_test.cc b/db/column_family_test.cc index a3b075242..11b36eec4 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -797,7 +797,7 @@ std::string IterStatus(Iterator* iter) { } return result; } -} // namespace anonymous +} // anonymous namespace TEST(ColumnFamilyTest, NewIteratorsTest) { // iter == 0 -- no tailing diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 531426a7a..52499f301 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -42,11 +42,9 @@ uint64_t MultiplyCheckOverflow(uint64_t op1, int op2) { } // anonymous namespace -CompactionPicker::CompactionPicker(const ColumnFamilyOptions* options, - const InternalKeyComparator* icmp, - Logger* logger) +CompactionPicker::CompactionPicker(const Options* options, + const InternalKeyComparator* icmp) : compactions_in_progress_(options->num_levels), - logger_(logger), options_(options), num_levels_(options->num_levels), icmp_(icmp) { @@ -272,7 +270,7 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) { &c->parent_index_); if (expanded1.size() == c->inputs_[1].size() && !FilesInCompaction(expanded1)) { - Log(logger_, + Log(options_->info_log, "Expanding@%lu %lu+%lu (%lu+%lu bytes) to %lu+%lu (%lu+%lu bytes)" "\n", (unsigned long)level, (unsigned long)(c->inputs_[0].size()), @@ -343,7 +341,7 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level, c->inputs_[0] = inputs; if (ExpandWhileOverlapping(c) == false) { delete c; - Log(logger_, "Could not compact due to expansion failure.\n"); + Log(options_->info_log, "Could not compact due to expansion failure.\n"); return nullptr; } @@ -514,7 +512,7 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version, } //if (i > Version::number_of_files_to_sort_) { - // Log(logger_, "XXX Looking at index %d", i); + // Log(options_->info_log, "XXX Looking at index %d", i); //} // Do not pick this file if its parents at level+1 are being compacted. @@ -610,6 +608,10 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version, c->bottommost_level_ = true; } + // update statistics + MeasureTime(options_->statistics.get(), NUM_FILES_IN_SINGLE_COMPACTION, + c->inputs_[0].size()); + // mark all the files that are being compacted c->MarkFilesBeingCompacted(true); diff --git a/db/compaction_picker.h b/db/compaction_picker.h index 164d23b11..6527ef967 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -26,8 +26,7 @@ class Version; class CompactionPicker { public: - CompactionPicker(const ColumnFamilyOptions* options, - const InternalKeyComparator* icmp, Logger* logger); + CompactionPicker(const Options* options, const InternalKeyComparator* icmp); virtual ~CompactionPicker(); // Pick level and inputs for a new compaction. @@ -119,8 +118,7 @@ class CompactionPicker { // Per-level max bytes std::unique_ptr level_max_bytes_; - Logger* logger_; - const ColumnFamilyOptions* const options_; + const Options* const options_; private: int num_levels_; @@ -130,9 +128,9 @@ class CompactionPicker { class UniversalCompactionPicker : public CompactionPicker { public: - UniversalCompactionPicker(const ColumnFamilyOptions* options, - const InternalKeyComparator* icmp, Logger* logger) - : CompactionPicker(options, icmp, logger) {} + UniversalCompactionPicker(const Options* options, + const InternalKeyComparator* icmp) + : CompactionPicker(options, icmp) {} virtual Compaction* PickCompaction(Version* version, LogBuffer* log_buffer) override; @@ -150,9 +148,9 @@ class UniversalCompactionPicker : public CompactionPicker { class LevelCompactionPicker : public CompactionPicker { public: - LevelCompactionPicker(const ColumnFamilyOptions* options, - const InternalKeyComparator* icmp, Logger* logger) - : CompactionPicker(options, icmp, logger) {} + LevelCompactionPicker(const Options* options, + const InternalKeyComparator* icmp) + : CompactionPicker(options, icmp) {} virtual Compaction* PickCompaction(Version* version, LogBuffer* log_buffer) override; diff --git a/db/db_impl.cc b/db/db_impl.cc index 1632d0493..437496cd0 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -276,26 +276,17 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) } DBImpl::~DBImpl() { - // Wait for background work to finish - mutex_.Lock(); - if (flush_on_destroy_) { - autovector to_delete; - for (auto cfd : *versions_->GetColumnFamilySet()) { - // TODO(icanadi) do this in ColumnFamilyData destructor - if (!cfd->IsDropped() && cfd->mem()->GetFirstSequenceNumber() != 0) { - cfd->Ref(); - mutex_.Unlock(); - FlushMemTable(cfd, FlushOptions()); - mutex_.Lock(); - if (cfd->Unref()) { - to_delete.push_back(cfd); - } - } - } - for (auto cfd : to_delete) { - delete cfd; + // only the default CFD is alive at this point + if (default_cf_handle_ != nullptr) { + auto default_cfd = default_cf_handle_->cfd(); + if (flush_on_destroy_ && + default_cfd->mem()->GetFirstSequenceNumber() != 0) { + FlushMemTable(default_cfd, FlushOptions()); } } + + mutex_.Lock(); + // Wait for background work to finish shutting_down_.Release_Store(this); // Any non-nullptr value is ok while (bg_compaction_scheduled_ || bg_flush_scheduled_ || @@ -303,8 +294,11 @@ DBImpl::~DBImpl() { bg_cv_.Wait(); } - for (auto cfd : *versions_->GetColumnFamilySet()) { - cfd->DeleteSuperVersion(); + if (default_cf_handle_ != nullptr) { + // we need to delete handle outside of lock because it does its own locking + mutex_.Unlock(); + delete default_cf_handle_; + mutex_.Lock(); } if (options_.allow_thread_local) { @@ -328,21 +322,13 @@ DBImpl::~DBImpl() { } } - mutex_.Unlock(); - if (default_cf_handle_ != nullptr) { - // we need to delete handle outside of lock because it does its own locking - delete default_cf_handle_; - } - - if (db_lock_ != nullptr) { - env_->UnlockFile(db_lock_); - } - - mutex_.Lock(); // versions need to be destroyed before table_cache since it can hold // references to table_cache. versions_.reset(); mutex_.Unlock(); + if (db_lock_ != nullptr) { + env_->UnlockFile(db_lock_); + } LogFlush(options_.info_log); } @@ -876,14 +862,8 @@ Status DBImpl::Recover( versions_->MarkFileNumberUsed(log); s = RecoverLogFile(log, &max_sequence, read_only); } - - if (s.ok()) { - if (versions_->LastSequence() < max_sequence) { - versions_->SetLastSequence(max_sequence); - } - SetTickerCount(options_.statistics.get(), SEQUENCE_NUMBER, - versions_->LastSequence()); - } + SetTickerCount(options_.statistics.get(), SEQUENCE_NUMBER, + versions_->LastSequence()); } return s; @@ -1029,9 +1009,11 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, // we must mark the next log number as used, even though it's // not actually used. that is because VersionSet assumes // VersionSet::next_file_number_ always to be strictly greater than any - // log - // number + // log number versions_->MarkFileNumberUsed(log_number + 1); + if (versions_->LastSequence() < *max_sequence) { + versions_->SetLastSequence(*max_sequence); + } status = versions_->LogAndApply(cfd, edit, &mutex_); if (!status.ok()) { return status; @@ -1059,10 +1041,10 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem, Status s; { mutex_.Unlock(); - s = BuildTable(dbname_, env_, *cfd->full_options(), storage_options_, + s = BuildTable(dbname_, env_, *cfd->options(), storage_options_, cfd->table_cache(), iter, &meta, cfd->internal_comparator(), newest_snapshot, earliest_seqno_in_memtable, - GetCompressionFlush(*cfd->full_options())); + GetCompressionFlush(*cfd->options())); LogFlush(options_.info_log); mutex_.Lock(); } @@ -1124,10 +1106,10 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, Log(options_.info_log, "Level-0 flush table #%lu: started", (unsigned long)meta.number); - s = BuildTable(dbname_, env_, *cfd->full_options(), storage_options_, + s = BuildTable(dbname_, env_, *cfd->options(), storage_options_, cfd->table_cache(), iter, &meta, cfd->internal_comparator(), newest_snapshot, earliest_seqno_in_memtable, - GetCompressionFlush(*cfd->full_options())); + GetCompressionFlush(*cfd->options())); LogFlush(options_.info_log); delete iter; Log(options_.info_log, "Level-0 flush table #%lu: %lu bytes %s", @@ -1758,7 +1740,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { bool is_flush_pending = false; // no need to refcount since we're under a mutex for (auto cfd : *versions_->GetColumnFamilySet()) { - if (!cfd->IsDropped() && cfd->imm()->IsFlushPending()) { + if (cfd->imm()->IsFlushPending()) { is_flush_pending = true; } } @@ -1775,7 +1757,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { bool is_compaction_needed = false; // no need to refcount since we're under a mutex for (auto cfd : *versions_->GetColumnFamilySet()) { - if (!cfd->IsDropped() && cfd->current()->NeedsCompaction()) { + if (cfd->current()->NeedsCompaction()) { is_compaction_needed = true; break; } @@ -1813,9 +1795,6 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, autovector to_delete; // refcounting in iteration for (auto cfd : *versions_->GetColumnFamilySet()) { - if (cfd->IsDropped()) { - continue; - } cfd->Ref(); Status flush_status; while (flush_status.ok() && cfd->imm()->IsFlushPending()) { @@ -1988,7 +1967,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, } else { // no need to refcount in iteration since it's always under a mutex for (auto cfd : *versions_->GetColumnFamilySet()) { - if (!cfd->options()->disable_auto_compactions && !cfd->IsDropped()) { + if (!cfd->options()->disable_auto_compactions) { c.reset(cfd->PickCompaction(log_buffer)); if (c != nullptr) { // update statistics @@ -2166,12 +2145,12 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { 1.1 * cfd->compaction_picker()->MaxFileSizeForLevel( compact->compaction->output_level())); - CompressionType compression_type = GetCompressionType( - *cfd->full_options(), compact->compaction->output_level(), - compact->compaction->enable_compression()); + CompressionType compression_type = + GetCompressionType(*cfd->options(), compact->compaction->output_level(), + compact->compaction->enable_compression()); compact->builder.reset( - NewTableBuilder(*cfd->full_options(), cfd->internal_comparator(), + NewTableBuilder(*cfd->options(), cfd->internal_comparator(), compact->outfile.get(), compression_type)); } LogFlush(options_.info_log); @@ -2788,8 +2767,8 @@ std::pair DBImpl::GetTailingIteratorPair( Iterator* mutable_iter = super_version->mem->NewIterator(options); // create a DBIter that only uses memtable content; see NewIterator() mutable_iter = - NewDBIterator(&dbname_, env_, *cfd->full_options(), - cfd->user_comparator(), mutable_iter, kMaxSequenceNumber); + NewDBIterator(&dbname_, env_, *cfd->options(), cfd->user_comparator(), + mutable_iter, kMaxSequenceNumber); std::vector list; super_version->imm->AddIterators(options, &list); @@ -2799,8 +2778,8 @@ std::pair DBImpl::GetTailingIteratorPair( // create a DBIter that only uses memtable content; see NewIterator() immutable_iter = - NewDBIterator(&dbname_, env_, *cfd->full_options(), - cfd->user_comparator(), immutable_iter, kMaxSequenceNumber); + NewDBIterator(&dbname_, env_, *cfd->options(), cfd->user_comparator(), + immutable_iter, kMaxSequenceNumber); // register cleanups mutable_iter->RegisterCleanup(CleanupIteratorState, @@ -2937,11 +2916,10 @@ Status DBImpl::GetImpl(const ReadOptions& options, // merge_operands will contain the sequence of merges in the latter case. LookupKey lkey(key, snapshot); BumpPerfTime(&perf_context.get_snapshot_time, &snapshot_timer); - if (sv->mem->Get(lkey, value, &s, merge_context, *cfd->full_options())) { + if (sv->mem->Get(lkey, value, &s, merge_context, *cfd->options())) { // Done RecordTick(options_.statistics.get(), MEMTABLE_HIT); - } else if (sv->imm->Get(lkey, value, &s, merge_context, - *cfd->full_options())) { + } else if (sv->imm->Get(lkey, value, &s, merge_context, *cfd->options())) { // Done RecordTick(options_.statistics.get(), MEMTABLE_HIT); } else { @@ -2950,7 +2928,7 @@ Status DBImpl::GetImpl(const ReadOptions& options, StartPerfTimer(&from_files_timer); sv->current->Get(options, lkey, value, &s, &merge_context, &stats, - *cfd->full_options(), value_found); + *cfd->options(), value_found); have_stat_update = true; BumpPerfTime(&perf_context.get_from_output_files_time, &from_files_timer); RecordTick(options_.statistics.get(), MEMTABLE_MISS); @@ -3072,14 +3050,14 @@ std::vector DBImpl::MultiGet( auto super_version = mgd->super_version; auto cfd = mgd->cfd; if (super_version->mem->Get(lkey, value, &s, merge_context, - *cfd->full_options())) { + *cfd->options())) { // Done } else if (super_version->imm->Get(lkey, value, &s, merge_context, - *cfd->full_options())) { + *cfd->options())) { // Done } else { super_version->current->Get(options, lkey, value, &s, &merge_context, - &mgd->stats, *cfd->full_options()); + &mgd->stats, *cfd->options()); mgd->have_stat_update = true; } @@ -3134,7 +3112,8 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, *handle = nullptr; MutexLock l(&mutex_); - if (versions_->GetColumnFamilySet()->Exists(column_family_name)) { + if (versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name) != + nullptr) { return Status::InvalidArgument("Column family already exists"); } VersionEdit edit; @@ -3144,6 +3123,8 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, edit.SetLogNumber(logfile_number_); edit.SetComparatorName(options.comparator->Name()); + // LogAndApply will both write the creation in MANIFEST and create + // ColumnFamilyData object Status s = versions_->LogAndApply(nullptr, &edit, &mutex_, db_directory_.get(), false, &options); if (s.ok()) { @@ -3184,9 +3165,10 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { } if (s.ok()) { + assert(cfd->IsDropped()); Log(options_.info_log, "Dropped column family with id %u\n", cfd->GetID()); // Flush the memtables. This will make all WAL files referencing dropped - // column family to be obsolete. They will be deleted when user deletes + // column family to be obsolete. They will be deleted once user deletes // column family handle Write(WriteOptions(), nullptr); // ignore error } else { @@ -3237,7 +3219,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options, options.snapshot != nullptr ? reinterpret_cast(options.snapshot)->number_ : latest_snapshot; - iter = NewDBIterator(&dbname_, env_, *cfd->full_options(), + iter = NewDBIterator(&dbname_, env_, *cfd->options(), cfd->user_comparator(), iter, snapshot); } @@ -3292,7 +3274,7 @@ Status DBImpl::NewIterators( : latest_snapshot; auto iter = NewInternalIterator(options, cfd, super_versions[i]); - iter = NewDBIterator(&dbname_, env_, *cfd->full_options(), + iter = NewDBIterator(&dbname_, env_, *cfd->options(), cfd->user_comparator(), iter, snapshot); iterators->push_back(iter); } @@ -3364,9 +3346,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { autovector to_delete; // refcounting cfd in iteration for (auto cfd : *versions_->GetColumnFamilySet()) { - if (cfd->IsDropped()) { - continue; - } cfd->Ref(); // May temporarily unlock and wait. status = MakeRoomForWrite(cfd, my_batch == nullptr); @@ -3586,6 +3565,8 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) { // Yield previous error s = bg_error_; break; + } else if (cfd->IsDropped()) { + break; } else if (allow_delay && cfd->NeedSlowdownForNumLevel0Files()) { // We are getting close to hitting a hard limit on the number of // L0 files. Rather than delaying a single write by several @@ -3786,7 +3767,7 @@ Env* DBImpl::GetEnv() const { const Options& DBImpl::GetOptions(ColumnFamilyHandle* column_family) const { auto cfh = reinterpret_cast(column_family); - return *cfh->cfd()->full_options(); + return *cfh->cfd()->options(); } bool DBImpl::GetProperty(ColumnFamilyHandle* column_family, @@ -4058,24 +4039,15 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, VersionEdit edit; impl->logfile_number_ = new_log_number; impl->log_.reset(new log::Writer(std::move(lfile))); - // We use this LogAndApply just to store the next file number, the one - // that we used by calling impl->versions_->NewFileNumber() - // The used log number are already written to manifest in RecoverLogFile() - // method - s = impl->versions_->LogAndApply(impl->default_cf_handle_->cfd(), &edit, - &impl->mutex_, - impl->db_directory_.get()); - } - if (s.ok()) { + // set column family handles for (auto cf : column_families) { - if (!impl->versions_->GetColumnFamilySet()->Exists(cf.name)) { + auto cfd = + impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name); + if (cfd == nullptr) { s = Status::InvalidArgument("Column family not found: ", cf.name); break; } - uint32_t id = impl->versions_->GetColumnFamilySet()->GetID(cf.name); - auto cfd = impl->versions_->GetColumnFamilySet()->GetColumnFamily(id); - assert(cfd != nullptr); handles->push_back( new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_)); } diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index 15725b335..c1a8eef36 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -63,11 +63,11 @@ Status DBImplReadOnly::Get(const ReadOptions& options, MergeContext merge_context; LookupKey lkey(key, snapshot); if (super_version->mem->Get(lkey, value, &s, merge_context, - *cfd->full_options())) { + *cfd->options())) { } else { Version::GetStats stats; super_version->current->Get(options, lkey, value, &s, &merge_context, - &stats, *cfd->full_options()); + &stats, *cfd->options()); } return s; } @@ -80,8 +80,7 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& options, SequenceNumber latest_snapshot = versions_->LastSequence(); Iterator* internal_iter = NewInternalIterator(options, cfd, super_version); return NewDBIterator( - &dbname_, env_, *cfd->full_options(), cfd->user_comparator(), - internal_iter, + &dbname_, env_, *cfd->options(), cfd->user_comparator(), internal_iter, (options.snapshot != nullptr ? reinterpret_cast(options.snapshot)->number_ : latest_snapshot)); diff --git a/db/memtable.cc b/db/memtable.cc index b29f9339c..d0f54ce4b 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -29,7 +29,7 @@ namespace rocksdb { MemTable::MemTable(const InternalKeyComparator& cmp, - const ColumnFamilyOptions& options) + const Options& options) : comparator_(cmp), refs_(0), arena_(options.arena_block_size), diff --git a/db/memtable.h b/db/memtable.h index cf93aece8..38602e3aa 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -39,7 +39,7 @@ class MemTable { // MemTables are reference counted. The initial reference count // is zero and the caller must call Ref() at least once. explicit MemTable(const InternalKeyComparator& comparator, - const ColumnFamilyOptions& options); + const Options& options); ~MemTable(); diff --git a/db/repair.cc b/db/repair.cc index 597ddd658..4e0d025c0 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -108,7 +108,6 @@ class Repairer { InternalKeyComparator const icmp_; InternalFilterPolicy const ipolicy_; Options const options_; - ColumnFamilyOptions const cf_options_; std::shared_ptr raw_table_cache_; TableCache* table_cache_; VersionEdit* edit_; diff --git a/db/version_set.cc b/db/version_set.cc index 15411e286..08cf33a8b 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -249,7 +249,7 @@ bool Version::PrefixMayMatch(const ReadOptions& options, Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) { auto table_cache = cfd_->table_cache(); - auto options = cfd_->full_options(); + auto options = cfd_->options(); for (int level = 0; level < num_levels_; level++) { for (const auto& file_meta : files_[level]) { auto fname = TableFileName(vset_->dbname_, file_meta->number); @@ -1491,11 +1491,6 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, assert(column_family_data != nullptr || edit->is_column_family_add_); - if (column_family_data != nullptr && column_family_data->IsDropped()) { - // if column family is dropped no need to write anything to the manifest - // (unless, of course, thit is the drop column family write) - return Status::OK(); - } if (edit->is_column_family_drop_) { // if we drop column family, we have to make sure to save max column family, // so that we don't reuse existing ID @@ -1511,6 +1506,16 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, if (w.done) { return w.status; } + if (column_family_data != nullptr && column_family_data->IsDropped()) { + // if column family is dropped by the time we get here, no need to write + // anything to the manifest + manifest_writers_.pop_front(); + // Notify new head of write queue + if (!manifest_writers_.empty()) { + manifest_writers_.front()->cv.Signal(); + } + return Status::OK(); + } std::vector batch_edits; Version* v = nullptr; @@ -2353,9 +2358,6 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { // LogAndApply. Column family manipulations can only happen within LogAndApply // (the same single thread), so we're safe for (auto cfd : *column_family_set_) { - if (cfd->IsDropped()) { - continue; - } { // Store column family info VersionEdit edit; @@ -2401,19 +2403,18 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { } } - // save max column family to avoid reusing the same column - // family ID for two different column families - if (column_family_set_->GetMaxColumnFamily() > 0) { + { + // persist max column family, last sequence and next file VersionEdit edit; - edit.SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily()); + if (column_family_set_->GetMaxColumnFamily() > 0) { + edit.SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily()); + } + edit.SetLastSequence(last_sequence_); + edit.SetNextFile(next_file_number_); std::string record; edit.EncodeTo(&record); - Status s = log->AddRecord(record); - if (!s.ok()) { - return s; - } + return log->AddRecord(record); } - return Status::OK(); } // Opens the mainfest file and reads all records diff --git a/db/write_batch.cc b/db/write_batch.cc index 5747c5302..da8541224 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -289,7 +289,7 @@ class MemTableInserter : public WriteBatch::Handler { return seek_status; } MemTable* mem = cf_mems_->GetMemTable(); - const Options* options = cf_mems_->GetFullOptions(); + const Options* options = cf_mems_->GetOptions(); if (!options->inplace_update_support) { mem->Add(sequence_, kTypeValue, key, value); } else if (options->inplace_callback == nullptr) { @@ -344,7 +344,7 @@ class MemTableInserter : public WriteBatch::Handler { return seek_status; } MemTable* mem = cf_mems_->GetMemTable(); - const Options* options = cf_mems_->GetFullOptions(); + const Options* options = cf_mems_->GetOptions(); bool perform_merge = false; if (options->max_successive_merges > 0 && db_ != nullptr) { @@ -413,7 +413,7 @@ class MemTableInserter : public WriteBatch::Handler { return seek_status; } MemTable* mem = cf_mems_->GetMemTable(); - const Options* options = cf_mems_->GetFullOptions(); + const Options* options = cf_mems_->GetOptions(); if (!dont_filter_deletes_ && options->filter_deletes) { SnapshotImpl read_from_snapshot; read_from_snapshot.number_ = sequence_; diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index e5ee045cd..793ee3e0e 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -26,7 +26,7 @@ class ColumnFamilyMemTables { // been processed) virtual uint64_t GetLogNumber() const = 0; virtual MemTable* GetMemTable() const = 0; - virtual const Options* GetFullOptions() const = 0; + virtual const Options* GetOptions() const = 0; virtual ColumnFamilyHandle* GetColumnFamilyHandle() = 0; }; @@ -47,7 +47,7 @@ class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables { return mem_; } - const Options* GetFullOptions() const override { + const Options* GetOptions() const override { assert(ok_); return options_; } diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 773823216..c6e17e8b7 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -24,7 +24,7 @@ static std::string PrintContents(WriteBatch* b) { auto factory = std::make_shared(); Options options; options.memtable_factory = factory; - MemTable* mem = new MemTable(cmp, ColumnFamilyOptions(options)); + MemTable* mem = new MemTable(cmp, options); mem->Ref(); std::string state; ColumnFamilyMemTablesDefault cf_mems_default(mem, &options); diff --git a/table/table_test.cc b/table/table_test.cc index aa874eb18..18ae2a3aa 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -410,8 +410,7 @@ class MemTableConstructor: public Constructor { table_factory_(new SkipListFactory) { Options options; options.memtable_factory = table_factory_; - memtable_ = - new MemTable(internal_comparator_, ColumnFamilyOptions(options)); + memtable_ = new MemTable(internal_comparator_, options); memtable_->Ref(); } ~MemTableConstructor() {