diff --git a/db/column_family.cc b/db/column_family.cc index 218958dba..77e224000 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -1,8 +1,64 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + #include "db/column_family.h" + +#include +#include +#include + #include "db/version_set.h" namespace rocksdb { +SuperVersion::SuperVersion(const int num_memtables) { + to_delete.resize(num_memtables); +} + +SuperVersion::~SuperVersion() { + for (auto td : to_delete) { + delete td; + } +} + +SuperVersion* SuperVersion::Ref() { + refs.fetch_add(1, std::memory_order_relaxed); + return this; +} + +bool SuperVersion::Unref() { + assert(refs > 0); + // fetch_sub returns the previous value of ref + return refs.fetch_sub(1, std::memory_order_relaxed) == 1; +} + +void SuperVersion::Cleanup() { + assert(refs.load(std::memory_order_relaxed) == 0); + imm->Unref(&to_delete); + MemTable* m = mem->Unref(); + if (m != nullptr) { + to_delete.push_back(m); + } + current->Unref(); +} + +void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm, + Version* new_current) { + mem = new_mem; + imm = new_imm; + current = new_current; + mem->Ref(); + imm->Ref(); + current->Ref(); + refs.store(1, std::memory_order_relaxed); +} + ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name, Version* dummy_versions, const ColumnFamilyOptions& options) @@ -10,12 +66,40 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name, name(name), dummy_versions(dummy_versions), current(nullptr), - options(options) {} + options(options), + mem(nullptr), + imm(options.min_write_buffer_number_to_merge), + super_version(nullptr) {} ColumnFamilyData::~ColumnFamilyData() { + if (super_version != nullptr) { + bool is_last_reference __attribute__((unused)); + is_last_reference = super_version->Unref(); + assert(is_last_reference); + super_version->Cleanup(); + delete super_version; + } // List must be empty assert(dummy_versions->next_ == dummy_versions); delete dummy_versions; + + if (mem != nullptr) { + delete mem->Unref(); + } + std::vector to_delete; + imm.current()->Unref(&to_delete); + for (MemTable* m : to_delete) { + delete m; + } +} + +void ColumnFamilyData::CreateNewMemtable() { + assert(current != nullptr); + if (mem != nullptr) { + delete mem->Unref(); + } + mem = new MemTable(current->vset_->icmp_, options); + mem->Ref(); } ColumnFamilySet::ColumnFamilySet() : max_column_family_(0) {} @@ -31,7 +115,7 @@ ColumnFamilySet::~ColumnFamilySet() { ColumnFamilyData* ColumnFamilySet::GetDefault() const { auto ret = GetColumnFamily(0); - assert(ret != nullptr); // default column family should always exist + assert(ret != nullptr); // default column family should always exist return ret; } diff --git a/db/column_family.h b/db/column_family.h index fa6902fd2..b5235d3df 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -9,16 +9,47 @@ #pragma once -#include "rocksdb/options.h" - #include #include #include +#include "rocksdb/options.h" +#include "db/memtablelist.h" + namespace rocksdb { class Version; class VersionSet; +class MemTable; +class MemTableListVersion; + +// holds references to memtable, all immutable memtables and version +struct SuperVersion { + MemTable* mem; + MemTableListVersion* imm; + Version* current; + std::atomic refs; + // We need to_delete because during Cleanup(), imm->Unref() returns + // all memtables that we need to free through this vector. We then + // delete all those memtables outside of mutex, during destruction + std::vector to_delete; + + // should be called outside the mutex + explicit SuperVersion(const int num_memtables = 0); + ~SuperVersion(); + SuperVersion* Ref(); + // Returns true if this was the last reference and caller should + // call Clenaup() and delete the object + bool Unref(); + + // call these two methods with db mutex held + // Cleanup unrefs mem, imm and current. Also, it stores all memtables + // that needs to be deleted in to_delete vector. Unrefing those + // objects needs to be done in the mutex + void Cleanup(); + void Init(MemTable* new_mem, MemTableListVersion* new_imm, + Version* new_current); +}; // column family metadata struct ColumnFamilyData { @@ -28,27 +59,34 @@ struct ColumnFamilyData { Version* current; // == dummy_versions->prev_ ColumnFamilyOptions options; + MemTable* mem; + MemTableList imm; + SuperVersion* super_version; + ColumnFamilyData(uint32_t id, const std::string& name, Version* dummy_versions, const ColumnFamilyOptions& options); ~ColumnFamilyData(); + + void CreateNewMemtable(); }; class ColumnFamilySet { public: - class iterator { - public: - explicit iterator( - std::unordered_map::iterator itr) - : itr_(itr) {} - iterator& operator++() { - ++itr_; - return *this; - } - bool operator!=(const iterator& other) { return this->itr_ != other.itr_; } - ColumnFamilyData* operator*() { return itr_->second; } - private: - std::unordered_map::iterator itr_; - }; + class iterator { + public: + explicit iterator( + std::unordered_map::iterator itr) + : itr_(itr) {} + iterator& operator++() { + ++itr_; + return *this; + } + bool operator!=(const iterator& other) { return this->itr_ != other.itr_; } + ColumnFamilyData* operator*() { return itr_->second; } + + private: + std::unordered_map::iterator itr_; + }; ColumnFamilySet(); ~ColumnFamilySet(); diff --git a/db/db_impl.cc b/db/db_impl.cc index b80ed8bd0..de4288736 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -265,11 +265,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) mutex_(options.use_adaptive_mutex), shutting_down_(nullptr), bg_cv_(&mutex_), - mem_rep_factory_(options_.memtable_factory.get()), - mem_(new MemTable(internal_comparator_, options_)), - imm_(options_.min_write_buffer_number_to_merge), logfile_number_(0), - super_version_(nullptr), super_version_number_(0), tmp_batch_(), bg_compaction_scheduled_(0), @@ -297,8 +293,6 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) bg_work_gate_closed_(false), refitting_level_(false) { - mem_->Ref(); - env_->GetAbsolutePath(dbname, &db_absolute_path_); stall_leveln_slowdown_.resize(options.num_levels); @@ -333,11 +327,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) } DBImpl::~DBImpl() { - std::vector to_delete; - to_delete.reserve(options_.max_write_buffer_number); - // Wait for background work to finish - if (flush_on_destroy_ && mem_->GetFirstSequenceNumber() != 0) { + if (flush_on_destroy_ && default_cfd_->mem->GetFirstSequenceNumber() != 0) { FlushMemTable(FlushOptions()); } mutex_.Lock(); @@ -347,27 +338,12 @@ DBImpl::~DBImpl() { bg_logstats_scheduled_) { bg_cv_.Wait(); } - if (super_version_ != nullptr) { - bool is_last_reference __attribute__((unused)); - is_last_reference = super_version_->Unref(); - assert(is_last_reference); - super_version_->Cleanup(); - delete super_version_; - } mutex_.Unlock(); if (db_lock_ != nullptr) { env_->UnlockFile(db_lock_); } - if (mem_ != nullptr) { - delete mem_->Unref(); - } - - imm_.current()->Unref(&to_delete); - for (MemTable* m: to_delete) { - delete m; - } LogFlush(options_.info_log); } @@ -383,13 +359,6 @@ void DBImpl::TEST_Destroy_DBImpl() { bg_logstats_scheduled_) { bg_cv_.Wait(); } - if (super_version_ != nullptr) { - bool is_last_reference __attribute__((unused)); - is_last_reference = super_version_->Unref(); - assert(is_last_reference); - super_version_->Cleanup(); - delete super_version_; - } // Prevent new compactions from occuring. bg_work_gate_closed_ = true; @@ -488,49 +457,6 @@ void DBImpl::MaybeDumpStats() { } } -// DBImpl::SuperVersion methods -DBImpl::SuperVersion::SuperVersion(const int num_memtables) { - to_delete.resize(num_memtables); -} - -DBImpl::SuperVersion::~SuperVersion() { - for (auto td : to_delete) { - delete td; - } -} - -DBImpl::SuperVersion* DBImpl::SuperVersion::Ref() { - refs.fetch_add(1, std::memory_order_relaxed); - return this; -} - -bool DBImpl::SuperVersion::Unref() { - assert(refs > 0); - // fetch_sub returns the previous value of ref - return refs.fetch_sub(1, std::memory_order_relaxed) == 1; -} - -void DBImpl::SuperVersion::Cleanup() { - assert(refs.load(std::memory_order_relaxed) == 0); - imm->Unref(&to_delete); - MemTable* m = mem->Unref(); - if (m != nullptr) { - to_delete.push_back(m); - } - current->Unref(); -} - -void DBImpl::SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm, - Version* new_current) { - mem = new_mem; - imm = new_imm; - current = new_current; - mem->Ref(); - imm->Ref(); - current->Ref(); - refs.store(1, std::memory_order_relaxed); -} - // Returns the list of live files in 'sst_live' and the list // of all files in the filesystem in 'all_files'. // no_full_scan = true -- never do the full scan using GetChildren() @@ -925,6 +851,7 @@ Status DBImpl::Recover( Status s = versions_->Recover(column_families); if (s.ok()) { SequenceNumber max_sequence(0); + default_cfd_ = versions_->GetColumnFamilySet()->GetDefault(); // Recover from all newer log files than the ones named in the // descriptor (new log files may have been added by the previous @@ -1037,7 +964,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, } WriteBatchInternal::SetContents(&batch, record); - status = WriteBatchInternal::InsertInto(&batch, mem_, &options_); + status = + WriteBatchInternal::InsertInto(&batch, default_cfd_->mem, &options_); memtable_empty = false; MaybeIgnoreError(&status); if (!status.ok()) { @@ -1050,13 +978,11 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, *max_sequence = last_seq; } - if (!read_only && - mem_->ApproximateMemoryUsage() > options_.write_buffer_size) { - status = WriteLevel0TableForRecovery(mem_, &edit); + if (!read_only && default_cfd_->mem->ApproximateMemoryUsage() > + options_.write_buffer_size) { + status = WriteLevel0TableForRecovery(default_cfd_->mem, &edit); // we still want to clear memtable, even if the recovery failed - delete mem_->Unref(); - mem_ = new MemTable(internal_comparator_, options_); - mem_->Ref(); + default_cfd_->CreateNewMemtable(); memtable_empty = true; if (!status.ok()) { // Reflect errors immediately so that conditions like full @@ -1067,10 +993,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, } if (!memtable_empty && !read_only) { - status = WriteLevel0TableForRecovery(mem_, &edit); - delete mem_->Unref(); - mem_ = new MemTable(internal_comparator_, options_); - mem_->Ref(); + status = WriteLevel0TableForRecovery(default_cfd_->mem, &edit); + default_cfd_->CreateNewMemtable(); if (!status.ok()) { return status; } @@ -1233,9 +1157,9 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, DeletionState& deletion_state) { mutex_.AssertHeld(); - assert(imm_.size() != 0); + assert(default_cfd_->imm.size() != 0); - if (!imm_.IsFlushPending()) { + if (!default_cfd_->imm.IsFlushPending()) { Log(options_.info_log, "FlushMemTableToOutputFile already in progress"); Status s = Status::IOError("FlushMemTableToOutputFile already in progress"); return s; @@ -1244,7 +1168,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, // Save the contents of the earliest memtable as a new Table uint64_t file_number; std::vector mems; - imm_.PickMemtablesToFlush(&mems); + default_cfd_->imm.PickMemtablesToFlush(&mems); if (mems.empty()) { Log(options_.info_log, "Nothing in memstore to flush"); Status s = Status::IOError("Nothing in memstore to flush"); @@ -1279,12 +1203,12 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, } // Replace immutable memtable with the generated Table - s = imm_.InstallMemtableFlushResults( + s = default_cfd_->imm.InstallMemtableFlushResults( mems, versions_.get(), s, &mutex_, options_.info_log.get(), file_number, pending_outputs_, &deletion_state.memtables_to_free, db_directory_.get()); if (s.ok()) { - InstallSuperVersion(deletion_state); + InstallSuperVersion(default_cfd_, deletion_state); if (madeProgress) { *madeProgress = 1; } @@ -1410,7 +1334,7 @@ Status DBImpl::ReFitLevel(int level, int target_level) { edit.DebugString().data()); status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get()); - superversion_to_free = InstallSuperVersion(new_superversion); + superversion_to_free = InstallSuperVersion(default_cfd_, new_superversion); new_superversion = nullptr; Log(options_.info_log, "LogAndApply: %s\n", status.ToString().data()); @@ -1737,10 +1661,10 @@ Status DBImpl::WaitForFlushMemTable() { Status s; // Wait until the compaction completes MutexLock l(&mutex_); - while (imm_.size() > 0 && bg_error_.ok()) { + while (default_cfd_->imm.size() > 0 && bg_error_.ok()) { bg_cv_.Wait(); } - if (imm_.size() != 0) { + if (default_cfd_->imm.size() != 0) { s = bg_error_; } return s; @@ -1776,7 +1700,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions } else { - bool is_flush_pending = imm_.IsFlushPending(); + bool is_flush_pending = default_cfd_->imm.IsFlushPending(); if (is_flush_pending && (bg_flush_scheduled_ < options_.max_background_flushes)) { // memtable flush needed @@ -1811,7 +1735,7 @@ void DBImpl::BGWorkCompaction(void* db) { Status DBImpl::BackgroundFlush(bool* madeProgress, DeletionState& deletion_state) { Status stat; - while (stat.ok() && imm_.IsFlushPending()) { + while (stat.ok() && default_cfd_->imm.IsFlushPending()) { Log(options_.info_log, "BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d", options_.max_background_flushes - bg_flush_scheduled_); @@ -1931,7 +1855,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, mutex_.AssertHeld(); // TODO: remove memtable flush from formal compaction - while (imm_.IsFlushPending()) { + while (default_cfd_->imm.IsFlushPending()) { Log(options_.info_log, "BackgroundCompaction doing FlushMemTableToOutputFile, compaction slots " "available %d", @@ -1983,7 +1907,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, f->smallest, f->largest, f->smallest_seqno, f->largest_seqno); status = versions_->LogAndApply(c->edit(), &mutex_, db_directory_.get()); - InstallSuperVersion(deletion_state); + InstallSuperVersion(default_cfd_, deletion_state); + Version::LevelSummaryStorage tmp; Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", static_cast(f->number), c->level() + 1, @@ -2334,11 +2259,11 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { // Prioritize immutable compaction work // TODO: remove memtable flush from normal compaction work - if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) { + if (default_cfd_->imm.imm_flush_needed.NoBarrier_Load() != nullptr) { const uint64_t imm_start = env_->NowMicros(); LogFlush(options_.info_log); mutex_.Lock(); - if (imm_.IsFlushPending()) { + if (default_cfd_->imm.IsFlushPending()) { FlushMemTableToOutputFile(nullptr, deletion_state); bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary } @@ -2649,7 +2574,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, if (status.ok()) { status = InstallCompactionResults(compact); - InstallSuperVersion(deletion_state); + InstallSuperVersion(default_cfd_, deletion_state); } Version::LevelSummaryStorage tmp; Log(options_.info_log, @@ -2716,10 +2641,9 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, // Collect together all needed child iterators for mem mutex_.Lock(); *latest_snapshot = versions_->LastSequence(); - mem_->Ref(); - mutable_mem = mem_; - // Collect together all needed child iterators for imm_ - immutable_mems = imm_.current(); + mutable_mem = default_cfd_->mem; + mutable_mem->Ref(); + immutable_mems = default_cfd_->imm.current(); immutable_mems->Ref(); versions_->current()->Ref(); version = versions_->current(); @@ -2758,9 +2682,9 @@ std::pair DBImpl::GetTailingIteratorPair( // get all child iterators and bump their refcounts under lock mutex_.Lock(); - mutable_mem = mem_; + mutable_mem = default_cfd_->mem; mutable_mem->Ref(); - immutable_mems = imm_.current(); + immutable_mems = default_cfd_->imm.current(); immutable_mems->Ref(); version = versions_->current(); version->Ref(); @@ -2823,12 +2747,13 @@ Status DBImpl::Get(const ReadOptions& options, // first call already used it. In that rare case, we take a hit and create a // new SuperVersion() inside of the mutex. We do similar thing // for superversion_to_free -void DBImpl::InstallSuperVersion(DeletionState& deletion_state) { +void DBImpl::InstallSuperVersion(ColumnFamilyData* cfd, + DeletionState& deletion_state) { // if new_superversion == nullptr, it means somebody already used it SuperVersion* new_superversion = (deletion_state.new_superversion != nullptr) ? deletion_state.new_superversion : new SuperVersion(); - SuperVersion* old_superversion = InstallSuperVersion(new_superversion); + SuperVersion* old_superversion = InstallSuperVersion(cfd, new_superversion); deletion_state.new_superversion = nullptr; if (deletion_state.superversion_to_free != nullptr) { // somebody already put it there @@ -2838,13 +2763,16 @@ void DBImpl::InstallSuperVersion(DeletionState& deletion_state) { } } -DBImpl::SuperVersion* DBImpl::InstallSuperVersion( - SuperVersion* new_superversion) { +SuperVersion* DBImpl::InstallSuperVersion(ColumnFamilyData* cfd, + SuperVersion* new_superversion) { mutex_.AssertHeld(); - new_superversion->Init(mem_, imm_.current(), versions_->current()); - SuperVersion* old_superversion = super_version_; - super_version_ = new_superversion; - ++super_version_number_; + new_superversion->Init(cfd->mem, cfd->imm.current(), cfd->current); + SuperVersion* old_superversion = cfd->super_version; + cfd->super_version = new_superversion; + if (cfd->id == 0) { + // TODO this is only for default column family for now + ++super_version_number_; + } if (old_superversion != nullptr && old_superversion->Unref()) { old_superversion->Cleanup(); return old_superversion; // will let caller delete outside of mutex @@ -2868,7 +2796,7 @@ Status DBImpl::GetImpl(const ReadOptions& options, // This can be replaced by using atomics and spinlock instead of big mutex mutex_.Lock(); - SuperVersion* get_version = super_version_->Ref(); + SuperVersion* get_version = default_cfd_->super_version->Ref(); mutex_.Unlock(); bool have_stat_update = false; @@ -2939,9 +2867,10 @@ std::vector DBImpl::MultiGet( snapshot = versions_->LastSequence(); } - MemTable* mem = mem_; - MemTableListVersion* imm = imm_.current(); - Version* current = versions_->current(); + // TODO only works for default column family + MemTable* mem = default_cfd_->mem; + MemTableListVersion* imm = default_cfd_->imm.current(); + Version* current = default_cfd_->current; mem->Ref(); imm->Ref(); current->Ref(); @@ -3012,12 +2941,12 @@ std::vector DBImpl::MultiGet( Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, const std::string& column_family_name, ColumnFamilyHandle* handle) { + MutexLock l(&mutex_); if (versions_->GetColumnFamilySet()->Exists(column_family_name)) { return Status::InvalidArgument("Column family already exists"); } VersionEdit edit; edit.AddColumnFamily(column_family_name); - MutexLock l(&mutex_); handle->id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID(); edit.SetColumnFamily(handle->id); Status s = versions_->LogAndApply(&edit, &mutex_); @@ -3170,7 +3099,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { // Add to log and apply to memtable. We can release the lock // during this phase since &w is currently responsible for logging // and protects against concurrent loggers and concurrent writes - // into mem_. + // into default_cfd_->mem. { mutex_.Unlock(); WriteBatch* updates = nullptr; @@ -3216,7 +3145,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { } } if (status.ok()) { - status = WriteBatchInternal::InsertInto(updates, mem_, &options_, this, + status = WriteBatchInternal::InsertInto(updates, default_cfd_->mem, + &options_, this, options_.filter_deletes); if (!status.ok()) { // Panic for in-memory corruptions @@ -3382,14 +3312,15 @@ Status DBImpl::MakeRoomForWrite(bool force, allow_delay = false; // Do not delay a single write more than once mutex_.Lock(); delayed_writes_++; - } else if (!force && - (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) { + } else if (!force && (default_cfd_->mem->ApproximateMemoryUsage() <= + options_.write_buffer_size)) { // There is room in current memtable if (allow_delay) { DelayLoggingAndReset(); } break; - } else if (imm_.size() == options_.max_write_buffer_number - 1) { + } else if (default_cfd_->imm.size() == + options_.max_write_buffer_number - 1) { // We have filled up the current memtable, but the previous // ones are still being compacted, so we wait. DelayLoggingAndReset(); @@ -3498,20 +3429,21 @@ Status DBImpl::MakeRoomForWrite(bool force, } logfile_number_ = new_log_number; log_.reset(new log::Writer(std::move(lfile))); - mem_->SetNextLogNumber(logfile_number_); - imm_.Add(mem_); + default_cfd_->mem->SetNextLogNumber(logfile_number_); + default_cfd_->imm.Add(default_cfd_->mem); if (force) { - imm_.FlushRequested(); + default_cfd_->imm.FlushRequested(); } - mem_ = memtmp; - mem_->Ref(); + default_cfd_->mem = memtmp; + default_cfd_->mem->Ref(); Log(options_.info_log, "New memtable created with log file: #%lu\n", (unsigned long)logfile_number_); - mem_->SetLogNumber(logfile_number_); + default_cfd_->mem->SetLogNumber(logfile_number_); force = false; // Do not force another compaction if have room MaybeScheduleFlushOrCompaction(); - *superversion_to_free = InstallSuperVersion(new_superversion); + *superversion_to_free = + InstallSuperVersion(default_cfd_, new_superversion); } } return s; @@ -3802,7 +3734,7 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family, *value = versions_->current()->DebugString(); return true; } else if (in == "num-immutable-mem-table") { - *value = std::to_string(imm_.size()); + *value = std::to_string(default_cfd_->imm.size()); return true; } @@ -3900,7 +3832,7 @@ Status DBImpl::DeleteFile(std::string name) { edit.DeleteFile(level, number); status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get()); if (status.ok()) { - InstallSuperVersion(deletion_state); + InstallSuperVersion(default_cfd_, deletion_state); } FindObsoleteFiles(deletion_state, false); } // lock released here @@ -4028,7 +3960,8 @@ Status DB::OpenWithColumnFamilies( return s; } impl->mutex_.Lock(); - s = impl->Recover(column_families); // Handles create_if_missing, error_if_exists + // Handles create_if_missing, error_if_exists + s = impl->Recover(column_families); if (s.ok()) { uint64_t new_log_number = impl->versions_->NewFileNumber(); unique_ptr lfile; @@ -4061,8 +3994,10 @@ Status DB::OpenWithColumnFamilies( } } if (s.ok()) { - delete impl->InstallSuperVersion(new DBImpl::SuperVersion()); - impl->mem_->SetLogNumber(impl->logfile_number_); + for (auto cfd : *impl->versions_->GetColumnFamilySet()) { + delete impl->InstallSuperVersion(cfd, new SuperVersion()); + cfd->mem->SetLogNumber(impl->logfile_number_); + } impl->DeleteObsoleteFiles(); impl->MaybeScheduleFlushOrCompaction(); impl->MaybeScheduleLogDBDeployStats(); diff --git a/db/db_impl.h b/db/db_impl.h index 24ef16e13..046cb28c7 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -15,6 +15,7 @@ #include "db/dbformat.h" #include "db/log_writer.h" #include "db/snapshot.h" +#include "db/column_family.h" #include "db/version_edit.h" #include "rocksdb/db.h" #include "rocksdb/env.h" @@ -174,34 +175,6 @@ class DBImpl : public DB { default_interval_to_delete_obsolete_WAL_ = default_interval_to_delete_obsolete_WAL; } - // holds references to memtable, all immutable memtables and version - struct SuperVersion { - MemTable* mem; - MemTableListVersion* imm; - Version* current; - std::atomic refs; - // We need to_delete because during Cleanup(), imm->Unref() returns - // all memtables that we need to free through this vector. We then - // delete all those memtables outside of mutex, during destruction - std::vector to_delete; - - // should be called outside the mutex - explicit SuperVersion(const int num_memtables = 0); - ~SuperVersion(); - SuperVersion* Ref(); - // Returns true if this was the last reference and caller should - // call Clenaup() and delete the object - bool Unref(); - - // call these two methods with db mutex held - // Cleanup unrefs mem, imm and current. Also, it stores all memtables - // that needs to be deleted in to_delete vector. Unrefing those - // objects needs to be done in the mutex - void Cleanup(); - void Init(MemTable* new_mem, MemTableListVersion* new_imm, - Version* new_current); - }; - // needed for CleanupIteratorState struct DeletionState { inline bool HaveSomethingToDelete() const { @@ -286,7 +259,8 @@ class DBImpl : public DB { } MemTable* GetMemTable() { - return mem_; + // TODO currently only works for default column family + return default_cfd_->mem; } Iterator* NewInternalIterator(const ReadOptions&, @@ -425,13 +399,9 @@ class DBImpl : public DB { port::Mutex mutex_; port::AtomicPointer shutting_down_; port::CondVar bg_cv_; // Signalled when background work finishes - MemTableRepFactory* mem_rep_factory_; - MemTable* mem_; - MemTableList imm_; // Memtable that are not changing uint64_t logfile_number_; unique_ptr log_; - - SuperVersion* super_version_; + ColumnFamilyData* default_cfd_; // An ordinal representing the current SuperVersion. Updated by // InstallSuperVersion(), i.e. incremented every time super_version_ @@ -625,11 +595,13 @@ class DBImpl : public DB { // Foreground threads call this function directly (they don't carry // deletion state and have to handle their own creation and deletion // of SuperVersion) - SuperVersion* InstallSuperVersion(SuperVersion* new_superversion); + SuperVersion* InstallSuperVersion(ColumnFamilyData* cfd, + SuperVersion* new_superversion); // Background threads call this function, which is just a wrapper around // the InstallSuperVersion() function above. Background threads carry // deletion_state which can have new_superversion already allocated. - void InstallSuperVersion(DeletionState& deletion_state); + void InstallSuperVersion(ColumnFamilyData* cfd, + DeletionState& deletion_state); // Function that Get and KeyMayExist call with no_io true or false // Note: 'value_found' from KeyMayExist propagates here diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index c94440170..1810a9620 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -90,7 +90,8 @@ Status DB::OpenForReadOnly(const Options& options, const std::string& dbname, std::vector column_families; column_families.push_back( ColumnFamilyDescriptor(default_column_family_name, cf_options)); - Status s = impl->Recover(column_families, true /* read only */, error_if_log_file_exist); + Status s = impl->Recover(column_families, true /* read only */, + error_if_log_file_exist); impl->mutex_.Unlock(); if (s.ok()) { *dbptr = impl; diff --git a/db/memtable.cc b/db/memtable.cc index bf2dfa64b..03f1ffac6 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -33,7 +33,8 @@ struct hash { namespace rocksdb { -MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options) +MemTable::MemTable(const InternalKeyComparator& cmp, + const ColumnFamilyOptions& options) : comparator_(cmp), refs_(0), arena_impl_(options.arena_block_size), diff --git a/db/memtable.h b/db/memtable.h index 1b9005800..415c7070b 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -13,7 +13,7 @@ #include #include "db/dbformat.h" #include "db/skiplist.h" -#include "db/version_set.h" +#include "db/version_edit.h" #include "rocksdb/db.h" #include "rocksdb/memtablerep.h" #include "util/arena_impl.h" @@ -35,7 +35,7 @@ class MemTable { // MemTables are reference counted. The initial reference count // is zero and the caller must call Ref() at least once. explicit MemTable(const InternalKeyComparator& comparator, - const Options& options = Options()); + const ColumnFamilyOptions& options); ~MemTable(); diff --git a/db/memtablelist.cc b/db/memtablelist.cc index c93b58b6e..9bd69b1af 100644 --- a/db/memtablelist.cc +++ b/db/memtablelist.cc @@ -8,6 +8,7 @@ #include #include "rocksdb/db.h" #include "db/memtable.h" +#include "db/version_set.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "util/coding.h" diff --git a/db/memtablelist.h b/db/memtablelist.h index b2cd84103..57c414929 100644 --- a/db/memtablelist.h +++ b/db/memtablelist.h @@ -13,7 +13,7 @@ #include "rocksdb/iterator.h" #include "db/dbformat.h" #include "db/skiplist.h" -#include "memtable.h" +#include "db/memtable.h" namespace rocksdb { diff --git a/db/version_set.cc b/db/version_set.cc index 8ccbcf946..9e64759ee 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1392,6 +1392,9 @@ VersionSet::~VersionSet() { for (auto cfd : *column_family_set_) { cfd->current->Unref(); } + // we need to delete column_family_set_ because its destructor depends on + // VersionSet + column_family_set_.reset(); for (auto file : obsolete_files_) { delete file; } @@ -2401,6 +2404,7 @@ ColumnFamilyData* VersionSet::CreateColumnFamily( edit->column_family_name_, edit->column_family_, dummy_versions, options); AppendVersion(new_cfd, new Version(this, current_version_number_++)); + new_cfd->CreateNewMemtable(); return new_cfd; } diff --git a/db/version_set.h b/db/version_set.h index 879a34701..2a9bf6d91 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -446,6 +446,8 @@ class VersionSet { friend class Compaction; friend class Version; + // TODO temporarily until we have what ColumnFamilyData needs (icmp_) + friend struct ColumnFamilyData; struct LogReporter : public log::Reader::Reporter { Status* status; diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 396e3ea6e..62e197706 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -24,7 +24,7 @@ static std::string PrintContents(WriteBatch* b) { auto factory = std::make_shared(); Options options; options.memtable_factory = factory; - MemTable* mem = new MemTable(cmp, options); + MemTable* mem = new MemTable(cmp, ColumnFamilyOptions(options)); mem->Ref(); std::string state; Status s = WriteBatchInternal::InsertInto(b, mem, &options); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index ecf0e3632..871cf015a 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -315,7 +315,8 @@ class DB { int target_level = -1) = 0; Status CompactRange(const Slice* begin, const Slice* end, bool reduce_level = false, int target_level = -1) { - return CompactRange(default_column_family, begin, end, reduce_level, target_level); + return CompactRange(default_column_family, begin, end, reduce_level, + target_level); } // Number of levels used for this DB. diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 00acafb0a..e75421d08 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -357,6 +357,21 @@ struct ColumnFamilyOptions { // order. int table_cache_remove_scan_count_limit; + // size of one block in arena memory allocation. + // If <= 0, a proper value is automatically calculated (usually 1/10 of + // writer_buffer_size). + // + // There are two additonal restriction of the The specified size: + // (1) size should be in the range of [4096, 2 << 30] and + // (2) be the multiple of the CPU word (which helps with the memory + // alignment). + // + // We'll automatically check and adjust the size number to make sure it + // conforms to the restrictions. + // + // Default: 0 + size_t arena_block_size; + // Disable automatic compactions. Manual compactions can still // be issued on this column family bool disable_auto_compactions; @@ -562,21 +577,6 @@ struct DBOptions { // The default value is MAX_INT so that roll-over does not take place. uint64_t max_manifest_file_size; - // size of one block in arena memory allocation. - // If <= 0, a proper value is automatically calculated (usually 1/10 of - // writer_buffer_size). - // - // There are two additonal restriction of the The specified size: - // (1) size should be in the range of [4096, 2 << 30] and - // (2) be the multiple of the CPU word (which helps with the memory - // alignment). - // - // We'll automatically check and adjust the size number to make sure it - // conforms to the restrictions. - // - // Default: 0 - size_t arena_block_size; - // The following two fields affect how archived logs will be deleted. // 1. If both set to 0, logs will be deleted asap and will not get into // the archive. diff --git a/table/table_test.cc b/table/table_test.cc index af794fd13..aabe2f424 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -371,7 +371,8 @@ class MemTableConstructor: public Constructor { table_factory_(new SkipListFactory) { Options options; options.memtable_factory = table_factory_; - memtable_ = new MemTable(internal_comparator_, options); + memtable_ = + new MemTable(internal_comparator_, ColumnFamilyOptions(options)); memtable_->Ref(); } ~MemTableConstructor() { diff --git a/util/options.cc b/util/options.cc index e4a14c1da..f2e11c0d1 100644 --- a/util/options.cc +++ b/util/options.cc @@ -63,6 +63,7 @@ ColumnFamilyOptions::ColumnFamilyOptions() no_block_cache(false), table_cache_numshardbits(4), table_cache_remove_scan_count_limit(16), + arena_block_size(0), disable_auto_compactions(false), purge_redundant_kvs_while_flush(true), block_size_deviation(10), @@ -121,6 +122,7 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options) table_cache_numshardbits(options.table_cache_numshardbits), table_cache_remove_scan_count_limit( options.table_cache_remove_scan_count_limit), + arena_block_size(options.arena_block_size), disable_auto_compactions(options.disable_auto_compactions), purge_redundant_kvs_while_flush(options.purge_redundant_kvs_while_flush), block_size_deviation(options.block_size_deviation), @@ -157,7 +159,6 @@ DBOptions::DBOptions() log_file_time_to_roll(0), keep_log_file_num(1000), max_manifest_file_size(std::numeric_limits::max()), - arena_block_size(0), WAL_ttl_seconds(0), WAL_size_limit_MB(0), manifest_preallocation_size(4 * 1024 * 1024), @@ -193,7 +194,6 @@ DBOptions::DBOptions(const Options& options) log_file_time_to_roll(options.log_file_time_to_roll), keep_log_file_num(options.keep_log_file_num), max_manifest_file_size(options.max_manifest_file_size), - arena_block_size(options.arena_block_size), WAL_ttl_seconds(options.WAL_ttl_seconds), WAL_size_limit_MB(options.WAL_size_limit_MB), manifest_preallocation_size(options.manifest_preallocation_size),