From eb055609e4d2522976912774eed7403254332a88 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Fri, 24 Jan 2014 14:30:28 -0800 Subject: [PATCH] [column families] Move memtable and immutable memtable list to column family data Summary: All memtables and immutable memtables are moved from DBImpl to ColumnFamilyData. For now, they are all referenced from default column family in DBImpl. It shouldn't be hard to get them from custom column family. Test Plan: make check Reviewers: dhruba, kailiu, sdong CC: leveldb Differential Revision: https://reviews.facebook.net/D15459 --- db/column_family.cc | 88 +++++++++++++++- db/column_family.h | 70 ++++++++++--- db/db_impl.cc | 209 +++++++++++++------------------------- db/db_impl.h | 44 ++------ db/db_impl_readonly.cc | 3 +- db/memtable.cc | 3 +- db/memtable.h | 4 +- db/memtablelist.cc | 1 + db/memtablelist.h | 2 +- db/version_set.cc | 4 + db/version_set.h | 2 + db/write_batch_test.cc | 2 +- include/rocksdb/db.h | 3 +- include/rocksdb/options.h | 30 +++--- table/table_test.cc | 3 +- util/options.cc | 4 +- 16 files changed, 256 insertions(+), 216 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index 218958dba..77e224000 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -1,8 +1,64 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + #include "db/column_family.h" + +#include +#include +#include + #include "db/version_set.h" namespace rocksdb { +SuperVersion::SuperVersion(const int num_memtables) { + to_delete.resize(num_memtables); +} + +SuperVersion::~SuperVersion() { + for (auto td : to_delete) { + delete td; + } +} + +SuperVersion* SuperVersion::Ref() { + refs.fetch_add(1, std::memory_order_relaxed); + return this; +} + +bool SuperVersion::Unref() { + assert(refs > 0); + // fetch_sub returns the previous value of ref + return refs.fetch_sub(1, std::memory_order_relaxed) == 1; +} + +void SuperVersion::Cleanup() { + assert(refs.load(std::memory_order_relaxed) == 0); + imm->Unref(&to_delete); + MemTable* m = mem->Unref(); + if (m != nullptr) { + to_delete.push_back(m); + } + current->Unref(); +} + +void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm, + Version* new_current) { + mem = new_mem; + imm = new_imm; + current = new_current; + mem->Ref(); + imm->Ref(); + current->Ref(); + refs.store(1, std::memory_order_relaxed); +} + ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name, Version* dummy_versions, const ColumnFamilyOptions& options) @@ -10,12 +66,40 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name, name(name), dummy_versions(dummy_versions), current(nullptr), - options(options) {} + options(options), + mem(nullptr), + imm(options.min_write_buffer_number_to_merge), + super_version(nullptr) {} ColumnFamilyData::~ColumnFamilyData() { + if (super_version != nullptr) { + bool is_last_reference __attribute__((unused)); + is_last_reference = super_version->Unref(); + assert(is_last_reference); + super_version->Cleanup(); + delete super_version; + } // List must be empty assert(dummy_versions->next_ == dummy_versions); delete dummy_versions; + + if (mem != nullptr) { + delete mem->Unref(); + } + std::vector to_delete; + imm.current()->Unref(&to_delete); + for (MemTable* m : to_delete) { + delete m; + } +} + +void ColumnFamilyData::CreateNewMemtable() { + assert(current != nullptr); + if (mem != nullptr) { + delete mem->Unref(); + } + mem = new MemTable(current->vset_->icmp_, options); + mem->Ref(); } ColumnFamilySet::ColumnFamilySet() : max_column_family_(0) {} @@ -31,7 +115,7 @@ ColumnFamilySet::~ColumnFamilySet() { ColumnFamilyData* ColumnFamilySet::GetDefault() const { auto ret = GetColumnFamily(0); - assert(ret != nullptr); // default column family should always exist + assert(ret != nullptr); // default column family should always exist return ret; } diff --git a/db/column_family.h b/db/column_family.h index fa6902fd2..b5235d3df 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -9,16 +9,47 @@ #pragma once -#include "rocksdb/options.h" - #include #include #include +#include "rocksdb/options.h" +#include "db/memtablelist.h" + namespace rocksdb { class Version; class VersionSet; +class MemTable; +class MemTableListVersion; + +// holds references to memtable, all immutable memtables and version +struct SuperVersion { + MemTable* mem; + MemTableListVersion* imm; + Version* current; + std::atomic refs; + // We need to_delete because during Cleanup(), imm->Unref() returns + // all memtables that we need to free through this vector. We then + // delete all those memtables outside of mutex, during destruction + std::vector to_delete; + + // should be called outside the mutex + explicit SuperVersion(const int num_memtables = 0); + ~SuperVersion(); + SuperVersion* Ref(); + // Returns true if this was the last reference and caller should + // call Clenaup() and delete the object + bool Unref(); + + // call these two methods with db mutex held + // Cleanup unrefs mem, imm and current. Also, it stores all memtables + // that needs to be deleted in to_delete vector. Unrefing those + // objects needs to be done in the mutex + void Cleanup(); + void Init(MemTable* new_mem, MemTableListVersion* new_imm, + Version* new_current); +}; // column family metadata struct ColumnFamilyData { @@ -28,27 +59,34 @@ struct ColumnFamilyData { Version* current; // == dummy_versions->prev_ ColumnFamilyOptions options; + MemTable* mem; + MemTableList imm; + SuperVersion* super_version; + ColumnFamilyData(uint32_t id, const std::string& name, Version* dummy_versions, const ColumnFamilyOptions& options); ~ColumnFamilyData(); + + void CreateNewMemtable(); }; class ColumnFamilySet { public: - class iterator { - public: - explicit iterator( - std::unordered_map::iterator itr) - : itr_(itr) {} - iterator& operator++() { - ++itr_; - return *this; - } - bool operator!=(const iterator& other) { return this->itr_ != other.itr_; } - ColumnFamilyData* operator*() { return itr_->second; } - private: - std::unordered_map::iterator itr_; - }; + class iterator { + public: + explicit iterator( + std::unordered_map::iterator itr) + : itr_(itr) {} + iterator& operator++() { + ++itr_; + return *this; + } + bool operator!=(const iterator& other) { return this->itr_ != other.itr_; } + ColumnFamilyData* operator*() { return itr_->second; } + + private: + std::unordered_map::iterator itr_; + }; ColumnFamilySet(); ~ColumnFamilySet(); diff --git a/db/db_impl.cc b/db/db_impl.cc index b80ed8bd0..de4288736 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -265,11 +265,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) mutex_(options.use_adaptive_mutex), shutting_down_(nullptr), bg_cv_(&mutex_), - mem_rep_factory_(options_.memtable_factory.get()), - mem_(new MemTable(internal_comparator_, options_)), - imm_(options_.min_write_buffer_number_to_merge), logfile_number_(0), - super_version_(nullptr), super_version_number_(0), tmp_batch_(), bg_compaction_scheduled_(0), @@ -297,8 +293,6 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) bg_work_gate_closed_(false), refitting_level_(false) { - mem_->Ref(); - env_->GetAbsolutePath(dbname, &db_absolute_path_); stall_leveln_slowdown_.resize(options.num_levels); @@ -333,11 +327,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) } DBImpl::~DBImpl() { - std::vector to_delete; - to_delete.reserve(options_.max_write_buffer_number); - // Wait for background work to finish - if (flush_on_destroy_ && mem_->GetFirstSequenceNumber() != 0) { + if (flush_on_destroy_ && default_cfd_->mem->GetFirstSequenceNumber() != 0) { FlushMemTable(FlushOptions()); } mutex_.Lock(); @@ -347,27 +338,12 @@ DBImpl::~DBImpl() { bg_logstats_scheduled_) { bg_cv_.Wait(); } - if (super_version_ != nullptr) { - bool is_last_reference __attribute__((unused)); - is_last_reference = super_version_->Unref(); - assert(is_last_reference); - super_version_->Cleanup(); - delete super_version_; - } mutex_.Unlock(); if (db_lock_ != nullptr) { env_->UnlockFile(db_lock_); } - if (mem_ != nullptr) { - delete mem_->Unref(); - } - - imm_.current()->Unref(&to_delete); - for (MemTable* m: to_delete) { - delete m; - } LogFlush(options_.info_log); } @@ -383,13 +359,6 @@ void DBImpl::TEST_Destroy_DBImpl() { bg_logstats_scheduled_) { bg_cv_.Wait(); } - if (super_version_ != nullptr) { - bool is_last_reference __attribute__((unused)); - is_last_reference = super_version_->Unref(); - assert(is_last_reference); - super_version_->Cleanup(); - delete super_version_; - } // Prevent new compactions from occuring. bg_work_gate_closed_ = true; @@ -488,49 +457,6 @@ void DBImpl::MaybeDumpStats() { } } -// DBImpl::SuperVersion methods -DBImpl::SuperVersion::SuperVersion(const int num_memtables) { - to_delete.resize(num_memtables); -} - -DBImpl::SuperVersion::~SuperVersion() { - for (auto td : to_delete) { - delete td; - } -} - -DBImpl::SuperVersion* DBImpl::SuperVersion::Ref() { - refs.fetch_add(1, std::memory_order_relaxed); - return this; -} - -bool DBImpl::SuperVersion::Unref() { - assert(refs > 0); - // fetch_sub returns the previous value of ref - return refs.fetch_sub(1, std::memory_order_relaxed) == 1; -} - -void DBImpl::SuperVersion::Cleanup() { - assert(refs.load(std::memory_order_relaxed) == 0); - imm->Unref(&to_delete); - MemTable* m = mem->Unref(); - if (m != nullptr) { - to_delete.push_back(m); - } - current->Unref(); -} - -void DBImpl::SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm, - Version* new_current) { - mem = new_mem; - imm = new_imm; - current = new_current; - mem->Ref(); - imm->Ref(); - current->Ref(); - refs.store(1, std::memory_order_relaxed); -} - // Returns the list of live files in 'sst_live' and the list // of all files in the filesystem in 'all_files'. // no_full_scan = true -- never do the full scan using GetChildren() @@ -925,6 +851,7 @@ Status DBImpl::Recover( Status s = versions_->Recover(column_families); if (s.ok()) { SequenceNumber max_sequence(0); + default_cfd_ = versions_->GetColumnFamilySet()->GetDefault(); // Recover from all newer log files than the ones named in the // descriptor (new log files may have been added by the previous @@ -1037,7 +964,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, } WriteBatchInternal::SetContents(&batch, record); - status = WriteBatchInternal::InsertInto(&batch, mem_, &options_); + status = + WriteBatchInternal::InsertInto(&batch, default_cfd_->mem, &options_); memtable_empty = false; MaybeIgnoreError(&status); if (!status.ok()) { @@ -1050,13 +978,11 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, *max_sequence = last_seq; } - if (!read_only && - mem_->ApproximateMemoryUsage() > options_.write_buffer_size) { - status = WriteLevel0TableForRecovery(mem_, &edit); + if (!read_only && default_cfd_->mem->ApproximateMemoryUsage() > + options_.write_buffer_size) { + status = WriteLevel0TableForRecovery(default_cfd_->mem, &edit); // we still want to clear memtable, even if the recovery failed - delete mem_->Unref(); - mem_ = new MemTable(internal_comparator_, options_); - mem_->Ref(); + default_cfd_->CreateNewMemtable(); memtable_empty = true; if (!status.ok()) { // Reflect errors immediately so that conditions like full @@ -1067,10 +993,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, } if (!memtable_empty && !read_only) { - status = WriteLevel0TableForRecovery(mem_, &edit); - delete mem_->Unref(); - mem_ = new MemTable(internal_comparator_, options_); - mem_->Ref(); + status = WriteLevel0TableForRecovery(default_cfd_->mem, &edit); + default_cfd_->CreateNewMemtable(); if (!status.ok()) { return status; } @@ -1233,9 +1157,9 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, DeletionState& deletion_state) { mutex_.AssertHeld(); - assert(imm_.size() != 0); + assert(default_cfd_->imm.size() != 0); - if (!imm_.IsFlushPending()) { + if (!default_cfd_->imm.IsFlushPending()) { Log(options_.info_log, "FlushMemTableToOutputFile already in progress"); Status s = Status::IOError("FlushMemTableToOutputFile already in progress"); return s; @@ -1244,7 +1168,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, // Save the contents of the earliest memtable as a new Table uint64_t file_number; std::vector mems; - imm_.PickMemtablesToFlush(&mems); + default_cfd_->imm.PickMemtablesToFlush(&mems); if (mems.empty()) { Log(options_.info_log, "Nothing in memstore to flush"); Status s = Status::IOError("Nothing in memstore to flush"); @@ -1279,12 +1203,12 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, } // Replace immutable memtable with the generated Table - s = imm_.InstallMemtableFlushResults( + s = default_cfd_->imm.InstallMemtableFlushResults( mems, versions_.get(), s, &mutex_, options_.info_log.get(), file_number, pending_outputs_, &deletion_state.memtables_to_free, db_directory_.get()); if (s.ok()) { - InstallSuperVersion(deletion_state); + InstallSuperVersion(default_cfd_, deletion_state); if (madeProgress) { *madeProgress = 1; } @@ -1410,7 +1334,7 @@ Status DBImpl::ReFitLevel(int level, int target_level) { edit.DebugString().data()); status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get()); - superversion_to_free = InstallSuperVersion(new_superversion); + superversion_to_free = InstallSuperVersion(default_cfd_, new_superversion); new_superversion = nullptr; Log(options_.info_log, "LogAndApply: %s\n", status.ToString().data()); @@ -1737,10 +1661,10 @@ Status DBImpl::WaitForFlushMemTable() { Status s; // Wait until the compaction completes MutexLock l(&mutex_); - while (imm_.size() > 0 && bg_error_.ok()) { + while (default_cfd_->imm.size() > 0 && bg_error_.ok()) { bg_cv_.Wait(); } - if (imm_.size() != 0) { + if (default_cfd_->imm.size() != 0) { s = bg_error_; } return s; @@ -1776,7 +1700,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions } else { - bool is_flush_pending = imm_.IsFlushPending(); + bool is_flush_pending = default_cfd_->imm.IsFlushPending(); if (is_flush_pending && (bg_flush_scheduled_ < options_.max_background_flushes)) { // memtable flush needed @@ -1811,7 +1735,7 @@ void DBImpl::BGWorkCompaction(void* db) { Status DBImpl::BackgroundFlush(bool* madeProgress, DeletionState& deletion_state) { Status stat; - while (stat.ok() && imm_.IsFlushPending()) { + while (stat.ok() && default_cfd_->imm.IsFlushPending()) { Log(options_.info_log, "BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d", options_.max_background_flushes - bg_flush_scheduled_); @@ -1931,7 +1855,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, mutex_.AssertHeld(); // TODO: remove memtable flush from formal compaction - while (imm_.IsFlushPending()) { + while (default_cfd_->imm.IsFlushPending()) { Log(options_.info_log, "BackgroundCompaction doing FlushMemTableToOutputFile, compaction slots " "available %d", @@ -1983,7 +1907,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, f->smallest, f->largest, f->smallest_seqno, f->largest_seqno); status = versions_->LogAndApply(c->edit(), &mutex_, db_directory_.get()); - InstallSuperVersion(deletion_state); + InstallSuperVersion(default_cfd_, deletion_state); + Version::LevelSummaryStorage tmp; Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", static_cast(f->number), c->level() + 1, @@ -2334,11 +2259,11 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { // Prioritize immutable compaction work // TODO: remove memtable flush from normal compaction work - if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) { + if (default_cfd_->imm.imm_flush_needed.NoBarrier_Load() != nullptr) { const uint64_t imm_start = env_->NowMicros(); LogFlush(options_.info_log); mutex_.Lock(); - if (imm_.IsFlushPending()) { + if (default_cfd_->imm.IsFlushPending()) { FlushMemTableToOutputFile(nullptr, deletion_state); bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary } @@ -2649,7 +2574,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, if (status.ok()) { status = InstallCompactionResults(compact); - InstallSuperVersion(deletion_state); + InstallSuperVersion(default_cfd_, deletion_state); } Version::LevelSummaryStorage tmp; Log(options_.info_log, @@ -2716,10 +2641,9 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, // Collect together all needed child iterators for mem mutex_.Lock(); *latest_snapshot = versions_->LastSequence(); - mem_->Ref(); - mutable_mem = mem_; - // Collect together all needed child iterators for imm_ - immutable_mems = imm_.current(); + mutable_mem = default_cfd_->mem; + mutable_mem->Ref(); + immutable_mems = default_cfd_->imm.current(); immutable_mems->Ref(); versions_->current()->Ref(); version = versions_->current(); @@ -2758,9 +2682,9 @@ std::pair DBImpl::GetTailingIteratorPair( // get all child iterators and bump their refcounts under lock mutex_.Lock(); - mutable_mem = mem_; + mutable_mem = default_cfd_->mem; mutable_mem->Ref(); - immutable_mems = imm_.current(); + immutable_mems = default_cfd_->imm.current(); immutable_mems->Ref(); version = versions_->current(); version->Ref(); @@ -2823,12 +2747,13 @@ Status DBImpl::Get(const ReadOptions& options, // first call already used it. In that rare case, we take a hit and create a // new SuperVersion() inside of the mutex. We do similar thing // for superversion_to_free -void DBImpl::InstallSuperVersion(DeletionState& deletion_state) { +void DBImpl::InstallSuperVersion(ColumnFamilyData* cfd, + DeletionState& deletion_state) { // if new_superversion == nullptr, it means somebody already used it SuperVersion* new_superversion = (deletion_state.new_superversion != nullptr) ? deletion_state.new_superversion : new SuperVersion(); - SuperVersion* old_superversion = InstallSuperVersion(new_superversion); + SuperVersion* old_superversion = InstallSuperVersion(cfd, new_superversion); deletion_state.new_superversion = nullptr; if (deletion_state.superversion_to_free != nullptr) { // somebody already put it there @@ -2838,13 +2763,16 @@ void DBImpl::InstallSuperVersion(DeletionState& deletion_state) { } } -DBImpl::SuperVersion* DBImpl::InstallSuperVersion( - SuperVersion* new_superversion) { +SuperVersion* DBImpl::InstallSuperVersion(ColumnFamilyData* cfd, + SuperVersion* new_superversion) { mutex_.AssertHeld(); - new_superversion->Init(mem_, imm_.current(), versions_->current()); - SuperVersion* old_superversion = super_version_; - super_version_ = new_superversion; - ++super_version_number_; + new_superversion->Init(cfd->mem, cfd->imm.current(), cfd->current); + SuperVersion* old_superversion = cfd->super_version; + cfd->super_version = new_superversion; + if (cfd->id == 0) { + // TODO this is only for default column family for now + ++super_version_number_; + } if (old_superversion != nullptr && old_superversion->Unref()) { old_superversion->Cleanup(); return old_superversion; // will let caller delete outside of mutex @@ -2868,7 +2796,7 @@ Status DBImpl::GetImpl(const ReadOptions& options, // This can be replaced by using atomics and spinlock instead of big mutex mutex_.Lock(); - SuperVersion* get_version = super_version_->Ref(); + SuperVersion* get_version = default_cfd_->super_version->Ref(); mutex_.Unlock(); bool have_stat_update = false; @@ -2939,9 +2867,10 @@ std::vector DBImpl::MultiGet( snapshot = versions_->LastSequence(); } - MemTable* mem = mem_; - MemTableListVersion* imm = imm_.current(); - Version* current = versions_->current(); + // TODO only works for default column family + MemTable* mem = default_cfd_->mem; + MemTableListVersion* imm = default_cfd_->imm.current(); + Version* current = default_cfd_->current; mem->Ref(); imm->Ref(); current->Ref(); @@ -3012,12 +2941,12 @@ std::vector DBImpl::MultiGet( Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, const std::string& column_family_name, ColumnFamilyHandle* handle) { + MutexLock l(&mutex_); if (versions_->GetColumnFamilySet()->Exists(column_family_name)) { return Status::InvalidArgument("Column family already exists"); } VersionEdit edit; edit.AddColumnFamily(column_family_name); - MutexLock l(&mutex_); handle->id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID(); edit.SetColumnFamily(handle->id); Status s = versions_->LogAndApply(&edit, &mutex_); @@ -3170,7 +3099,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { // Add to log and apply to memtable. We can release the lock // during this phase since &w is currently responsible for logging // and protects against concurrent loggers and concurrent writes - // into mem_. + // into default_cfd_->mem. { mutex_.Unlock(); WriteBatch* updates = nullptr; @@ -3216,7 +3145,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { } } if (status.ok()) { - status = WriteBatchInternal::InsertInto(updates, mem_, &options_, this, + status = WriteBatchInternal::InsertInto(updates, default_cfd_->mem, + &options_, this, options_.filter_deletes); if (!status.ok()) { // Panic for in-memory corruptions @@ -3382,14 +3312,15 @@ Status DBImpl::MakeRoomForWrite(bool force, allow_delay = false; // Do not delay a single write more than once mutex_.Lock(); delayed_writes_++; - } else if (!force && - (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) { + } else if (!force && (default_cfd_->mem->ApproximateMemoryUsage() <= + options_.write_buffer_size)) { // There is room in current memtable if (allow_delay) { DelayLoggingAndReset(); } break; - } else if (imm_.size() == options_.max_write_buffer_number - 1) { + } else if (default_cfd_->imm.size() == + options_.max_write_buffer_number - 1) { // We have filled up the current memtable, but the previous // ones are still being compacted, so we wait. DelayLoggingAndReset(); @@ -3498,20 +3429,21 @@ Status DBImpl::MakeRoomForWrite(bool force, } logfile_number_ = new_log_number; log_.reset(new log::Writer(std::move(lfile))); - mem_->SetNextLogNumber(logfile_number_); - imm_.Add(mem_); + default_cfd_->mem->SetNextLogNumber(logfile_number_); + default_cfd_->imm.Add(default_cfd_->mem); if (force) { - imm_.FlushRequested(); + default_cfd_->imm.FlushRequested(); } - mem_ = memtmp; - mem_->Ref(); + default_cfd_->mem = memtmp; + default_cfd_->mem->Ref(); Log(options_.info_log, "New memtable created with log file: #%lu\n", (unsigned long)logfile_number_); - mem_->SetLogNumber(logfile_number_); + default_cfd_->mem->SetLogNumber(logfile_number_); force = false; // Do not force another compaction if have room MaybeScheduleFlushOrCompaction(); - *superversion_to_free = InstallSuperVersion(new_superversion); + *superversion_to_free = + InstallSuperVersion(default_cfd_, new_superversion); } } return s; @@ -3802,7 +3734,7 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family, *value = versions_->current()->DebugString(); return true; } else if (in == "num-immutable-mem-table") { - *value = std::to_string(imm_.size()); + *value = std::to_string(default_cfd_->imm.size()); return true; } @@ -3900,7 +3832,7 @@ Status DBImpl::DeleteFile(std::string name) { edit.DeleteFile(level, number); status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get()); if (status.ok()) { - InstallSuperVersion(deletion_state); + InstallSuperVersion(default_cfd_, deletion_state); } FindObsoleteFiles(deletion_state, false); } // lock released here @@ -4028,7 +3960,8 @@ Status DB::OpenWithColumnFamilies( return s; } impl->mutex_.Lock(); - s = impl->Recover(column_families); // Handles create_if_missing, error_if_exists + // Handles create_if_missing, error_if_exists + s = impl->Recover(column_families); if (s.ok()) { uint64_t new_log_number = impl->versions_->NewFileNumber(); unique_ptr lfile; @@ -4061,8 +3994,10 @@ Status DB::OpenWithColumnFamilies( } } if (s.ok()) { - delete impl->InstallSuperVersion(new DBImpl::SuperVersion()); - impl->mem_->SetLogNumber(impl->logfile_number_); + for (auto cfd : *impl->versions_->GetColumnFamilySet()) { + delete impl->InstallSuperVersion(cfd, new SuperVersion()); + cfd->mem->SetLogNumber(impl->logfile_number_); + } impl->DeleteObsoleteFiles(); impl->MaybeScheduleFlushOrCompaction(); impl->MaybeScheduleLogDBDeployStats(); diff --git a/db/db_impl.h b/db/db_impl.h index 24ef16e13..046cb28c7 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -15,6 +15,7 @@ #include "db/dbformat.h" #include "db/log_writer.h" #include "db/snapshot.h" +#include "db/column_family.h" #include "db/version_edit.h" #include "rocksdb/db.h" #include "rocksdb/env.h" @@ -174,34 +175,6 @@ class DBImpl : public DB { default_interval_to_delete_obsolete_WAL_ = default_interval_to_delete_obsolete_WAL; } - // holds references to memtable, all immutable memtables and version - struct SuperVersion { - MemTable* mem; - MemTableListVersion* imm; - Version* current; - std::atomic refs; - // We need to_delete because during Cleanup(), imm->Unref() returns - // all memtables that we need to free through this vector. We then - // delete all those memtables outside of mutex, during destruction - std::vector to_delete; - - // should be called outside the mutex - explicit SuperVersion(const int num_memtables = 0); - ~SuperVersion(); - SuperVersion* Ref(); - // Returns true if this was the last reference and caller should - // call Clenaup() and delete the object - bool Unref(); - - // call these two methods with db mutex held - // Cleanup unrefs mem, imm and current. Also, it stores all memtables - // that needs to be deleted in to_delete vector. Unrefing those - // objects needs to be done in the mutex - void Cleanup(); - void Init(MemTable* new_mem, MemTableListVersion* new_imm, - Version* new_current); - }; - // needed for CleanupIteratorState struct DeletionState { inline bool HaveSomethingToDelete() const { @@ -286,7 +259,8 @@ class DBImpl : public DB { } MemTable* GetMemTable() { - return mem_; + // TODO currently only works for default column family + return default_cfd_->mem; } Iterator* NewInternalIterator(const ReadOptions&, @@ -425,13 +399,9 @@ class DBImpl : public DB { port::Mutex mutex_; port::AtomicPointer shutting_down_; port::CondVar bg_cv_; // Signalled when background work finishes - MemTableRepFactory* mem_rep_factory_; - MemTable* mem_; - MemTableList imm_; // Memtable that are not changing uint64_t logfile_number_; unique_ptr log_; - - SuperVersion* super_version_; + ColumnFamilyData* default_cfd_; // An ordinal representing the current SuperVersion. Updated by // InstallSuperVersion(), i.e. incremented every time super_version_ @@ -625,11 +595,13 @@ class DBImpl : public DB { // Foreground threads call this function directly (they don't carry // deletion state and have to handle their own creation and deletion // of SuperVersion) - SuperVersion* InstallSuperVersion(SuperVersion* new_superversion); + SuperVersion* InstallSuperVersion(ColumnFamilyData* cfd, + SuperVersion* new_superversion); // Background threads call this function, which is just a wrapper around // the InstallSuperVersion() function above. Background threads carry // deletion_state which can have new_superversion already allocated. - void InstallSuperVersion(DeletionState& deletion_state); + void InstallSuperVersion(ColumnFamilyData* cfd, + DeletionState& deletion_state); // Function that Get and KeyMayExist call with no_io true or false // Note: 'value_found' from KeyMayExist propagates here diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index c94440170..1810a9620 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -90,7 +90,8 @@ Status DB::OpenForReadOnly(const Options& options, const std::string& dbname, std::vector column_families; column_families.push_back( ColumnFamilyDescriptor(default_column_family_name, cf_options)); - Status s = impl->Recover(column_families, true /* read only */, error_if_log_file_exist); + Status s = impl->Recover(column_families, true /* read only */, + error_if_log_file_exist); impl->mutex_.Unlock(); if (s.ok()) { *dbptr = impl; diff --git a/db/memtable.cc b/db/memtable.cc index bf2dfa64b..03f1ffac6 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -33,7 +33,8 @@ struct hash { namespace rocksdb { -MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options) +MemTable::MemTable(const InternalKeyComparator& cmp, + const ColumnFamilyOptions& options) : comparator_(cmp), refs_(0), arena_impl_(options.arena_block_size), diff --git a/db/memtable.h b/db/memtable.h index 1b9005800..415c7070b 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -13,7 +13,7 @@ #include #include "db/dbformat.h" #include "db/skiplist.h" -#include "db/version_set.h" +#include "db/version_edit.h" #include "rocksdb/db.h" #include "rocksdb/memtablerep.h" #include "util/arena_impl.h" @@ -35,7 +35,7 @@ class MemTable { // MemTables are reference counted. The initial reference count // is zero and the caller must call Ref() at least once. explicit MemTable(const InternalKeyComparator& comparator, - const Options& options = Options()); + const ColumnFamilyOptions& options); ~MemTable(); diff --git a/db/memtablelist.cc b/db/memtablelist.cc index c93b58b6e..9bd69b1af 100644 --- a/db/memtablelist.cc +++ b/db/memtablelist.cc @@ -8,6 +8,7 @@ #include #include "rocksdb/db.h" #include "db/memtable.h" +#include "db/version_set.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "util/coding.h" diff --git a/db/memtablelist.h b/db/memtablelist.h index b2cd84103..57c414929 100644 --- a/db/memtablelist.h +++ b/db/memtablelist.h @@ -13,7 +13,7 @@ #include "rocksdb/iterator.h" #include "db/dbformat.h" #include "db/skiplist.h" -#include "memtable.h" +#include "db/memtable.h" namespace rocksdb { diff --git a/db/version_set.cc b/db/version_set.cc index 8ccbcf946..9e64759ee 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1392,6 +1392,9 @@ VersionSet::~VersionSet() { for (auto cfd : *column_family_set_) { cfd->current->Unref(); } + // we need to delete column_family_set_ because its destructor depends on + // VersionSet + column_family_set_.reset(); for (auto file : obsolete_files_) { delete file; } @@ -2401,6 +2404,7 @@ ColumnFamilyData* VersionSet::CreateColumnFamily( edit->column_family_name_, edit->column_family_, dummy_versions, options); AppendVersion(new_cfd, new Version(this, current_version_number_++)); + new_cfd->CreateNewMemtable(); return new_cfd; } diff --git a/db/version_set.h b/db/version_set.h index 879a34701..2a9bf6d91 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -446,6 +446,8 @@ class VersionSet { friend class Compaction; friend class Version; + // TODO temporarily until we have what ColumnFamilyData needs (icmp_) + friend struct ColumnFamilyData; struct LogReporter : public log::Reader::Reporter { Status* status; diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 396e3ea6e..62e197706 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -24,7 +24,7 @@ static std::string PrintContents(WriteBatch* b) { auto factory = std::make_shared(); Options options; options.memtable_factory = factory; - MemTable* mem = new MemTable(cmp, options); + MemTable* mem = new MemTable(cmp, ColumnFamilyOptions(options)); mem->Ref(); std::string state; Status s = WriteBatchInternal::InsertInto(b, mem, &options); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index ecf0e3632..871cf015a 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -315,7 +315,8 @@ class DB { int target_level = -1) = 0; Status CompactRange(const Slice* begin, const Slice* end, bool reduce_level = false, int target_level = -1) { - return CompactRange(default_column_family, begin, end, reduce_level, target_level); + return CompactRange(default_column_family, begin, end, reduce_level, + target_level); } // Number of levels used for this DB. diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 00acafb0a..e75421d08 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -357,6 +357,21 @@ struct ColumnFamilyOptions { // order. int table_cache_remove_scan_count_limit; + // size of one block in arena memory allocation. + // If <= 0, a proper value is automatically calculated (usually 1/10 of + // writer_buffer_size). + // + // There are two additonal restriction of the The specified size: + // (1) size should be in the range of [4096, 2 << 30] and + // (2) be the multiple of the CPU word (which helps with the memory + // alignment). + // + // We'll automatically check and adjust the size number to make sure it + // conforms to the restrictions. + // + // Default: 0 + size_t arena_block_size; + // Disable automatic compactions. Manual compactions can still // be issued on this column family bool disable_auto_compactions; @@ -562,21 +577,6 @@ struct DBOptions { // The default value is MAX_INT so that roll-over does not take place. uint64_t max_manifest_file_size; - // size of one block in arena memory allocation. - // If <= 0, a proper value is automatically calculated (usually 1/10 of - // writer_buffer_size). - // - // There are two additonal restriction of the The specified size: - // (1) size should be in the range of [4096, 2 << 30] and - // (2) be the multiple of the CPU word (which helps with the memory - // alignment). - // - // We'll automatically check and adjust the size number to make sure it - // conforms to the restrictions. - // - // Default: 0 - size_t arena_block_size; - // The following two fields affect how archived logs will be deleted. // 1. If both set to 0, logs will be deleted asap and will not get into // the archive. diff --git a/table/table_test.cc b/table/table_test.cc index af794fd13..aabe2f424 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -371,7 +371,8 @@ class MemTableConstructor: public Constructor { table_factory_(new SkipListFactory) { Options options; options.memtable_factory = table_factory_; - memtable_ = new MemTable(internal_comparator_, options); + memtable_ = + new MemTable(internal_comparator_, ColumnFamilyOptions(options)); memtable_->Ref(); } ~MemTableConstructor() { diff --git a/util/options.cc b/util/options.cc index e4a14c1da..f2e11c0d1 100644 --- a/util/options.cc +++ b/util/options.cc @@ -63,6 +63,7 @@ ColumnFamilyOptions::ColumnFamilyOptions() no_block_cache(false), table_cache_numshardbits(4), table_cache_remove_scan_count_limit(16), + arena_block_size(0), disable_auto_compactions(false), purge_redundant_kvs_while_flush(true), block_size_deviation(10), @@ -121,6 +122,7 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options) table_cache_numshardbits(options.table_cache_numshardbits), table_cache_remove_scan_count_limit( options.table_cache_remove_scan_count_limit), + arena_block_size(options.arena_block_size), disable_auto_compactions(options.disable_auto_compactions), purge_redundant_kvs_while_flush(options.purge_redundant_kvs_while_flush), block_size_deviation(options.block_size_deviation), @@ -157,7 +159,6 @@ DBOptions::DBOptions() log_file_time_to_roll(0), keep_log_file_num(1000), max_manifest_file_size(std::numeric_limits::max()), - arena_block_size(0), WAL_ttl_seconds(0), WAL_size_limit_MB(0), manifest_preallocation_size(4 * 1024 * 1024), @@ -193,7 +194,6 @@ DBOptions::DBOptions(const Options& options) log_file_time_to_roll(options.log_file_time_to_roll), keep_log_file_num(options.keep_log_file_num), max_manifest_file_size(options.max_manifest_file_size), - arena_block_size(options.arena_block_size), WAL_ttl_seconds(options.WAL_ttl_seconds), WAL_size_limit_MB(options.WAL_size_limit_MB), manifest_preallocation_size(options.manifest_preallocation_size),