diff --git a/Makefile b/Makefile
index ab13ac0d5..01dc3721d 100644
--- a/Makefile
+++ b/Makefile
@@ -136,13 +136,12 @@ endif  # PLATFORM_SHARED_EXT
 
 all: $(LIBRARY) $(PROGRAMS)
 
 .PHONY: blackbox_crash_test check clean coverage crash_test ldb_tests \
-	release tags valgrind_check whitebox_crash_test format
+	release tags valgrind_check whitebox_crash_test format shared_lib
 
 # Will also generate shared libraries.
 release:
 	$(MAKE) clean
 	OPT=-DNDEBUG $(MAKE) all -j32
-	OPT=-DNDEBUG $(MAKE) $(SHARED) -j32
 
 coverage:
 	$(MAKE) clean
@@ -202,6 +201,8 @@ tags:
 format:
 	build_tools/format-diff.sh
 
+shared_lib: $(SHARED)
+
 # ---------------------------------------------------------------------------
 # 	Unit tests and tools
 # ---------------------------------------------------------------------------
diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc
index cfa3770d7..284942117 100644
--- a/db/compaction_picker.cc
+++ b/db/compaction_picker.cc
@@ -22,6 +22,21 @@ uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
   return sum;
 }
 
+// Multiply two operands. If they overflow, return op1.
+uint64_t MultiplyCheckOverflow(uint64_t op1, int op2) {
+  if (op1 == 0) {
+    return 0;
+  }
+  if (op2 <= 0) {
+    return op1;
+  }
+  uint64_t casted_op2 = (uint64_t) op2;
+  if (std::numeric_limits<uint64_t>::max() / op1 < casted_op2) {
+    return op1;
+  }
+  return op1 * casted_op2;
+}
+
 }  // anonymous namespace
 
 CompactionPicker::CompactionPicker(const Options* options,
@@ -30,15 +45,7 @@ CompactionPicker::CompactionPicker(const Options* options,
       options_(options),
       num_levels_(options->num_levels),
       icmp_(icmp) {
-  Init();
-}
-
-void CompactionPicker::ReduceNumberOfLevels(int new_levels) {
-  num_levels_ = new_levels;
-  Init();
-}
-
-void CompactionPicker::Init() {
   max_file_size_.reset(new uint64_t[NumberLevels()]);
   level_max_bytes_.reset(new uint64_t[NumberLevels()]);
   int target_file_size_multiplier = options_->target_file_size_multiplier;
@@ -48,10 +55,11 @@ void CompactionPicker::Init() {
       max_file_size_[i] = ULLONG_MAX;
       level_max_bytes_[i] = options_->max_bytes_for_level_base;
     } else if (i > 1) {
-      max_file_size_[i] = max_file_size_[i - 1] * target_file_size_multiplier;
-      level_max_bytes_[i] =
-          level_max_bytes_[i - 1] * max_bytes_multiplier *
-          options_->max_bytes_for_level_multiplier_additional[i - 1];
+      max_file_size_[i] = MultiplyCheckOverflow(max_file_size_[i - 1],
+                                                target_file_size_multiplier);
+      level_max_bytes_[i] = MultiplyCheckOverflow(
+          MultiplyCheckOverflow(level_max_bytes_[i - 1], max_bytes_multiplier),
+          options_->max_bytes_for_level_multiplier_additional[i - 1]);
     } else {
       max_file_size_[i] = options_->target_file_size_base;
      level_max_bytes_[i] = options_->max_bytes_for_level_base;
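Why the new helper matters: the per-level targets are computed as repeated products of a base value and per-level multipliers, and with a large `max_bytes_for_level_multiplier` and many levels the product used to wrap around `uint64_t`, yielding nonsense level targets. A minimal standalone sketch of the saturating behavior — the `main()` driver and the 10 GB base value are illustrative, not part of the patch:

    #include <cstdint>
    #include <iostream>
    #include <limits>

    // Same logic as the anonymous-namespace helper above.
    uint64_t MultiplyCheckOverflow(uint64_t op1, int op2) {
      if (op1 == 0) return 0;
      if (op2 <= 0) return op1;
      uint64_t casted_op2 = static_cast<uint64_t>(op2);
      if (std::numeric_limits<uint64_t>::max() / op1 < casted_op2) {
        return op1;  // would overflow: saturate at the previous level's value
      }
      return op1 * casted_op2;
    }

    int main() {
      uint64_t target = 10ull << 30;  // e.g. max_bytes_for_level_base = 10 GB
      for (int level = 2; level < 20; ++level) {
        target = MultiplyCheckOverflow(target, 10);  // multiplier 10 per level
      }
      // Plain multiplication would have wrapped around long before level 20;
      // here the target saturates at the last representable product instead.
      std::cout << target << "\n";
      return 0;
    }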
diff --git a/db/compaction_picker.h b/db/compaction_picker.h
index 0fe086a18..ee77cc4c7 100644
--- a/db/compaction_picker.h
+++ b/db/compaction_picker.h
@@ -27,9 +27,6 @@ class CompactionPicker {
   CompactionPicker(const Options* options, const InternalKeyComparator* icmp);
   virtual ~CompactionPicker();
 
-  // See VersionSet::ReduceNumberOfLevels()
-  void ReduceNumberOfLevels(int new_levels);
-
   // Pick level and inputs for a new compaction.
   // Returns nullptr if there is no compaction to be done.
   // Otherwise returns a pointer to a heap-allocated object that
@@ -120,8 +117,6 @@ class CompactionPicker {
   const Options* const options_;
 
  private:
-  void Init();
-
   int num_levels_;
 
   const InternalKeyComparator* const icmp_;
diff --git a/db/db_impl.cc b/db/db_impl.cc
index eeb49e086..698338760 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -267,6 +267,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
       bg_cv_(&mutex_),
       mem_rep_factory_(options_.memtable_factory.get()),
       mem_(new MemTable(internal_comparator_, options_)),
+      imm_(options_.min_write_buffer_number_to_merge),
       logfile_number_(0),
       super_version_(nullptr),
       super_version_number_(0),
@@ -363,7 +364,7 @@ DBImpl::~DBImpl() {
     delete mem_->Unref();
   }
 
-  imm_.UnrefAll(&to_delete);
+  imm_.current()->Unref(&to_delete);
   for (MemTable* m: to_delete) {
     delete m;
   }
@@ -511,7 +512,7 @@ bool DBImpl::SuperVersion::Unref() {
 
 void DBImpl::SuperVersion::Cleanup() {
   assert(refs.load(std::memory_order_relaxed) == 0);
-  imm.UnrefAll(&to_delete);
+  imm->Unref(&to_delete);
   MemTable* m = mem->Unref();
   if (m != nullptr) {
     to_delete.push_back(m);
@@ -519,13 +520,13 @@ void DBImpl::SuperVersion::Cleanup() {
   current->Unref();
 }
 
-void DBImpl::SuperVersion::Init(MemTable* new_mem, const MemTableList& new_imm,
+void DBImpl::SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
                                 Version* new_current) {
   mem = new_mem;
   imm = new_imm;
   current = new_current;
   mem->Ref();
-  imm.RefAll();
+  imm->Ref();
   current->Ref();
   refs.store(1, std::memory_order_relaxed);
 }
@@ -1226,7 +1227,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
   mutex_.AssertHeld();
   assert(imm_.size() != 0);
 
-  if (!imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) {
+  if (!imm_.IsFlushPending()) {
     Log(options_.info_log, "FlushMemTableToOutputFile already in progress");
     Status s = Status::IOError("FlushMemTableToOutputFile already in progress");
     return s;
@@ -1767,8 +1768,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
   } else if (shutting_down_.Acquire_Load()) {
     // DB is being deleted; no more background compactions
   } else {
-    bool is_flush_pending =
-        imm_.IsFlushPending(options_.min_write_buffer_number_to_merge);
+    bool is_flush_pending = imm_.IsFlushPending();
     if (is_flush_pending &&
         (bg_flush_scheduled_ < options_.max_background_flushes)) {
       // memtable flush needed
@@ -1803,8 +1803,7 @@ void DBImpl::BGWorkCompaction(void* db) {
 Status DBImpl::BackgroundFlush(bool* madeProgress,
                                DeletionState& deletion_state) {
   Status stat;
-  while (stat.ok() &&
-         imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) {
+  while (stat.ok() && imm_.IsFlushPending()) {
     Log(options_.info_log,
         "BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d",
         options_.max_background_flushes - bg_flush_scheduled_);
@@ -1924,7 +1923,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
   mutex_.AssertHeld();
 
   // TODO: remove memtable flush from formal compaction
-  while (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) {
+  while (imm_.IsFlushPending()) {
     Log(options_.info_log,
         "BackgroundCompaction doing FlushMemTableToOutputFile, compaction slots "
         "available %d",
@@ -2330,7 +2329,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
       const uint64_t imm_start = env_->NowMicros();
       LogFlush(options_.info_log);
       mutex_.Lock();
-      if (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) {
+      if (imm_.IsFlushPending()) {
        FlushMemTableToOutputFile(nullptr, deletion_state);
        bg_cv_.SignalAll();  // Wakeup MakeRoomForWrite() if necessary
      }
@@ -2663,8 +2662,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
 namespace {
 struct IterState {
   port::Mutex* mu;
-  Version* version;
-  std::vector<MemTable*> mem; // includes both mem_ and imm_
+  Version* version = nullptr;
+  MemTable* mem = nullptr;
+  MemTableListVersion* imm = nullptr;
   DBImpl *db;
 };
 
@@ -2673,15 +2673,16 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
   DBImpl::DeletionState deletion_state(state->db->GetOptions().
                                        max_write_buffer_number);
   state->mu->Lock();
-  for (unsigned int i = 0; i < state->mem.size(); i++) {
-    MemTable* m = state->mem[i]->Unref();
-    if (m != nullptr) {
-      deletion_state.memtables_to_free.push_back(m);
-    }
+  MemTable* m = state->mem->Unref();
+  if (m != nullptr) {
+    deletion_state.memtables_to_free.push_back(m);
   }
   if (state->version) {  // not set for memtable-only iterator
     state->version->Unref();
   }
+  if (state->imm) {  // not set for memtable-only iterator
+    state->imm->Unref(&deletion_state.memtables_to_free);
+  }
   // fast path FindObsoleteFiles
   state->db->FindObsoleteFiles(deletion_state, false, true);
   state->mu->Unlock();
@@ -2695,7 +2696,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
                                       SequenceNumber* latest_snapshot) {
   IterState* cleanup = new IterState;
   MemTable* mutable_mem;
-  std::vector<MemTable*> immutables;
+  MemTableListVersion* immutable_mems;
   Version* version;
 
   // Collect together all needed child iterators for mem
@@ -2704,27 +2705,22 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
   mem_->Ref();
   mutable_mem = mem_;
   // Collect together all needed child iterators for imm_
-  imm_.GetMemTables(&immutables);
-  for (unsigned int i = 0; i < immutables.size(); i++) {
-    immutables[i]->Ref();
-  }
+  immutable_mems = imm_.current();
+  immutable_mems->Ref();
   versions_->current()->Ref();
   version = versions_->current();
   mutex_.Unlock();
 
-  std::vector<Iterator*> list;
-  list.push_back(mutable_mem->NewIterator(options));
-  cleanup->mem.push_back(mutable_mem);
-
+  std::vector<Iterator*> iterator_list;
+  iterator_list.push_back(mutable_mem->NewIterator(options));
+  cleanup->mem = mutable_mem;
+  cleanup->imm = immutable_mems;
   // Collect all needed child iterators for immutable memtables
-  for (MemTable* m : immutables) {
-    list.push_back(m->NewIterator(options));
-    cleanup->mem.push_back(m);
-  }
+  immutable_mems->AddIterators(options, &iterator_list);
   // Collect iterators for files in L0 - Ln
-  version->AddIterators(options, storage_options_, &list);
-  Iterator* internal_iter =
-      NewMergingIterator(&internal_comparator_, &list[0], list.size());
+  version->AddIterators(options, storage_options_, &iterator_list);
+  Iterator* internal_iter = NewMergingIterator(
+      &internal_comparator_, &iterator_list[0], iterator_list.size());
   cleanup->version = version;
   cleanup->mu = &mutex_;
   cleanup->db = this;
@@ -2743,19 +2739,15 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
     uint64_t* superversion_number) {
 
   MemTable* mutable_mem;
-  std::vector<MemTable*> immutables;
+  MemTableListVersion* immutable_mems;
   Version* version;
 
-  immutables.reserve(options_.max_write_buffer_number);
-
   // get all child iterators and bump their refcounts under lock
   mutex_.Lock();
   mutable_mem = mem_;
   mutable_mem->Ref();
-  imm_.GetMemTables(&immutables);
-  for (size_t i = 0; i < immutables.size(); ++i) {
-    immutables[i]->Ref();
-  }
+  immutable_mems = imm_.current();
+  immutable_mems->Ref();
   version = versions_->current();
   version->Ref();
   if (superversion_number != nullptr) {
@@ -2765,7 +2757,7 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
   Iterator* mutable_iter = mutable_mem->NewIterator(options);
   IterState* mutable_cleanup = new IterState();
-  mutable_cleanup->mem.push_back(mutable_mem);
+  mutable_cleanup->mem = mutable_mem;
   mutable_cleanup->db = this;
   mutable_cleanup->mu = &mutex_;
   mutable_iter->RegisterCleanup(CleanupIteratorState, mutable_cleanup, nullptr);
@@ -2777,10 +2769,8 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
   Iterator* immutable_iter;
   IterState* immutable_cleanup = new IterState();
   std::vector<Iterator*> list;
-  for (MemTable* m : immutables) {
-    list.push_back(m->NewIterator(options));
-    immutable_cleanup->mem.push_back(m);
-  }
+  immutable_mems->AddIterators(options, &list);
+  immutable_cleanup->imm = immutable_mems;
   version->AddIterators(options, storage_options_, &list);
   immutable_cleanup->version = version;
   immutable_cleanup->db = this;
@@ -2837,7 +2827,7 @@ void DBImpl::InstallSuperVersion(DeletionState& deletion_state) {
 DBImpl::SuperVersion* DBImpl::InstallSuperVersion(
     SuperVersion* new_superversion) {
   mutex_.AssertHeld();
-  new_superversion->Init(mem_, imm_, versions_->current());
+  new_superversion->Init(mem_, imm_.current(), versions_->current());
   SuperVersion* old_superversion = super_version_;
   super_version_ = new_superversion;
   ++super_version_number_;
@@ -2880,7 +2870,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
   if (get_version->mem->Get(lkey, value, &s, merge_context, options_)) {
     // Done
     RecordTick(options_.statistics.get(), MEMTABLE_HIT);
-  } else if (get_version->imm.Get(lkey, value, &s, merge_context, options_)) {
+  } else if (get_version->imm->Get(lkey, value, &s, merge_context, options_)) {
     // Done
     RecordTick(options_.statistics.get(), MEMTABLE_HIT);
   } else {
@@ -2936,10 +2926,10 @@ std::vector<Status> DBImpl::MultiGet(
   }
 
   MemTable* mem = mem_;
-  MemTableList imm = imm_;
+  MemTableListVersion* imm = imm_.current();
   Version* current = versions_->current();
   mem->Ref();
-  imm.RefAll();
+  imm->Ref();
   current->Ref();
 
   // Unlock while reading from files and memtables
@@ -2971,7 +2961,7 @@ std::vector<Status> DBImpl::MultiGet(
     LookupKey lkey(keys[i], snapshot);
     if (mem->Get(lkey, value, &s, merge_context, options_)) {
       // Done
-    } else if (imm.Get(lkey, value, &s, merge_context, options_)) {
+    } else if (imm->Get(lkey, value, &s, merge_context, options_)) {
      // Done
     } else {
       current->Get(options, lkey, value, &s, &merge_context, &stats, options_);
@@ -2990,7 +2980,7 @@ std::vector<Status> DBImpl::MultiGet(
     MaybeScheduleFlushOrCompaction();
   }
   MemTable* m = mem->Unref();
-  imm.UnrefAll(&to_delete);
+  imm->Unref(&to_delete);
   current->Unref();
   mutex_.Unlock();
 
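A note on the ownership idiom the DBImpl changes above lean on: `Unref()` returns the object itself exactly when the last reference was dropped, so the caller can collect it in `to_delete` and free it after releasing the DB mutex. A self-contained sketch of that idiom (simplified type, not the patch's classes):

    #include <cassert>

    struct MemTableSketch {
      int refs = 0;
      MemTableSketch* Ref() { ++refs; return this; }
      // Returns 'this' when the last reference is dropped, nullptr otherwise.
      MemTableSketch* Unref() {
        assert(refs > 0);
        return --refs == 0 ? this : nullptr;
      }
    };

    int main() {
      auto* m = (new MemTableSketch())->Ref();
      m->Ref();                       // second holder, e.g. an open iterator
      assert(m->Unref() == nullptr);  // first drop: still referenced
      MemTableSketch* dead = m->Unref();
      assert(dead == m);              // last drop: caller must free it
      delete dead;                    // done outside the (hypothetical) DB mutex
      return 0;
    }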
diff --git a/db/db_impl.h b/db/db_impl.h
index d75181537..18e186ca0 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -177,10 +177,10 @@ class DBImpl : public DB {
   // holds references to memtable, all immutable memtables and version
   struct SuperVersion {
     MemTable* mem;
-    MemTableList imm;
+    MemTableListVersion* imm;
     Version* current;
     std::atomic<uint32_t> refs;
-    // We need to_delete because during Cleanup(), imm.UnrefAll() returns
+    // We need to_delete because during Cleanup(), imm->Unref() returns
     // all memtables that we need to free through this vector. We then
     // delete all those memtables outside of mutex, during destruction
     std::vector<MemTable*> to_delete;
@@ -198,7 +198,7 @@ class DBImpl : public DB {
     // that needs to be deleted in to_delete vector. Unrefing those
     // objects needs to be done in the mutex
     void Cleanup();
-    void Init(MemTable* new_mem, const MemTableList& new_imm,
+    void Init(MemTable* new_mem, MemTableListVersion* new_imm,
               Version* new_current);
   };
 
diff --git a/db/db_test.cc b/db/db_test.cc
index 26b0ec5a7..4fa5452c2 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -17,6 +17,7 @@
 #include "db/filename.h"
 #include "db/version_set.h"
 #include "db/write_batch_internal.h"
+#include "table/block_based_table_factory.h"
 #include "rocksdb/cache.h"
 #include "rocksdb/compaction_filter.h"
 #include "rocksdb/env.h"
@@ -735,6 +736,9 @@ TEST(DBTest, IndexAndFilterBlocksOfNewTableAddedToCache) {
   options.filter_policy = filter_policy.get();
   options.create_if_missing = true;
   options.statistics = rocksdb::CreateDBStatistics();
+  BlockBasedTableOptions table_options;
+  table_options.cache_index_and_filter_blocks = true;
+  options.table_factory.reset(new BlockBasedTableFactory(table_options));
   DestroyAndReopen(&options);
 
   ASSERT_OK(db_->Put(WriteOptions(), "key", "val"));
diff --git a/db/memtablelist.cc b/db/memtablelist.cc
index 71e4e5a92..b52563ae8 100644
--- a/db/memtablelist.cc
+++ b/db/memtablelist.cc
@@ -16,41 +16,85 @@ namespace rocksdb {
 
 class InternalKeyComparator;
 class Mutex;
-class MemTableListIterator;
 class VersionSet;
 
-using std::list;
-
-// Increase reference count on all underling memtables
-void MemTableList::RefAll() {
-  for (auto &memtable : memlist_) {
-    memtable->Ref();
+MemTableListVersion::MemTableListVersion(MemTableListVersion* old) {
+  if (old != nullptr) {
+    memlist_ = old->memlist_;
+    size_ = old->size_;
+    for (auto& m : memlist_) {
+      m->Ref();
+    }
   }
 }
 
-// Drop reference count on all underling memtables. If the
-// refcount of an underlying memtable drops to zero, then
-// return it in to_delete vector.
-void MemTableList::UnrefAll(std::vector<MemTable*>* to_delete) {
-  for (auto &memtable : memlist_) {
-    MemTable* m = memtable->Unref();
-    if (m != nullptr) {
-      to_delete->push_back(m);
+void MemTableListVersion::Ref() { ++refs_; }
+
+void MemTableListVersion::Unref(std::vector<MemTable*>* to_delete) {
+  --refs_;
+  if (refs_ == 0) {
+    // if to_delete is equal to nullptr it means we're confident
+    // that refs_ will not be zero
+    assert(to_delete != nullptr);
+    for (const auto& m : memlist_) {
+      MemTable* x = m->Unref();
+      if (x != nullptr) {
+        to_delete->push_back(x);
+      }
     }
+    delete this;
   }
 }
 
+int MemTableListVersion::size() const { return size_; }
+
 // Returns the total number of memtables in the list
-int MemTableList::size() {
-  assert(num_flush_not_started_ <= size_);
-  return size_;
+int MemTableList::size() const {
+  assert(num_flush_not_started_ <= current_->size_);
+  return current_->size_;
+}
+
+// Search all the memtables starting from the most recent one.
+// Return the most recent value found, if any.
+// Operands stores the list of merge operations to apply, so far.
+bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
+                              Status* s, MergeContext& merge_context,
+                              const Options& options) {
+  for (auto& memtable : memlist_) {
+    if (memtable->Get(key, value, s, merge_context, options)) {
+      return true;
+    }
+  }
+  return false;
+}
+
+void MemTableListVersion::AddIterators(const ReadOptions& options,
+                                       std::vector<Iterator*>* iterator_list) {
+  for (auto& m : memlist_) {
+    iterator_list->push_back(m->NewIterator(options));
+  }
+}
+
+void MemTableListVersion::Add(MemTable* m) {
+  assert(refs_ == 1);  // only when refs_ == 1 is MemTableListVersion mutable
+  m->Ref();
+  memlist_.push_front(m);
+  ++size_;
+}
+
+void MemTableListVersion::Remove(MemTable* m) {
+  assert(refs_ == 1);  // only when refs_ == 1 is MemTableListVersion mutable
+  MemTable* x __attribute__((unused)) = m->Unref();
+  assert(x == nullptr);  // it still needs to be alive!
+  memlist_.remove(m);
+  --size_;
+}
 
 // Returns true if there is at least one memtable on which flush has
 // not yet started.
-bool MemTableList::IsFlushPending(int min_write_buffer_number_to_merge) {
+bool MemTableList::IsFlushPending() {
   if ((flush_requested_ && num_flush_not_started_ >= 1) ||
-      (num_flush_not_started_ >= min_write_buffer_number_to_merge)) {
+      (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
     assert(imm_flush_needed.NoBarrier_Load() != nullptr);
     return true;
   }
@@ -59,7 +103,8 @@ bool MemTableList::IsFlushPending(int min_write_buffer_number_to_merge) {
 
 // Returns the memtables that need to be flushed.
 void MemTableList::PickMemtablesToFlush(std::vector<MemTable*>* ret) {
-  for (auto it = memlist_.rbegin(); it != memlist_.rend(); it++) {
+  const auto& memlist = current_->memlist_;
+  for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
     MemTable* m = *it;
     if (!m->flush_in_progress_) {
       assert(!m->flush_completed_);
@@ -122,8 +167,8 @@ Status MemTableList::InstallMemtableFlushResults(
   // scan all memtables from the earliest, and commit those
   // (in that order) that have finished flushing. Memetables
   // are always committed in the order that they were created.
-  while (!memlist_.empty() && s.ok()) {
-    MemTable* m = memlist_.back();  // get the last element
+  while (!current_->memlist_.empty() && s.ok()) {
+    MemTable* m = current_->memlist_.back();  // get the last element
     if (!m->flush_completed_) {
       break;
     }
@@ -135,6 +180,10 @@ Status MemTableList::InstallMemtableFlushResults(
       // this can release and reacquire the mutex.
       s = vset->LogAndApply(&m->edit_, mu);
 
+      // we will be changing the version in the next code path,
+      // so we better create a new one, since versions are immutable
+      InstallNewVersion();
+
       // All the later memtables that have the same filenum
       // are part of the same batch. They can be committed now.
       uint64_t mem_id = 1;  // how many memtables has been flushed.
@@ -144,7 +193,7 @@ Status MemTableList::InstallMemtableFlushResults(
               "Level-0 commit table #%lu: memtable #%lu done",
               (unsigned long)m->file_number_,
               (unsigned long)mem_id);
-          memlist_.remove(m);
+          current_->Remove(m);
           assert(m->file_number_ > 0);
 
           // pending_outputs can be cleared only after the newly created file
@@ -155,7 +204,6 @@ Status MemTableList::InstallMemtableFlushResults(
          if (m->Unref() != nullptr) {
            to_delete->push_back(m);
          }
-          size_--;
         } else {
          //commit failed. setup state so that we can flush again.
          Log(info_log,
@@ -172,7 +220,7 @@ Status MemTableList::InstallMemtableFlushResults(
          s = Status::IOError("Unable to commit flushed memtable");
         }
         ++mem_id;
-      } while (!memlist_.empty() && (m = memlist_.back()) &&
+      } while (!current_->memlist_.empty() && (m = current_->memlist_.back()) &&
                m->file_number_ == file_number);
     }
     commit_in_progress_ = false;
@@ -181,9 +229,9 @@ Status MemTableList::InstallMemtableFlushResults(
 
 // New memtables are inserted at the front of the list.
 void MemTableList::Add(MemTable* m) {
-  assert(size_ >= num_flush_not_started_);
-  size_++;
-  memlist_.push_front(m);
+  assert(current_->size_ >= num_flush_not_started_);
+  InstallNewVersion();
+  current_->Add(m);
   m->MarkImmutable();
   num_flush_not_started_++;
   if (num_flush_not_started_ == 1) {
@@ -194,28 +242,20 @@ void MemTableList::Add(MemTable* m) {
 // Returns an estimate of the number of bytes of data in use.
 size_t MemTableList::ApproximateMemoryUsage() {
   size_t size = 0;
-  for (auto &memtable : memlist_) {
+  for (auto& memtable : current_->memlist_) {
     size += memtable->ApproximateMemoryUsage();
   }
   return size;
 }
 
-// Search all the memtables starting from the most recent one.
-// Return the most recent value found, if any.
-// Operands stores the list of merge operations to apply, so far.
-bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s,
-                       MergeContext& merge_context, const Options& options) {
-  for (auto &memtable : memlist_) {
-    if (memtable->Get(key, value, s, merge_context, options)) {
-      return true;
-    }
-  }
-  return false;
-}
-
-void MemTableList::GetMemTables(std::vector<MemTable*>* output) {
-  for (auto &memtable : memlist_) {
-    output->push_back(memtable);
+void MemTableList::InstallNewVersion() {
+  if (current_->refs_ == 1) {
+    // we're the only one using the version, just keep using it
+  } else {
+    // somebody else holds the current version, we need to create new one
+    MemTableListVersion* version = current_;
+    current_ = new MemTableListVersion(current_);
+    version->Unref();
   }
 }
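The heart of this file is `InstallNewVersion()`: a writer mutates `current_` in place only while it is the sole holder; otherwise it clones the version first, so every reader keeps an immutable snapshot. A self-contained copy-on-write sketch of the same scheme (simplified names; `int` stands in for `MemTable*`):

    #include <cassert>
    #include <list>

    struct VersionSketch {
      std::list<int> items;  // stands in for the memtable list
      int refs = 1;
      explicit VersionSketch(const VersionSketch* old) {
        if (old != nullptr) items = old->items;
      }
      void Ref() { ++refs; }
      void Unref() {
        if (--refs == 0) delete this;
      }
    };

    struct ListSketch {
      VersionSketch* current = new VersionSketch(nullptr);
      // Mirrors MemTableList::InstallNewVersion().
      void InstallNewVersion() {
        if (current->refs != 1) {            // someone holds a snapshot
          VersionSketch* old = current;
          current = new VersionSketch(old);  // clone, then publish the copy
          old->Unref();
        }
      }
      void Add(int x) {
        InstallNewVersion();
        current->items.push_front(x);
      }
    };

    int main() {
      ListSketch l;
      l.Add(1);
      VersionSketch* snapshot = l.current;
      snapshot->Ref();     // a reader pins the state, e.g. for Get()
      l.Add(2);            // writer clones; the snapshot is untouched
      assert(snapshot->items.size() == 1);
      assert(l.current->items.size() == 2);
      snapshot->Unref();   // reader done; the old version is freed
      l.current->Unref();  // cleanup of the live version
      return 0;
    }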
diff --git a/db/memtablelist.h b/db/memtablelist.h
index ed353c8b8..354e9872a 100644
--- a/db/memtablelist.h
+++ b/db/memtablelist.h
@@ -7,8 +7,10 @@
 #pragma once
 #include <string>
 #include <list>
-#include <deque>
+#include <vector>
 #include "rocksdb/db.h"
+#include "rocksdb/options.h"
+#include "rocksdb/iterator.h"
 #include "db/dbformat.h"
 #include "db/skiplist.h"
 #include "memtable.h"
@@ -17,44 +19,71 @@ namespace rocksdb {
 
 class InternalKeyComparator;
 class Mutex;
-class MemTableListIterator;
 
-//
+// keeps a list of immutable memtables. the list is immutable
+// if refcount is bigger than one. It is used as a state for Get() and
+// Iterator code paths
+class MemTableListVersion {
+ public:
+  explicit MemTableListVersion(MemTableListVersion* old = nullptr);
+
+  void Ref();
+  void Unref(std::vector<MemTable*>* to_delete = nullptr);
+
+  int size() const;
+
+  // Search all the memtables starting from the most recent one.
+  // Return the most recent value found, if any.
+  bool Get(const LookupKey& key, std::string* value, Status* s,
+           MergeContext& merge_context, const Options& options);
+
+  void AddIterators(const ReadOptions& options,
+                    std::vector<Iterator*>* iterator_list);
+
+  // REQUIRE: m is mutable memtable
+  void Add(MemTable* m);
+  // REQUIRE: m is mutable memtable
+  void Remove(MemTable* m);
+
+ private:
+  friend class MemTableList;
+
+  std::list<MemTable*> memlist_;
+  int size_ = 0;
+  int refs_ = 1;
+};
+
 // This class stores references to all the immutable memtables.
 // The memtables are flushed to L0 as soon as possible and in
 // any order. If there are more than one immutable memtable, their
 // flushes can occur concurrently. However, they are 'committed'
 // to the manifest in FIFO order to maintain correctness and
 // recoverability from a crash.
-//
 class MemTableList {
  public:
   // A list of memtables.
-  MemTableList() : size_(0), num_flush_not_started_(0),
-                   commit_in_progress_(false),
-                   flush_requested_(false) {
+  explicit MemTableList(int min_write_buffer_number_to_merge)
+      : min_write_buffer_number_to_merge_(min_write_buffer_number_to_merge),
+        current_(new MemTableListVersion()),
+        num_flush_not_started_(0),
+        commit_in_progress_(false),
+        flush_requested_(false) {
     imm_flush_needed.Release_Store(nullptr);
+    current_->Ref();
   }
-  ~MemTableList() {};
+  ~MemTableList() {}
+
+  MemTableListVersion* current() { return current_; }
 
   // so that backgrund threads can detect non-nullptr pointer to
   // determine whether this is anything more to start flushing.
   port::AtomicPointer imm_flush_needed;
 
-  // Increase reference count on all underling memtables
-  void RefAll();
-
-  // Drop reference count on all underling memtables. If the refcount
-  // on an underlying memtable drops to zero, then return it in
-  // to_delete vector.
-  void UnrefAll(std::vector<MemTable*>* to_delete);
-
   // Returns the total number of memtables in the list
-  int size();
+  int size() const;
 
   // Returns true if there is at least one memtable on which flush has
   // not yet started.
-  bool IsFlushPending(int min_write_buffer_number_to_merge);
+  bool IsFlushPending();
 
   // Returns the earliest memtables that needs to be flushed. The returned
   // memtables are guaranteed to be in the ascending order of created time.
@@ -75,14 +104,6 @@ class MemTableList {
   // Returns an estimate of the number of bytes of data in use.
   size_t ApproximateMemoryUsage();
 
-  // Search all the memtables starting from the most recent one.
-  // Return the most recent value found, if any.
-  bool Get(const LookupKey& key, std::string* value, Status* s,
-           MergeContext& merge_context, const Options& options);
-
-  // Returns the list of underlying memtables.
-  void GetMemTables(std::vector<MemTable*>* list);
-
   // Request a flush of all existing memtables to storage
   void FlushRequested() { flush_requested_ = true; }
 
@@ -91,8 +112,12 @@ class MemTableList {
   // void operator=(const MemTableList&);
 
  private:
-  std::list<MemTable*> memlist_;
-  int size_;
+  // DB mutex held
+  void InstallNewVersion();
+
+  int min_write_buffer_number_to_merge_;
+
+  MemTableListVersion* current_;
 
   // the number of elements that still need flushing
   int num_flush_not_started_;
diff --git a/db/version_set.cc b/db/version_set.cc
index 39533c8f3..b8f87dc11 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -1874,6 +1874,77 @@ Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
   return s;
 }
 
+Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
+                                        const Options* options,
+                                        const EnvOptions& storage_options,
+                                        int new_levels) {
+  if (new_levels <= 1) {
+    return Status::InvalidArgument(
+        "Number of levels needs to be bigger than 1");
+  }
+
+  const InternalKeyComparator cmp(options->comparator);
+  TableCache tc(dbname, options, storage_options, 10);
+  VersionSet versions(dbname, options, storage_options, &tc, &cmp);
+  Status status;
+
+  std::vector<ColumnFamilyDescriptor> dummy;
+  dummy.push_back(ColumnFamilyDescriptor());
+  status = versions.Recover(dummy);
+  if (!status.ok()) {
+    return status;
+  }
+
+  Version* current_version = versions.current();
+  int current_levels = current_version->NumberLevels();
+
+  if (current_levels <= new_levels) {
+    return Status::OK();
+  }
+
+  // Make sure there are files only on one level from
+  // (new_levels-1) to (current_levels-1)
+  int first_nonempty_level = -1;
+  int first_nonempty_level_filenum = 0;
+  for (int i = new_levels - 1; i < current_levels; i++) {
+    int file_num = current_version->NumLevelFiles(i);
+    if (file_num != 0) {
+      if (first_nonempty_level < 0) {
+        first_nonempty_level = i;
+        first_nonempty_level_filenum = file_num;
+      } else {
+        char msg[255];
+        snprintf(msg, sizeof(msg),
+                 "Found at least two levels containing files: "
+                 "[%d:%d],[%d:%d].\n",
+                 first_nonempty_level, first_nonempty_level_filenum, i,
+                 file_num);
+        return Status::InvalidArgument(msg);
+      }
+    }
+  }
+
+  std::vector<FileMetaData*>* old_files_list = current_version->files_;
+  std::vector<FileMetaData*>* new_files_list =
+      new std::vector<FileMetaData*>[new_levels];
+  for (int i = 0; i < new_levels - 1; i++) {
+    new_files_list[i] = old_files_list[i];
+  }
+
+  if (first_nonempty_level > 0) {
+    new_files_list[new_levels - 1] = old_files_list[first_nonempty_level];
+  }
+
+  delete[] current_version->files_;
+  current_version->files_ = new_files_list;
+  current_version->num_levels_ = new_levels;
+
+  VersionEdit ve;
+  port::Mutex dummy_mutex;
+  MutexLock l(&dummy_mutex);
+  return versions.LogAndApply(&ve, &dummy_mutex, true);
+}
+
 Status VersionSet::DumpManifest(Options& options, std::string& dscname,
                                 bool verbose, bool hex) {
   // Open the specified manifest file.
diff --git a/db/version_set.h b/db/version_set.h
index 5ea1320cc..7043cda93 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -302,10 +302,16 @@ class VersionSet {
   // Try to reduce the number of levels. This call is valid when
   // only one level from the new max level to the old
   // max level containing files.
+  // The call is static, since the number of levels is immutable during
+  // the lifetime of a RocksDB instance. It reduces the number of levels
+  // in a DB by applying changes to the manifest.
   // For example, a db currently has 7 levels [0-6], and a call to
   // to reduce to 5 [0-4] can only be executed when only one level
   // among [4-6] contains files.
-  Status ReduceNumberOfLevels(int new_levels, port::Mutex* mu);
+  static Status ReduceNumberOfLevels(const std::string& dbname,
+                                     const Options* options,
+                                     const EnvOptions& storage_options,
+                                     int new_levels);
 
   // Return the current version.
   Version* current() const {
diff --git a/db/version_set_reduce_num_levels.cc b/db/version_set_reduce_num_levels.cc
deleted file mode 100644
index 707719f90..000000000
--- a/db/version_set_reduce_num_levels.cc
+++ /dev/null
@@ -1,78 +0,0 @@
-// Copyright (c) 2013, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
-//
-// Copyright (c) 2012 Facebook. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "db/version_set.h"
-
-#include <algorithm>
-#include <stdio.h>
-#include "db/log_reader.h"
-#include "db/log_writer.h"
-#include "util/logging.h"
-
-namespace rocksdb {
-
-Status VersionSet::ReduceNumberOfLevels(int new_levels, port::Mutex* mu) {
-
-  if(new_levels <= 1) {
-    return Status::InvalidArgument(
-        "Number of levels needs to be bigger than 1");
-  }
-
-  // TODO this only works for default column family now
-  Version* current_version = column_family_set_->GetDefault()->current;
-  int current_levels = current_version->NumberLevels();
-
-  if (current_levels <= new_levels) {
-    return Status::OK();
-  }
-
-  // Make sure there are file only on one level from
-  // (new_levels-1) to (current_levels-1)
-  int first_nonempty_level = -1;
-  int first_nonempty_level_filenum = 0;
-  for (int i = new_levels - 1; i < current_levels; i++) {
-    int file_num = current_version->NumLevelFiles(i);
-    if (file_num != 0) {
-      if (first_nonempty_level < 0) {
-        first_nonempty_level = i;
-        first_nonempty_level_filenum = file_num;
-      } else {
-        char msg[255];
-        sprintf(msg, "Found at least two levels containing files: "
-            "[%d:%d],[%d:%d].\n",
-            first_nonempty_level, first_nonempty_level_filenum, i, file_num);
-        return Status::InvalidArgument(msg);
-      }
-    }
-  }
-
-  Status st;
-  std::vector<FileMetaData*>* old_files_list = current_version->files_;
-  std::vector<FileMetaData*>* new_files_list =
-      new std::vector<FileMetaData*>[new_levels];
-  for (int i = 0; i < new_levels - 1; i++) {
-    new_files_list[i] = old_files_list[i];
-  }
-
-  if (first_nonempty_level > 0) {
-    new_files_list[new_levels - 1] = old_files_list[first_nonempty_level];
-  }
-
-  delete[] current_version->files_;
-  current_version->files_ = new_files_list;
-  current_version->num_levels_ = new_levels;
-
-  num_levels_ = new_levels;
-  compaction_picker_->ReduceNumberOfLevels(new_levels);
-  VersionEdit ve;
-  st = LogAndApply(&ve, mu, true);
-  return st;
-}
-
-}
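With this refactor, reducing the level count is an offline manifest rewrite rather than a method on a live VersionSet. A sketch of how a caller is expected to drive it, mirroring the `ldb_cmd.cc` change at the end of this patch (`ReduceLevelsOffline` is a hypothetical wrapper, error handling is trimmed, and `db/version_set.h` is an internal header):

    #include <string>
    #include "db/version_set.h"  // internal header, not public API

    rocksdb::Status ReduceLevelsOffline(const std::string& dbname,
                                        int new_levels) {
      rocksdb::Options opt;  // must describe the existing DB (comparator, etc.)
      rocksdb::EnvOptions soptions;
      // The static call recovers the manifest itself and commits a VersionEdit;
      // it must run against a closed DB, exactly as ReduceDBLevelsCommand does.
      return rocksdb::VersionSet::ReduceNumberOfLevels(dbname, &opt, soptions,
                                                       new_levels);
    }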
diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h
index 3e0e5c1cd..7d58e1546 100644
--- a/include/rocksdb/cache.h
+++ b/include/rocksdb/cache.h
@@ -104,6 +104,15 @@ class Cache {
   // returns the maximum configured capacity of the cache
   virtual size_t GetCapacity() = 0;
 
+  // Call this on shutdown if you want to speed shutdown up. The cache will
+  // disown any underlying data and will not free it on delete. This call will
+  // leak memory - call this only if you're shutting down the process.
+  // Any attempt to use the cache after this call will fail terribly.
+  // Always delete the DB object before calling this method!
+  virtual void DisownData() {
+    // default implementation is noop
+  };
+
  private:
   void LRU_Remove(Handle* e);
   void LRU_Append(Handle* e);
diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc
index 836f6edf6..a9cd35a68 100644
--- a/table/block_based_table_factory.cc
+++ b/table/block_based_table_factory.cc
@@ -20,10 +20,10 @@ namespace rocksdb {
 
 Status BlockBasedTableFactory::GetTableReader(
     const Options& options, const EnvOptions& soptions,
-    unique_ptr<RandomAccessFile> && file, uint64_t file_size,
+    unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
     unique_ptr<TableReader>* table_reader) const {
-  return BlockBasedTable::Open(options, soptions, std::move(file), file_size,
-                               table_reader);
+  return BlockBasedTable::Open(options, soptions, table_options_,
+                               std::move(file), file_size, table_reader);
 }
 
 TableBuilder* BlockBasedTableFactory::GetTableBuilder(
diff --git a/table/block_based_table_factory.h b/table/block_based_table_factory.h
index ee525816f..5a4d1bd6e 100644
--- a/table/block_based_table_factory.h
+++ b/table/block_based_table_factory.h
@@ -14,6 +14,7 @@
 #include "rocksdb/flush_block_policy.h"
 #include "rocksdb/options.h"
 #include "rocksdb/table.h"
+#include "table/block_based_table_options.h"
 
 namespace rocksdb {
 
@@ -30,40 +31,25 @@ class BlockBasedTable;
 class BlockBasedTableBuilder;
 
 class BlockBasedTableFactory: public TableFactory {
-public:
-  struct TableOptions {
-    // @flush_block_policy_factory creates the instances of flush block policy.
-    // which provides a configurable way to determine when to flush a block in
-    // the block based tables. If not set, table builder will use the default
-    // block flush policy, which cut blocks by block size (please refer to
-    // `FlushBlockBySizePolicy`).
-    std::shared_ptr<FlushBlockPolicyFactory> flush_block_policy_factory;
-  };
+ public:
+  BlockBasedTableFactory() : BlockBasedTableFactory(BlockBasedTableOptions()) {}
+  explicit BlockBasedTableFactory(const BlockBasedTableOptions& table_options)
+      : table_options_(table_options) {}
 
-  BlockBasedTableFactory() : BlockBasedTableFactory(TableOptions()) { }
-  BlockBasedTableFactory(const TableOptions& table_options):
-      table_options_(table_options) {
-  }
+  ~BlockBasedTableFactory() {}
 
-  ~BlockBasedTableFactory() {
-  }
-
-  const char* Name() const override {
-    return "BlockBasedTable";
-  }
+  const char* Name() const override { return "BlockBasedTable"; }
 
   Status GetTableReader(const Options& options, const EnvOptions& soptions,
-                        unique_ptr<RandomAccessFile> && file,
-                        uint64_t file_size,
+                        unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
                         unique_ptr<TableReader>* table_reader) const override;
 
   TableBuilder* GetTableBuilder(const Options& options, WritableFile* file,
-                                CompressionType compression_type) const
-      override;
+                                CompressionType compression_type)
+      const override;
 
  private:
-  TableOptions table_options_;
+  BlockBasedTableOptions table_options_;
 };
-
 }  // namespace rocksdb
diff --git a/table/block_based_table_options.h b/table/block_based_table_options.h
new file mode 100644
index 000000000..f5774e2bf
--- /dev/null
+++ b/table/block_based_table_options.h
@@ -0,0 +1,31 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+#pragma once
+#include <memory>
+
+namespace rocksdb {
+
+class FlushBlockPolicyFactory;
+
+struct BlockBasedTableOptions {
+  // @flush_block_policy_factory creates the instances of flush block policy,
+  // which provides a configurable way to determine when to flush a block in
+  // the block based tables. If not set, table builder will use the default
+  // block flush policy, which cuts blocks by block size (please refer to
+  // `FlushBlockBySizePolicy`).
+  std::shared_ptr<FlushBlockPolicyFactory> flush_block_policy_factory;
+
+  // TODO(kailiu) Temporarily disable this feature by making the default value
+  // to be false. Also in master branch, this file is non-public so no user
+  // will be able to change the value of `cache_index_and_filter_blocks`.
+  //
+  // Indicates whether we'd like to put index/filter blocks into the block
+  // cache. If not specified, each "table reader" object will pre-load the
+  // index/filter block during table initialization.
+  bool cache_index_and_filter_blocks = false;
+};
+
+}  // namespace rocksdb
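Wiring the new options struct into a DB, as the `db_test.cc` hunk above and the `table_test.cc` hunk below do. Sketch only — the include path is internal while the TODO above stands, and the cache size is arbitrary:

    #include "rocksdb/cache.h"
    #include "rocksdb/options.h"
    #include "table/block_based_table_factory.h"  // internal path, per the TODO

    rocksdb::Options OptionsWithCachedIndexAndFilter() {
      rocksdb::Options options;
      options.block_cache = rocksdb::NewLRUCache(8 * 1024 * 1024);  // 8 MB
      rocksdb::BlockBasedTableOptions table_options;
      table_options.cache_index_and_filter_blocks = true;  // default is false
      options.table_factory.reset(
          new rocksdb::BlockBasedTableFactory(table_options));
      return options;
    }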
diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc
index dcb55fc36..b08ea1934 100644
--- a/table/block_based_table_reader.cc
+++ b/table/block_based_table_reader.cc
@@ -26,6 +26,7 @@
 #include "util/coding.h"
 #include "util/perf_context_imp.h"
 #include "util/stop_watch.h"
+#include "table/block_based_table_options.h"
 
 namespace rocksdb {
 
@@ -45,9 +46,9 @@ struct BlockBasedTable::Rep {
   Status status;
   unique_ptr<RandomAccessFile> file;
   char cache_key_prefix[kMaxCacheKeyPrefixSize];
-  size_t cache_key_prefix_size;
+  size_t cache_key_prefix_size = 0;
   char compressed_cache_key_prefix[kMaxCacheKeyPrefixSize];
-  size_t compressed_cache_key_prefix_size;
+  size_t compressed_cache_key_prefix_size = 0;
 
   // Handle to metaindex_block: saved from footer
   BlockHandle metaindex_handle;
@@ -220,20 +221,21 @@ Cache::Handle* GetFromBlockCache(
 
 }  // end of anonymous namespace
 
-Status BlockBasedTable::Open(const Options& options,
-                             const EnvOptions& soptions,
-                             unique_ptr<RandomAccessFile> && file,
-                             uint64_t size,
+Status BlockBasedTable::Open(const Options& options, const EnvOptions& soptions,
+                             const BlockBasedTableOptions& table_options,
+                             unique_ptr<RandomAccessFile>&& file,
+                             uint64_t file_size,
                              unique_ptr<TableReader>* table_reader) {
   table_reader->reset();
-  if (size < Footer::kEncodedLength) {
+
+  if (file_size < Footer::kEncodedLength) {
     return Status::InvalidArgument("file is too short to be an sstable");
   }
 
   char footer_space[Footer::kEncodedLength];
   Slice footer_input;
-  Status s = file->Read(size - Footer::kEncodedLength, Footer::kEncodedLength,
-                        &footer_input, footer_space);
+  Status s = file->Read(file_size - Footer::kEncodedLength,
+                        Footer::kEncodedLength, &footer_input, footer_space);
   if (!s.ok()) return s;
 
   // Check that we actually read the whole footer from the file. It may be
@@ -277,11 +279,21 @@ Status BlockBasedTable::Open(const Options& options,
     }
   }
 
-  // Initialize index/filter blocks. If block cache is not specified,
-  // these blocks will be kept in member variables in Rep, which will
-  // reside in the memory as long as this table object is alive; otherwise
-  // they will be added to block cache.
-  if (!options.block_cache) {
+  // Will we use the block cache for index/filter block access?
+  if (options.block_cache && table_options.cache_index_and_filter_blocks) {
+    // Call IndexBlockReader() to implicitly add the index to the block_cache
+    unique_ptr<Iterator> iter(new_table->IndexBlockReader(ReadOptions()));
+    s = iter->status();
+
+    if (s.ok()) {
+      // Call GetFilter() to implicitly add the filter to the block_cache
+      auto filter_entry = new_table->GetFilter();
+      filter_entry.Release(options.block_cache.get());
+    }
+  } else {
+    // If we don't use the block cache for index/filter block access, we'll
+    // pre-load these blocks, which will be kept in member variables in Rep
+    // and have the same life-time as this table object.
     Block* index_block = nullptr;
     // TODO: we never really verify check sum for index block
     s = ReadBlockFromFile(
@@ -309,18 +321,7 @@ Status BlockBasedTable::Open(const Options& options,
     } else {
       delete index_block;
     }
-  } else {
-    // Call IndexBlockReader() to implicitly add index to the block_cache
-    unique_ptr<Iterator> iter(
-        new_table->IndexBlockReader(ReadOptions())
-    );
-    s = iter->status();
-
-    if (s.ok()) {
-      // Call GetFilter() to implicitly add filter to the block_cache
-      auto filter_entry = new_table->GetFilter();
-      filter_entry.Release(options.block_cache.get());
-    }
   }
 
   if (s.ok()) {
@@ -836,7 +837,6 @@ BlockBasedTable::GetFilter(bool no_io) const {
 // Get the iterator from the index block.
 Iterator* BlockBasedTable::IndexBlockReader(const ReadOptions& options) const {
   if (rep_->index_block) {
-    assert (!rep_->options.block_cache);
     return rep_->index_block->NewIterator(rep_->options.comparator);
   }
 
diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h
index 66f63fc59..52ece7441 100644
--- a/table/block_based_table_reader.h
+++ b/table/block_based_table_reader.h
@@ -29,6 +29,7 @@ struct ReadOptions;
 class TableCache;
 class TableReader;
 class FilterBlockReader;
+struct BlockBasedTableOptions;
 
 using std::unique_ptr;
 
@@ -50,10 +51,9 @@ class BlockBasedTable : public TableReader {
   // to nullptr and returns a non-ok status.
   //
   // *file must remain live while this Table is in use.
-  static Status Open(const Options& options,
-                     const EnvOptions& soptions,
-                     unique_ptr<RandomAccessFile>&& file,
-                     uint64_t file_size,
+  static Status Open(const Options& db_options, const EnvOptions& env_options,
+                     const BlockBasedTableOptions& table_options,
+                     unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
                      unique_ptr<TableReader>* table_reader);
 
   bool PrefixMayMatch(const Slice& internal_prefix) override;
diff --git a/table/table_test.cc b/table/table_test.cc
index 9907550ce..af794fd13 100644
--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -243,13 +243,12 @@ class BlockConstructor: public Constructor {
 
 class BlockBasedTableConstructor: public Constructor {
  public:
-  explicit BlockBasedTableConstructor(
-      const Comparator* cmp)
-      : Constructor(cmp) {
-  }
+  explicit BlockBasedTableConstructor(const Comparator* cmp)
+      : Constructor(cmp) {}
+
   ~BlockBasedTableConstructor() {
     Reset();
   }
 
   virtual Status FinishImpl(const Options& options, const KVMap& data) {
     Reset();
     sink_.reset(new StringSink());
@@ -277,7 +276,6 @@ class BlockBasedTableConstructor: public Constructor {
     // Open the table
     uniq_id_ = cur_uniq_id_++;
     source_.reset(new StringSource(sink_->contents(), uniq_id_));
-    unique_ptr<TableFactory> table_factory;
     return options.table_factory->GetTableReader(options, soptions,
                                                  std::move(source_),
                                                  sink_->contents().size(),
@@ -979,6 +977,11 @@ TEST(TableTest, BlockCacheTest) {
   options.create_if_missing = true;
   options.statistics = CreateDBStatistics();
   options.block_cache = NewLRUCache(1024);
+
+  // Enable the cache for index/filter blocks
+  BlockBasedTableOptions table_options;
+  table_options.cache_index_and_filter_blocks = true;
+  options.table_factory.reset(new BlockBasedTableFactory(table_options));
 
   std::vector<std::string> keys;
   KVMap kvmap;
@@ -1292,7 +1295,6 @@ TEST(MemTableTest, Simple) {
   delete memtable->Unref();
 }
 
-
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
diff --git a/util/cache.cc b/util/cache.cc
index 8fa03626b..4707eac94 100644
--- a/util/cache.cc
+++ b/util/cache.cc
@@ -409,6 +409,9 @@ class ShardedLRUCache : public Cache {
   virtual size_t GetCapacity() {
     return capacity_;
   }
+  virtual void DisownData() {
+    shard_ = nullptr;
+  }
 };
 
 }  // end anonymous namespace
diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc
index 341d154e9..a792f0b1c 100644
--- a/util/ldb_cmd.cc
+++ b/util/ldb_cmd.cc
@@ -1072,25 +1072,7 @@ void ReduceDBLevelsCommand::DoCommand() {
   CloseDB();
 
   EnvOptions soptions;
-  TableCache tc(db_path_, &opt, soptions, 10);
-  const InternalKeyComparator cmp(opt.comparator);
-  VersionSet versions(db_path_, &opt, soptions, &tc, &cmp);
-  std::vector<ColumnFamilyDescriptor> dummy;
-  dummy.push_back(ColumnFamilyDescriptor());
-  // We rely the VersionSet::Recover to tell us the internal data structures
-  // in the db. And the Recover() should never do any change (like LogAndApply)
-  // to the manifest file.
-  st = versions.Recover(dummy);
-  if (!st.ok()) {
-    exec_state_ = LDBCommandExecuteResult::FAILED(st.ToString());
-    return;
-  }
-
-  port::Mutex mu;
-  mu.Lock();
-  st = versions.ReduceNumberOfLevels(new_levels_, &mu);
-  mu.Unlock();
-
+  st = VersionSet::ReduceNumberOfLevels(db_path_, &opt, soptions, new_levels_);
  if (!st.ok()) {
    exec_state_ = LDBCommandExecuteResult::FAILED(st.ToString());
    return;
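Finally, a sketch of the shutdown fast path that `Cache::DisownData()` enables, following the ordering the header comment requires (`FastShutdown` is a hypothetical wrapper; the DB must be deleted first so nothing touches cache entries afterwards):

    #include <memory>
    #include "rocksdb/cache.h"
    #include "rocksdb/db.h"

    void FastShutdown(rocksdb::DB* db, std::shared_ptr<rocksdb::Cache> cache) {
      delete db;            // always delete the DB object before disowning
      cache->DisownData();  // the cache drops its pointers and leaks the data
      cache.reset();        // ...so its destructor returns quickly
      // The process is about to exit; the OS reclaims the leaked memory.
    }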