From c583157d497b02577dd8c08b54ab91821df1ea21 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Fri, 24 Jan 2014 14:52:08 -0800 Subject: [PATCH] MemTableListVersion Summary: MemTableListVersion is to MemTableList what Version is to VersionSet. I took almost the same ideas to develop MemTableListVersion. The reason is to have copying std::list done in background, while flushing, rather than in foreground (MultiGet() and NewIterator()) under a mutex! Also, whenever we copied MemTableList, we copied also some MemTableList metadata (flush_requested_, commit_in_progress_, etc.), which was wasteful. This diff avoids std::list copy under a mutex in both MultiGet() and NewIterator(). I created a small database with some number of immutable memtables, and creating 100.000 iterators in a single-thread (!) decreased from {188739, 215703, 198028} to {154352, 164035, 159817}. A lot of the savings come from code under a mutex, so we should see much higher savings with multiple threads. Creating new iterator is very important to LogDevice team. I also think this diff will make SuperVersion obsolete for performance reasons. I will try it in the next diff. SuperVersion gave us huge savings on Get() code path, but I think that most of the savings came from copying MemTableList under a mutex. If we had MemTableListVersion, we would never need to copy the entire object (like we still do in NewIterator() and MultiGet()) Test Plan: `make check` works. I will also do `make valgrind_check` before commit Reviewers: dhruba, haobo, kailiu, sdong, emayanke, tnovak Reviewed By: kailiu CC: leveldb Differential Revision: https://reviews.facebook.net/D15255 --- db/db_impl.cc | 94 +++++++++++++++----------------- db/db_impl.h | 6 +-- db/memtablelist.cc | 132 +++++++++++++++++++++++++++++---------------- db/memtablelist.h | 81 ++++++++++++++++++---------- 4 files changed, 184 insertions(+), 129 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 48d0e3451..aff55c75d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -264,6 +264,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) bg_cv_(&mutex_), mem_rep_factory_(options_.memtable_factory.get()), mem_(new MemTable(internal_comparator_, options_)), + imm_(options_.min_write_buffer_number_to_merge), logfile_number_(0), super_version_(nullptr), super_version_number_(0), @@ -360,7 +361,7 @@ DBImpl::~DBImpl() { delete mem_->Unref(); } - imm_.UnrefAll(&to_delete); + imm_.current()->Unref(&to_delete); for (MemTable* m: to_delete) { delete m; } @@ -508,7 +509,7 @@ bool DBImpl::SuperVersion::Unref() { void DBImpl::SuperVersion::Cleanup() { assert(refs.load(std::memory_order_relaxed) == 0); - imm.UnrefAll(&to_delete); + imm->Unref(&to_delete); MemTable* m = mem->Unref(); if (m != nullptr) { to_delete.push_back(m); @@ -516,13 +517,13 @@ void DBImpl::SuperVersion::Cleanup() { current->Unref(); } -void DBImpl::SuperVersion::Init(MemTable* new_mem, const MemTableList& new_imm, +void DBImpl::SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm, Version* new_current) { mem = new_mem; imm = new_imm; current = new_current; mem->Ref(); - imm.RefAll(); + imm->Ref(); current->Ref(); refs.store(1, std::memory_order_relaxed); } @@ -1221,7 +1222,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, mutex_.AssertHeld(); assert(imm_.size() != 0); - if (!imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { + if (!imm_.IsFlushPending()) { Log(options_.info_log, "FlushMemTableToOutputFile already in progress"); Status s = Status::IOError("FlushMemTableToOutputFile already in progress"); return s; @@ -1762,8 +1763,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions } else { - bool is_flush_pending = - imm_.IsFlushPending(options_.min_write_buffer_number_to_merge); + bool is_flush_pending = imm_.IsFlushPending(); if (is_flush_pending && (bg_flush_scheduled_ < options_.max_background_flushes)) { // memtable flush needed @@ -1798,8 +1798,7 @@ void DBImpl::BGWorkCompaction(void* db) { Status DBImpl::BackgroundFlush(bool* madeProgress, DeletionState& deletion_state) { Status stat; - while (stat.ok() && - imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { + while (stat.ok() && imm_.IsFlushPending()) { Log(options_.info_log, "BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d", options_.max_background_flushes - bg_flush_scheduled_); @@ -1919,7 +1918,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, mutex_.AssertHeld(); // TODO: remove memtable flush from formal compaction - while (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { + while (imm_.IsFlushPending()) { Log(options_.info_log, "BackgroundCompaction doing FlushMemTableToOutputFile, compaction slots " "available %d", @@ -2325,7 +2324,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, const uint64_t imm_start = env_->NowMicros(); LogFlush(options_.info_log); mutex_.Lock(); - if (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { + if (imm_.IsFlushPending()) { FlushMemTableToOutputFile(nullptr, deletion_state); bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary } @@ -2658,8 +2657,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, namespace { struct IterState { port::Mutex* mu; - Version* version; - std::vector mem; // includes both mem_ and imm_ + Version* version = nullptr; + MemTable* mem = nullptr; + MemTableListVersion* imm = nullptr; DBImpl *db; }; @@ -2668,15 +2668,16 @@ static void CleanupIteratorState(void* arg1, void* arg2) { DBImpl::DeletionState deletion_state(state->db->GetOptions(). max_write_buffer_number); state->mu->Lock(); - for (unsigned int i = 0; i < state->mem.size(); i++) { - MemTable* m = state->mem[i]->Unref(); - if (m != nullptr) { - deletion_state.memtables_to_free.push_back(m); - } + MemTable* m = state->mem->Unref(); + if (m != nullptr) { + deletion_state.memtables_to_free.push_back(m); } if (state->version) { // not set for memtable-only iterator state->version->Unref(); } + if (state->imm) { // not set for memtable-only iterator + state->imm->Unref(&deletion_state.memtables_to_free); + } // fast path FindObsoleteFiles state->db->FindObsoleteFiles(deletion_state, false, true); state->mu->Unlock(); @@ -2690,7 +2691,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, SequenceNumber* latest_snapshot) { IterState* cleanup = new IterState; MemTable* mutable_mem; - std::vector immutables; + MemTableListVersion* immutable_mems; Version* version; // Collect together all needed child iterators for mem @@ -2699,27 +2700,22 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, mem_->Ref(); mutable_mem = mem_; // Collect together all needed child iterators for imm_ - imm_.GetMemTables(&immutables); - for (unsigned int i = 0; i < immutables.size(); i++) { - immutables[i]->Ref(); - } + immutable_mems = imm_.current(); + immutable_mems->Ref(); versions_->current()->Ref(); version = versions_->current(); mutex_.Unlock(); - std::vector list; - list.push_back(mutable_mem->NewIterator(options)); - cleanup->mem.push_back(mutable_mem); - + std::vector iterator_list; + iterator_list.push_back(mutable_mem->NewIterator(options)); + cleanup->mem = mutable_mem; + cleanup->imm = immutable_mems; // Collect all needed child iterators for immutable memtables - for (MemTable* m : immutables) { - list.push_back(m->NewIterator(options)); - cleanup->mem.push_back(m); - } + immutable_mems->AddIterators(options, &iterator_list); // Collect iterators for files in L0 - Ln - version->AddIterators(options, storage_options_, &list); - Iterator* internal_iter = - NewMergingIterator(&internal_comparator_, &list[0], list.size()); + version->AddIterators(options, storage_options_, &iterator_list); + Iterator* internal_iter = NewMergingIterator( + &internal_comparator_, &iterator_list[0], iterator_list.size()); cleanup->version = version; cleanup->mu = &mutex_; cleanup->db = this; @@ -2738,19 +2734,15 @@ std::pair DBImpl::GetTailingIteratorPair( uint64_t* superversion_number) { MemTable* mutable_mem; - std::vector immutables; + MemTableListVersion* immutable_mems; Version* version; - immutables.reserve(options_.max_write_buffer_number); - // get all child iterators and bump their refcounts under lock mutex_.Lock(); mutable_mem = mem_; mutable_mem->Ref(); - imm_.GetMemTables(&immutables); - for (size_t i = 0; i < immutables.size(); ++i) { - immutables[i]->Ref(); - } + immutable_mems = imm_.current(); + immutable_mems->Ref(); version = versions_->current(); version->Ref(); if (superversion_number != nullptr) { @@ -2760,7 +2752,7 @@ std::pair DBImpl::GetTailingIteratorPair( Iterator* mutable_iter = mutable_mem->NewIterator(options); IterState* mutable_cleanup = new IterState(); - mutable_cleanup->mem.push_back(mutable_mem); + mutable_cleanup->mem = mutable_mem; mutable_cleanup->db = this; mutable_cleanup->mu = &mutex_; mutable_iter->RegisterCleanup(CleanupIteratorState, mutable_cleanup, nullptr); @@ -2772,10 +2764,8 @@ std::pair DBImpl::GetTailingIteratorPair( Iterator* immutable_iter; IterState* immutable_cleanup = new IterState(); std::vector list; - for (MemTable* m : immutables) { - list.push_back(m->NewIterator(options)); - immutable_cleanup->mem.push_back(m); - } + immutable_mems->AddIterators(options, &list); + immutable_cleanup->imm = immutable_mems; version->AddIterators(options, storage_options_, &list); immutable_cleanup->version = version; immutable_cleanup->db = this; @@ -2832,7 +2822,7 @@ void DBImpl::InstallSuperVersion(DeletionState& deletion_state) { DBImpl::SuperVersion* DBImpl::InstallSuperVersion( SuperVersion* new_superversion) { mutex_.AssertHeld(); - new_superversion->Init(mem_, imm_, versions_->current()); + new_superversion->Init(mem_, imm_.current(), versions_->current()); SuperVersion* old_superversion = super_version_; super_version_ = new_superversion; ++super_version_number_; @@ -2875,7 +2865,7 @@ Status DBImpl::GetImpl(const ReadOptions& options, if (get_version->mem->Get(lkey, value, &s, merge_context, options_)) { // Done RecordTick(options_.statistics.get(), MEMTABLE_HIT); - } else if (get_version->imm.Get(lkey, value, &s, merge_context, options_)) { + } else if (get_version->imm->Get(lkey, value, &s, merge_context, options_)) { // Done RecordTick(options_.statistics.get(), MEMTABLE_HIT); } else { @@ -2930,10 +2920,10 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, } MemTable* mem = mem_; - MemTableList imm = imm_; + MemTableListVersion* imm = imm_.current(); Version* current = versions_->current(); mem->Ref(); - imm.RefAll(); + imm->Ref(); current->Ref(); // Unlock while reading from files and memtables @@ -2965,7 +2955,7 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, LookupKey lkey(keys[i], snapshot); if (mem->Get(lkey, value, &s, merge_context, options_)) { // Done - } else if (imm.Get(lkey, value, &s, merge_context, options_)) { + } else if (imm->Get(lkey, value, &s, merge_context, options_)) { // Done } else { current->Get(options, lkey, value, &s, &merge_context, &stats, options_); @@ -2984,7 +2974,7 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, MaybeScheduleFlushOrCompaction(); } MemTable* m = mem->Unref(); - imm.UnrefAll(&to_delete); + imm->Unref(&to_delete); current->Unref(); mutex_.Unlock(); diff --git a/db/db_impl.h b/db/db_impl.h index cee574648..abefcba61 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -140,10 +140,10 @@ class DBImpl : public DB { // holds references to memtable, all immutable memtables and version struct SuperVersion { MemTable* mem; - MemTableList imm; + MemTableListVersion* imm; Version* current; std::atomic refs; - // We need to_delete because during Cleanup(), imm.UnrefAll() returns + // We need to_delete because during Cleanup(), imm->Unref() returns // all memtables that we need to free through this vector. We then // delete all those memtables outside of mutex, during destruction std::vector to_delete; @@ -161,7 +161,7 @@ class DBImpl : public DB { // that needs to be deleted in to_delete vector. Unrefing those // objects needs to be done in the mutex void Cleanup(); - void Init(MemTable* new_mem, const MemTableList& new_imm, + void Init(MemTable* new_mem, MemTableListVersion* new_imm, Version* new_current); }; diff --git a/db/memtablelist.cc b/db/memtablelist.cc index 71e4e5a92..b52563ae8 100644 --- a/db/memtablelist.cc +++ b/db/memtablelist.cc @@ -16,41 +16,85 @@ namespace rocksdb { class InternalKeyComparator; class Mutex; -class MemTableListIterator; class VersionSet; -using std::list; - -// Increase reference count on all underling memtables -void MemTableList::RefAll() { - for (auto &memtable : memlist_) { - memtable->Ref(); +MemTableListVersion::MemTableListVersion(MemTableListVersion* old) { + if (old != nullptr) { + memlist_ = old->memlist_; + size_ = old->size_; + for (auto& m : memlist_) { + m->Ref(); + } } } -// Drop reference count on all underling memtables. If the -// refcount of an underlying memtable drops to zero, then -// return it in to_delete vector. -void MemTableList::UnrefAll(std::vector* to_delete) { - for (auto &memtable : memlist_) { - MemTable* m = memtable->Unref(); - if (m != nullptr) { - to_delete->push_back(m); +void MemTableListVersion::Ref() { ++refs_; } + +void MemTableListVersion::Unref(std::vector* to_delete) { + --refs_; + if (refs_ == 0) { + // if to_delete is equal to nullptr it means we're confident + // that refs_ will not be zero + assert(to_delete != nullptr); + for (const auto& m : memlist_) { + MemTable* x = m->Unref(); + if (x != nullptr) { + to_delete->push_back(x); + } } + delete this; } } +int MemTableListVersion::size() const { return size_; } + // Returns the total number of memtables in the list -int MemTableList::size() { - assert(num_flush_not_started_ <= size_); - return size_; +int MemTableList::size() const { + assert(num_flush_not_started_ <= current_->size_); + return current_->size_; +} + +// Search all the memtables starting from the most recent one. +// Return the most recent value found, if any. +// Operands stores the list of merge operations to apply, so far. +bool MemTableListVersion::Get(const LookupKey& key, std::string* value, + Status* s, MergeContext& merge_context, + const Options& options) { + for (auto& memtable : memlist_) { + if (memtable->Get(key, value, s, merge_context, options)) { + return true; + } + } + return false; +} + +void MemTableListVersion::AddIterators(const ReadOptions& options, + std::vector* iterator_list) { + for (auto& m : memlist_) { + iterator_list->push_back(m->NewIterator(options)); + } +} + +void MemTableListVersion::Add(MemTable* m) { + assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable + m->Ref(); + memlist_.push_front(m); + ++size_; +} + +void MemTableListVersion::Remove(MemTable* m) { + assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable + MemTable* x __attribute__((unused)) = m->Unref(); + assert(x == nullptr); // it still needs to be alive! + memlist_.remove(m); + --size_; } // Returns true if there is at least one memtable on which flush has // not yet started. -bool MemTableList::IsFlushPending(int min_write_buffer_number_to_merge) { +bool MemTableList::IsFlushPending() { if ((flush_requested_ && num_flush_not_started_ >= 1) || - (num_flush_not_started_ >= min_write_buffer_number_to_merge)) { + (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) { assert(imm_flush_needed.NoBarrier_Load() != nullptr); return true; } @@ -59,7 +103,8 @@ bool MemTableList::IsFlushPending(int min_write_buffer_number_to_merge) { // Returns the memtables that need to be flushed. void MemTableList::PickMemtablesToFlush(std::vector* ret) { - for (auto it = memlist_.rbegin(); it != memlist_.rend(); it++) { + const auto& memlist = current_->memlist_; + for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { MemTable* m = *it; if (!m->flush_in_progress_) { assert(!m->flush_completed_); @@ -122,8 +167,8 @@ Status MemTableList::InstallMemtableFlushResults( // scan all memtables from the earliest, and commit those // (in that order) that have finished flushing. Memetables // are always committed in the order that they were created. - while (!memlist_.empty() && s.ok()) { - MemTable* m = memlist_.back(); // get the last element + while (!current_->memlist_.empty() && s.ok()) { + MemTable* m = current_->memlist_.back(); // get the last element if (!m->flush_completed_) { break; } @@ -135,6 +180,10 @@ Status MemTableList::InstallMemtableFlushResults( // this can release and reacquire the mutex. s = vset->LogAndApply(&m->edit_, mu); + // we will be changing the version in the next code path, + // so we better create a new one, since versions are immutable + InstallNewVersion(); + // All the later memtables that have the same filenum // are part of the same batch. They can be committed now. uint64_t mem_id = 1; // how many memtables has been flushed. @@ -144,7 +193,7 @@ Status MemTableList::InstallMemtableFlushResults( "Level-0 commit table #%lu: memtable #%lu done", (unsigned long)m->file_number_, (unsigned long)mem_id); - memlist_.remove(m); + current_->Remove(m); assert(m->file_number_ > 0); // pending_outputs can be cleared only after the newly created file @@ -155,7 +204,6 @@ Status MemTableList::InstallMemtableFlushResults( if (m->Unref() != nullptr) { to_delete->push_back(m); } - size_--; } else { //commit failed. setup state so that we can flush again. Log(info_log, @@ -172,7 +220,7 @@ Status MemTableList::InstallMemtableFlushResults( s = Status::IOError("Unable to commit flushed memtable"); } ++mem_id; - } while (!memlist_.empty() && (m = memlist_.back()) && + } while (!current_->memlist_.empty() && (m = current_->memlist_.back()) && m->file_number_ == file_number); } commit_in_progress_ = false; @@ -181,9 +229,9 @@ Status MemTableList::InstallMemtableFlushResults( // New memtables are inserted at the front of the list. void MemTableList::Add(MemTable* m) { - assert(size_ >= num_flush_not_started_); - size_++; - memlist_.push_front(m); + assert(current_->size_ >= num_flush_not_started_); + InstallNewVersion(); + current_->Add(m); m->MarkImmutable(); num_flush_not_started_++; if (num_flush_not_started_ == 1) { @@ -194,28 +242,20 @@ void MemTableList::Add(MemTable* m) { // Returns an estimate of the number of bytes of data in use. size_t MemTableList::ApproximateMemoryUsage() { size_t size = 0; - for (auto &memtable : memlist_) { + for (auto& memtable : current_->memlist_) { size += memtable->ApproximateMemoryUsage(); } return size; } -// Search all the memtables starting from the most recent one. -// Return the most recent value found, if any. -// Operands stores the list of merge operations to apply, so far. -bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s, - MergeContext& merge_context, const Options& options) { - for (auto &memtable : memlist_) { - if (memtable->Get(key, value, s, merge_context, options)) { - return true; - } - } - return false; -} - -void MemTableList::GetMemTables(std::vector* output) { - for (auto &memtable : memlist_) { - output->push_back(memtable); +void MemTableList::InstallNewVersion() { + if (current_->refs_ == 1) { + // we're the only one using the version, just keep using it + } else { + // somebody else holds the current version, we need to create new one + MemTableListVersion* version = current_; + current_ = new MemTableListVersion(current_); + version->Unref(); } } diff --git a/db/memtablelist.h b/db/memtablelist.h index ed353c8b8..354e9872a 100644 --- a/db/memtablelist.h +++ b/db/memtablelist.h @@ -7,8 +7,10 @@ #pragma once #include #include -#include +#include #include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/iterator.h" #include "db/dbformat.h" #include "db/skiplist.h" #include "memtable.h" @@ -17,44 +19,71 @@ namespace rocksdb { class InternalKeyComparator; class Mutex; -class MemTableListIterator; -// +// keeps a list of immutable memtables in a vector. the list is immutable +// if refcount is bigger than one. It is used as a state for Get() and +// Iterator code paths +class MemTableListVersion { + public: + explicit MemTableListVersion(MemTableListVersion* old = nullptr); + + void Ref(); + void Unref(std::vector* to_delete = nullptr); + + int size() const; + + // Search all the memtables starting from the most recent one. + // Return the most recent value found, if any. + bool Get(const LookupKey& key, std::string* value, Status* s, + MergeContext& merge_context, const Options& options); + + void AddIterators(const ReadOptions& options, + std::vector* iterator_list); + + // REQUIRE: m is mutable memtable + void Add(MemTable* m); + // REQUIRE: m is mutable memtable + void Remove(MemTable* m); + + private: + friend class MemTableList; + std::list memlist_; + int size_ = 0; + int refs_ = 1; +}; + // This class stores references to all the immutable memtables. // The memtables are flushed to L0 as soon as possible and in // any order. If there are more than one immutable memtable, their // flushes can occur concurrently. However, they are 'committed' // to the manifest in FIFO order to maintain correctness and // recoverability from a crash. -// class MemTableList { public: // A list of memtables. - MemTableList() : size_(0), num_flush_not_started_(0), - commit_in_progress_(false), - flush_requested_(false) { + explicit MemTableList(int min_write_buffer_number_to_merge) + : min_write_buffer_number_to_merge_(min_write_buffer_number_to_merge), + current_(new MemTableListVersion()), + num_flush_not_started_(0), + commit_in_progress_(false), + flush_requested_(false) { imm_flush_needed.Release_Store(nullptr); + current_->Ref(); } - ~MemTableList() {}; + ~MemTableList() {} + + MemTableListVersion* current() { return current_; } // so that backgrund threads can detect non-nullptr pointer to // determine whether this is anything more to start flushing. port::AtomicPointer imm_flush_needed; - // Increase reference count on all underling memtables - void RefAll(); - - // Drop reference count on all underling memtables. If the refcount - // on an underlying memtable drops to zero, then return it in - // to_delete vector. - void UnrefAll(std::vector* to_delete); - // Returns the total number of memtables in the list - int size(); + int size() const; // Returns true if there is at least one memtable on which flush has // not yet started. - bool IsFlushPending(int min_write_buffer_number_to_merge); + bool IsFlushPending(); // Returns the earliest memtables that needs to be flushed. The returned // memtables are guaranteed to be in the ascending order of created time. @@ -75,14 +104,6 @@ class MemTableList { // Returns an estimate of the number of bytes of data in use. size_t ApproximateMemoryUsage(); - // Search all the memtables starting from the most recent one. - // Return the most recent value found, if any. - bool Get(const LookupKey& key, std::string* value, Status* s, - MergeContext& merge_context, const Options& options); - - // Returns the list of underlying memtables. - void GetMemTables(std::vector* list); - // Request a flush of all existing memtables to storage void FlushRequested() { flush_requested_ = true; } @@ -91,8 +112,12 @@ class MemTableList { // void operator=(const MemTableList&); private: - std::list memlist_; - int size_; + // DB mutex held + void InstallNewVersion(); + + int min_write_buffer_number_to_merge_; + + MemTableListVersion* current_; // the number of elements that still need flushing int num_flush_not_started_;