From 5b3b6549d68b61e65c1614ad5f4da115a06a94f0 Mon Sep 17 00:00:00 2001 From: Lei Jin Date: Mon, 3 Feb 2014 13:13:36 -0800 Subject: [PATCH] use super_version in NewIterator() and MultiGet() function Summary: Use super_version insider NewIterator to avoid Ref() each component separately under mutex The new added bench shows NewIterator QPS increases from 515K to 719K No meaningful improvement for multiget I guess due to its relatively small cost comparing to 90 keys fetch in the test. Test Plan: unit test and db_bench Reviewers: igor, sdong Reviewed By: igor CC: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D15609 --- db/db_bench.cc | 13 ++++ db/db_impl.cc | 161 ++++++++++++++++------------------------- db/db_impl.h | 4 +- db/db_impl_readonly.cc | 11 ++- db/db_test.cc | 24 ++++++ 5 files changed, 110 insertions(+), 103 deletions(-) diff --git a/db/db_bench.cc b/db/db_bench.cc index 9e6ef70ca..8355a3f0c 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -99,6 +99,7 @@ DEFINE_string(benchmarks, "Must be used with merge_operator\n" "\treadrandommergerandom -- perform N random read-or-merge " "operations. Must be used with merge_operator\n" + "\tnewiterator -- repeated iterator creation\n" "\tseekrandom -- N random seeks\n" "\tcrc32c -- repeated crc32c of 4K of data\n" "\tacquireload -- load N*1000 times\n" @@ -1089,6 +1090,8 @@ class Benchmark { method = &Benchmark::ReadRandom; } else if (name == Slice("readmissing")) { method = &Benchmark::ReadMissing; + } else if (name == Slice("newiterator")) { + method = &Benchmark::IteratorCreation; } else if (name == Slice("seekrandom")) { method = &Benchmark::SeekRandom; } else if (name == Slice("readhot")) { @@ -1877,6 +1880,16 @@ class Benchmark { thread->stats.AddMessage(msg); } + void IteratorCreation(ThreadState* thread) { + Duration duration(FLAGS_duration, reads_); + ReadOptions options(FLAGS_verify_checksum, true); + while (!duration.Done(1)) { + Iterator* iter = db_->NewIterator(options); + delete iter; + thread->stats.FinishedSingleOp(db_); + } + } + void SeekRandom(ThreadState* thread) { Duration duration(FLAGS_duration, reads_); ReadOptions options(FLAGS_verify_checksum, true); diff --git a/db/db_impl.cc b/db/db_impl.cc index 1c4f73b8e..4dd457e48 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2668,34 +2668,29 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, namespace { struct IterState { + IterState(DBImpl* db, port::Mutex* mu, DBImpl::SuperVersion* super_version) + : db(db), mu(mu), super_version(super_version) {} + + DBImpl* db; port::Mutex* mu; - Version* version = nullptr; - MemTable* mem = nullptr; - MemTableListVersion* imm = nullptr; - DBImpl *db; + DBImpl::SuperVersion* super_version; }; static void CleanupIteratorState(void* arg1, void* arg2) { IterState* state = reinterpret_cast(arg1); DBImpl::DeletionState deletion_state(state->db->GetOptions(). max_write_buffer_number); - state->mu->Lock(); - if (state->mem) { // not set for immutable iterator - MemTable* m = state->mem->Unref(); - if (m != nullptr) { - deletion_state.memtables_to_free.push_back(m); - } - } - if (state->version) { // not set for memtable-only iterator - state->version->Unref(); - } - if (state->imm) { // not set for memtable-only iterator - state->imm->Unref(&deletion_state.memtables_to_free); + + bool need_cleanup = state->super_version->Unref(); + if (need_cleanup) { + state->mu->Lock(); + state->super_version->Cleanup(); + state->db->FindObsoleteFiles(deletion_state, false, true); + state->mu->Unlock(); + + delete state->super_version; + state->db->PurgeObsoleteFiles(deletion_state); } - // fast path FindObsoleteFiles - state->db->FindObsoleteFiles(deletion_state, false, true); - state->mu->Unlock(); - state->db->PurgeObsoleteFiles(deletion_state); delete state; } @@ -2703,36 +2698,23 @@ static void CleanupIteratorState(void* arg1, void* arg2) { Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, SequenceNumber* latest_snapshot) { - IterState* cleanup = new IterState; - MemTable* mutable_mem; - MemTableListVersion* immutable_mems; - Version* version; - - // Collect together all needed child iterators for mem mutex_.Lock(); *latest_snapshot = versions_->LastSequence(); - mem_->Ref(); - mutable_mem = mem_; - // Collect together all needed child iterators for imm_ - immutable_mems = imm_.current(); - immutable_mems->Ref(); - versions_->current()->Ref(); - version = versions_->current(); + SuperVersion* super_version = super_version_->Ref(); mutex_.Unlock(); std::vector iterator_list; - iterator_list.push_back(mutable_mem->NewIterator(options)); - cleanup->mem = mutable_mem; - cleanup->imm = immutable_mems; + // Collect iterator for mutable mem + iterator_list.push_back(super_version->mem->NewIterator(options)); // Collect all needed child iterators for immutable memtables - immutable_mems->AddIterators(options, &iterator_list); + super_version->imm->AddIterators(options, &iterator_list); // Collect iterators for files in L0 - Ln - version->AddIterators(options, storage_options_, &iterator_list); + super_version->current->AddIterators(options, storage_options_, + &iterator_list); Iterator* internal_iter = NewMergingIterator( &internal_comparator_, &iterator_list[0], iterator_list.size()); - cleanup->version = version; - cleanup->mu = &mutex_; - cleanup->db = this; + + IterState* cleanup = new IterState(this, &mutex_, super_version); internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr); return internal_iter; @@ -2747,53 +2729,36 @@ std::pair DBImpl::GetTailingIteratorPair( const ReadOptions& options, uint64_t* superversion_number) { - MemTable* mutable_mem; - MemTableListVersion* immutable_mems; - Version* version; - - // get all child iterators and bump their refcounts under lock mutex_.Lock(); - mutable_mem = mem_; - mutable_mem->Ref(); - immutable_mems = imm_.current(); - immutable_mems->Ref(); - version = versions_->current(); - version->Ref(); + SuperVersion* super_version = super_version_->Ref(); if (superversion_number != nullptr) { *superversion_number = CurrentVersionNumber(); } mutex_.Unlock(); - Iterator* mutable_iter = mutable_mem->NewIterator(options); - IterState* mutable_cleanup = new IterState(); - mutable_cleanup->mem = mutable_mem; - mutable_cleanup->db = this; - mutable_cleanup->mu = &mutex_; - mutable_iter->RegisterCleanup(CleanupIteratorState, mutable_cleanup, nullptr); - + Iterator* mutable_iter = super_version->mem->NewIterator(options); // create a DBIter that only uses memtable content; see NewIterator() mutable_iter = NewDBIterator(&dbname_, env_, options_, user_comparator(), mutable_iter, kMaxSequenceNumber); - Iterator* immutable_iter; - IterState* immutable_cleanup = new IterState(); std::vector list; - immutable_mems->AddIterators(options, &list); - immutable_cleanup->imm = immutable_mems; - version->AddIterators(options, storage_options_, &list); - immutable_cleanup->version = version; - immutable_cleanup->db = this; - immutable_cleanup->mu = &mutex_; - - immutable_iter = + super_version->imm->AddIterators(options, &list); + super_version->current->AddIterators(options, storage_options_, &list); + Iterator* immutable_iter = NewMergingIterator(&internal_comparator_, &list[0], list.size()); - immutable_iter->RegisterCleanup(CleanupIteratorState, immutable_cleanup, - nullptr); // create a DBIter that only uses memtable content; see NewIterator() immutable_iter = NewDBIterator(&dbname_, env_, options_, user_comparator(), immutable_iter, kMaxSequenceNumber); + // register cleanups + mutable_iter->RegisterCleanup(CleanupIteratorState, + new IterState(this, &mutex_, super_version), nullptr); + + // bump the ref one more time since it will be Unref'ed twice + immutable_iter->RegisterCleanup(CleanupIteratorState, + new IterState(this, &mutex_, super_version->Ref()), nullptr); + return std::make_pair(mutable_iter, immutable_iter); } @@ -2924,7 +2889,6 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, StopWatch sw(env_, options_.statistics.get(), DB_MULTIGET, false); SequenceNumber snapshot; - std::vector to_delete; mutex_.Lock(); if (options.snapshot != nullptr) { @@ -2933,16 +2897,9 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, snapshot = versions_->LastSequence(); } - MemTable* mem = mem_; - MemTableListVersion* imm = imm_.current(); - Version* current = versions_->current(); - mem->Ref(); - imm->Ref(); - current->Ref(); - - // Unlock while reading from files and memtables - + SuperVersion* get_version = super_version_->Ref(); mutex_.Unlock(); + bool have_stat_update = false; Version::GetStats stats; @@ -2967,12 +2924,14 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, std::string* value = &(*values)[i]; LookupKey lkey(keys[i], snapshot); - if (mem->Get(lkey, value, &s, merge_context, options_)) { + if (get_version->mem->Get(lkey, value, &s, merge_context, options_)) { // Done - } else if (imm->Get(lkey, value, &s, merge_context, options_)) { + } else if (get_version->imm->Get(lkey, value, &s, merge_context, + options_)) { // Done } else { - current->Get(options, lkey, value, &s, &merge_context, &stats, options_); + get_version->current->Get(options, lkey, value, &s, &merge_context, + &stats, options_); have_stat_update = true; } @@ -2981,20 +2940,28 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, } } - // Post processing (decrement reference counts and record statistics) - mutex_.Lock(); - if (!options_.disable_seek_compaction && - have_stat_update && current->UpdateStats(stats)) { - MaybeScheduleFlushOrCompaction(); + bool delete_get_version = false; + if (!options_.disable_seek_compaction && have_stat_update) { + mutex_.Lock(); + if (get_version->current->UpdateStats(stats)) { + MaybeScheduleFlushOrCompaction(); + } + if (get_version->Unref()) { + get_version->Cleanup(); + delete_get_version = true; + } + mutex_.Unlock(); + } else { + if (get_version->Unref()) { + mutex_.Lock(); + get_version->Cleanup(); + mutex_.Unlock(); + delete_get_version = true; + } + } + if (delete_get_version) { + delete get_version; } - MemTable* m = mem->Unref(); - imm->Unref(&to_delete); - current->Unref(); - mutex_.Unlock(); - - // free up all obsolete memtables outside the mutex - delete m; - for (MemTable* v: to_delete) delete v; RecordTick(options_.statistics.get(), NUMBER_MULTIGET_CALLS); RecordTick(options_.statistics.get(), NUMBER_MULTIGET_KEYS_READ, numKeys); diff --git a/db/db_impl.h b/db/db_impl.h index 545b367aa..263439d99 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -249,8 +249,8 @@ class DBImpl : public DB { return internal_comparator_.user_comparator(); } - MemTable* GetMemTable() { - return mem_; + SuperVersion* GetSuperVersion() { + return super_version_; } Iterator* NewInternalIterator(const ReadOptions&, diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index f1ffe3ca3..faa2ff3c7 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -56,15 +56,15 @@ Status DBImplReadOnly::Get(const ReadOptions& options, const Slice& key, std::string* value) { Status s; - MemTable* mem = GetMemTable(); - Version* current = versions_->current(); SequenceNumber snapshot = versions_->LastSequence(); + SuperVersion* super_version = GetSuperVersion(); MergeContext merge_context; LookupKey lkey(key, snapshot); - if (mem->Get(lkey, value, &s, merge_context, options_)) { + if (super_version->mem->Get(lkey, value, &s, merge_context, options_)) { } else { Version::GetStats stats; - current->Get(options, lkey, value, &s, &merge_context, &stats, options_); + super_version->current->Get(options, lkey, value, &s, &merge_context, + &stats, options_); } return s; } @@ -87,6 +87,9 @@ Status DB::OpenForReadOnly(const Options& options, const std::string& dbname, DBImplReadOnly* impl = new DBImplReadOnly(options, dbname); impl->mutex_.Lock(); Status s = impl->Recover(true /* read only */, error_if_log_file_exist); + if (s.ok()) { + delete impl->InstallSuperVersion(new DBImpl::SuperVersion()); + } impl->mutex_.Unlock(); if (s.ok()) { *dbptr = impl; diff --git a/db/db_test.cc b/db/db_test.cc index baa6e7489..75b5ae2d9 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -421,6 +421,10 @@ class DBTest { return DB::Open(*options, dbname_, db); } + Status ReadOnlyReopen(Options* options) { + return DB::OpenForReadOnly(*options, dbname_, &db_); + } + Status TryReopen(Options* options = nullptr) { delete db_; db_ = nullptr; @@ -727,6 +731,26 @@ TEST(DBTest, ReadWrite) { } while (ChangeOptions()); } +TEST(DBTest, ReadOnlyDB) { + ASSERT_OK(Put("foo", "v1")); + ASSERT_OK(Put("bar", "v2")); + ASSERT_OK(Put("foo", "v3")); + Close(); + + Options options; + ASSERT_OK(ReadOnlyReopen(&options)); + ASSERT_EQ("v3", Get("foo")); + ASSERT_EQ("v2", Get("bar")); + Iterator* iter = db_->NewIterator(ReadOptions()); + int count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + ++count; + } + ASSERT_EQ(count, 2); + delete iter; +} + // Make sure that when options.block_cache is set, after a new table is // created its index/filter blocks are added to block cache. TEST(DBTest, IndexAndFilterBlocksOfNewTableAddedToCache) {