From df9069d23f50f5d2b35460922a2c703602d8c79a Mon Sep 17 00:00:00 2001 From: sdong Date: Mon, 2 Jun 2014 16:38:00 -0700 Subject: [PATCH] In DB::NewIterator(), try to allocate the whole iterator tree in an arena Summary: In this patch, try to allocate the whole iterator tree starting from DBIter from an arena 1. ArenaWrappedDBIter is created when serves as the entry point of an iterator tree, with an arena in it. 2. Add an option to create iterator from arena for following iterators: DBIter, MergingIterator, MemtableIterator, all mem table's iterators, all table reader's iterators and two level iterator. 3. MergeIteratorBuilder is created to incrementally build the tree of internal iterators. It is passed to mem table list and version set and add iterators to it. Limitations: (1) Only DB::NewIterator() without tailing uses the arena. Other cases, including readonly DB and compactions are still from malloc (2) Two level iterator itself is allocated in arena, but not iterators inside it. Test Plan: make all check Reviewers: ljin, haobo Reviewed By: haobo Subscribers: leveldb, dhruba, yhchiang, igor Differential Revision: https://reviews.facebook.net/D18513 --- db/db_impl.cc | 105 +++++++++++++++++++++++------- db/db_impl.h | 4 +- db/db_iter.cc | 71 +++++++++++++++----- db/db_iter.h | 46 +++++++++++++ db/memtable.cc | 31 +++++++-- db/memtable.h | 7 +- db/memtable_list.cc | 9 +++ db/memtable_list.h | 4 ++ db/simple_table_db_test.cc | 12 +++- db/table_cache.cc | 7 +- db/table_cache.h | 3 +- db/version_set.cc | 26 ++++++++ db/version_set.h | 4 ++ include/rocksdb/memtablerep.h | 18 ++++- table/block_based_table_reader.cc | 9 +-- table/block_based_table_reader.h | 2 +- table/iterator.cc | 20 ++++++ table/iterator_wrapper.h | 24 ++++--- table/merger.cc | 68 ++++++++++++++++--- table/merger.h | 33 +++++++++- table/plain_table_reader.cc | 11 +++- table/plain_table_reader.h | 2 +- table/table_reader.h | 7 +- table/two_level_iterator.cc | 15 ++++- table/two_level_iterator.h | 7 
+- util/arena.h | 2 + util/hash_cuckoo_rep.cc | 18 +++-- util/hash_linklist_rep.cc | 24 +++++-- util/hash_skiplist_rep.cc | 23 +++++-- util/skiplistrep.cc | 10 ++- util/vectorrep.cc | 20 ++++-- 31 files changed, 532 insertions(+), 110 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 326abc8db..38285e030 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3179,18 +3179,34 @@ static void CleanupIteratorState(void* arg1, void* arg2) { Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, ColumnFamilyData* cfd, - SuperVersion* super_version) { - std::vector iterator_list; - // Collect iterator for mutable mem - iterator_list.push_back(super_version->mem->NewIterator(options)); - // Collect all needed child iterators for immutable memtables - super_version->imm->AddIterators(options, &iterator_list); - // Collect iterators for files in L0 - Ln - super_version->current->AddIterators(options, storage_options_, - &iterator_list); - Iterator* internal_iter = NewMergingIterator( - &cfd->internal_comparator(), &iterator_list[0], iterator_list.size()); - + SuperVersion* super_version, + Arena* arena) { + Iterator* internal_iter; + if (arena != nullptr) { + // Need to create internal iterator from the arena. + MergeIteratorBuilder merge_iter_builder(&cfd->internal_comparator(), arena); + // Collect iterator for mutable mem + merge_iter_builder.AddIterator( + super_version->mem->NewIterator(options, false, arena)); + // Collect all needed child iterators for immutable memtables + super_version->imm->AddIterators(options, &merge_iter_builder); + // Collect iterators for files in L0 - Ln + super_version->current->AddIterators(options, storage_options_, + &merge_iter_builder); + internal_iter = merge_iter_builder.Finish(); + } else { + // Need to create internal iterator using malloc. 
+ std::vector iterator_list; + // Collect iterator for mutable mem + iterator_list.push_back(super_version->mem->NewIterator(options)); + // Collect all needed child iterators for immutable memtables + super_version->imm->AddIterators(options, &iterator_list); + // Collect iterators for files in L0 - Ln + super_version->current->AddIterators(options, storage_options_, + &iterator_list); + internal_iter = NewMergingIterator(&cfd->internal_comparator(), + &iterator_list[0], iterator_list.size()); + } IterState* cleanup = new IterState(this, &mutex_, super_version); internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr); @@ -3541,34 +3557,77 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options, auto cfh = reinterpret_cast(column_family); auto cfd = cfh->cfd(); - Iterator* iter; if (options.tailing) { #ifdef ROCKSDB_LITE // not supported in lite version return nullptr; #else // TODO(ljin): remove tailing iterator - iter = new ForwardIterator(env_, this, options, cfd); - iter = NewDBIterator(env_, *cfd->options(), - cfd->user_comparator(), iter, kMaxSequenceNumber); - //iter = new TailingIterator(env_, this, options, cfd); + auto iter = new ForwardIterator(env_, this, options, cfd); + return NewDBIterator(env_, *cfd->options(), cfd->user_comparator(), iter, + kMaxSequenceNumber); +// return new TailingIterator(env_, this, options, cfd); #endif } else { SequenceNumber latest_snapshot = versions_->LastSequence(); SuperVersion* sv = nullptr; sv = cfd->GetReferencedSuperVersion(&mutex_); - iter = NewInternalIterator(options, cfd, sv); - auto snapshot = options.snapshot != nullptr ? reinterpret_cast(options.snapshot)->number_ : latest_snapshot; - iter = NewDBIterator(env_, *cfd->options(), - cfd->user_comparator(), iter, snapshot); - } - return iter; + // Try to generate a DB iterator tree in continuous memory area to be + // cache friendly. 
Here is an example of result: + // +-------------------------------+ + // | | + // | ArenaWrappedDBIter | + // | + | + // | +---> Inner Iterator ------------+ + // | | | | + // | | +-- -- -- -- -- -- -- --+ | + // | +--- | Arena | | + // | | | | + // | Allocated Memory: | | + // | | +-------------------+ | + // | | | DBIter | <---+ + // | | + | + // | | | +-> iter_ ------------+ + // | | | | | + // | | +-------------------+ | + // | | | MergingIterator | <---+ + // | | + | + // | | | +->child iter1 ------------+ + // | | | | | | + // | | +->child iter2 ----------+ | + // | | | | | | | + // | | | +->child iter3 --------+ | | + // | | | | | | + // | | +-------------------+ | | | + // | | | Iterator1 | <--------+ + // | | +-------------------+ | | + // | | | Iterator2 | <------+ + // | | +-------------------+ | + // | | | Iterator3 | <----+ + // | | +-------------------+ + // | | | + // +-------+-----------------------+ + // + // ArenaWrappedDBIter inlines an arena area where all the iterators in the + // iterator tree are allocated in the order of being accessed when + // querying. + // Laying out the iterators in the order of being accessed makes it more + // likely that any iterator pointer is close to the iterator it points to so + // that they are likely to be in the same cache line and/or page. 
+ ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( + env_, *cfd->options(), cfd->user_comparator(), snapshot); + Iterator* internal_iter = + NewInternalIterator(options, cfd, sv, db_iter->GetArena()); + db_iter->SetIterUnderDBIter(internal_iter); + + return db_iter; + } } Status DBImpl::NewIterators( diff --git a/db/db_impl.h b/db/db_impl.h index 3c1bc5fc5..6049db8d6 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -39,6 +39,7 @@ class Version; class VersionEdit; class VersionSet; class CompactionFilterV2; +class Arena; class DBImpl : public DB { public: @@ -278,7 +279,8 @@ class DBImpl : public DB { const DBOptions options_; Iterator* NewInternalIterator(const ReadOptions&, ColumnFamilyData* cfd, - SuperVersion* super_version); + SuperVersion* super_version, + Arena* arena = nullptr); private: friend class DB; diff --git a/db/db_iter.cc b/db/db_iter.cc index 3de620dff..4c206f02f 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -18,6 +18,7 @@ #include "rocksdb/iterator.h" #include "rocksdb/merge_operator.h" #include "port/port.h" +#include "util/arena.h" #include "util/logging.h" #include "util/mutexlock.h" #include "util/perf_context_imp.h" @@ -37,8 +38,6 @@ static void DumpInternalIter(Iterator* iter) { } #endif -namespace { - // Memtables and sstables that make the DB representation contain // (userkey,seq,type) => uservalue entries. 
DBIter // combines multiple entries for the same userkey found in the DB @@ -57,9 +56,10 @@ class DBIter: public Iterator { kReverse }; - DBIter(Env* env, const Options& options, - const Comparator* cmp, Iterator* iter, SequenceNumber s) - : env_(env), + DBIter(Env* env, const Options& options, const Comparator* cmp, + Iterator* iter, SequenceNumber s, bool arena_mode) + : arena_mode_(arena_mode), + env_(env), logger_(options.info_log.get()), user_comparator_(cmp), user_merge_operator_(options.merge_operator.get()), @@ -74,7 +74,15 @@ class DBIter: public Iterator { } virtual ~DBIter() { RecordTick(statistics_, NO_ITERATORS, -1); - delete iter_; + if (!arena_mode_) { + delete iter_; + } else { + iter_->~Iterator(); + } + } + virtual void SetIter(Iterator* iter) { + assert(iter_ == nullptr); + iter_ = iter; } virtual bool Valid() const { return valid_; } virtual Slice key() const { @@ -116,11 +124,12 @@ class DBIter: public Iterator { } } + bool arena_mode_; Env* const env_; Logger* logger_; const Comparator* const user_comparator_; const MergeOperator* const user_merge_operator_; - Iterator* const iter_; + Iterator* iter_; SequenceNumber const sequence_; Status status_; @@ -461,16 +470,48 @@ void DBIter::SeekToLast() { FindPrevUserEntry(); } -} // anonymous namespace +Iterator* NewDBIterator(Env* env, const Options& options, + const Comparator* user_key_comparator, + Iterator* internal_iter, + const SequenceNumber& sequence) { + return new DBIter(env, options, user_key_comparator, internal_iter, sequence, + false); +} + +ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~Iterator(); } + +void ArenaWrappedDBIter::SetDBIter(DBIter* iter) { db_iter_ = iter; } + +void ArenaWrappedDBIter::SetIterUnderDBIter(Iterator* iter) { + static_cast(db_iter_)->SetIter(iter); +} + +inline bool ArenaWrappedDBIter::Valid() const { return db_iter_->Valid(); } +inline void ArenaWrappedDBIter::SeekToFirst() { db_iter_->SeekToFirst(); } +inline void ArenaWrappedDBIter::SeekToLast() { 
db_iter_->SeekToLast(); } +inline void ArenaWrappedDBIter::Seek(const Slice& target) { + db_iter_->Seek(target); +} +inline void ArenaWrappedDBIter::Next() { db_iter_->Next(); } +inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); } +inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); } +inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); } +inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); } +void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1, + void* arg2) { + db_iter_->RegisterCleanup(function, arg1, arg2); +} -Iterator* NewDBIterator( - Env* env, - const Options& options, - const Comparator *user_key_comparator, - Iterator* internal_iter, +ArenaWrappedDBIter* NewArenaWrappedDbIterator( + Env* env, const Options& options, const Comparator* user_key_comparator, const SequenceNumber& sequence) { - return new DBIter(env, options, user_key_comparator, - internal_iter, sequence); + ArenaWrappedDBIter* iter = new ArenaWrappedDBIter(); + Arena* arena = iter->GetArena(); + auto mem = arena->AllocateAligned(sizeof(DBIter)); + DBIter* db_iter = new (mem) + DBIter(env, options, user_key_comparator, nullptr, sequence, true); + iter->SetDBIter(db_iter); + return iter; } } // namespace rocksdb diff --git a/db/db_iter.h b/db/db_iter.h index d8a3bad51..cb9840324 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -11,9 +11,14 @@ #include #include "rocksdb/db.h" #include "db/dbformat.h" +#include "util/arena.h" +#include "util/autovector.h" namespace rocksdb { +class Arena; +class DBIter; + // Return a new iterator that converts internal keys (yielded by // "*internal_iter") that were live at the specified "sequence" number // into appropriate user keys. @@ -24,4 +29,45 @@ extern Iterator* NewDBIterator( Iterator* internal_iter, const SequenceNumber& sequence); +// A wrapper iterator which wraps DB Iterator and the arena, with which the DB +// iterator is supposed be allocated. 
This class is used as an entry point of +// an iterator hierarchy whose memory can be allocated inline. In that way, +// accessing the iterator tree can be more cache friendly. It is also faster +// to allocate. +class ArenaWrappedDBIter : public Iterator { + public: + virtual ~ArenaWrappedDBIter(); + + // Get the arena to be used to allocate memory for DBIter to be wrapped, + // as well as child iterators in it. + virtual Arena* GetArena() { return &arena_; } + + // Set the DB Iterator to be wrapped + + virtual void SetDBIter(DBIter* iter); + + // Set the internal iterator wrapped inside the DB Iterator. Usually it is + // a merging iterator. + virtual void SetIterUnderDBIter(Iterator* iter); + virtual bool Valid() const override; + virtual void SeekToFirst() override; + virtual void SeekToLast() override; + virtual void Seek(const Slice& target) override; + virtual void Next() override; + virtual void Prev() override; + virtual Slice key() const override; + virtual Slice value() const override; + virtual Status status() const override; + void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2); + + private: + DBIter* db_iter_; + Arena arena_; +}; + +// Generate the arena wrapped iterator class. 
+extern ArenaWrappedDBIter* NewArenaWrappedDbIterator( + Env* env, const Options& options, const Comparator* user_key_comparator, + const SequenceNumber& sequence); + } // namespace rocksdb diff --git a/db/memtable.cc b/db/memtable.cc index 45f58b979..c6b915b99 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -20,6 +20,7 @@ #include "rocksdb/iterator.h" #include "rocksdb/merge_operator.h" #include "rocksdb/slice_transform.h" +#include "table/merger.h" #include "util/arena.h" #include "util/coding.h" #include "util/murmurhash.h" @@ -173,15 +174,24 @@ const char* EncodeKey(std::string* scratch, const Slice& target) { class MemTableIterator: public Iterator { public: MemTableIterator(const MemTable& mem, const ReadOptions& options, - bool enforce_total_order) + bool enforce_total_order, Arena* arena) : bloom_(nullptr), prefix_extractor_(mem.prefix_extractor_), - valid_(false) { + valid_(false), + arena_mode_(arena != nullptr) { if (prefix_extractor_ != nullptr && !enforce_total_order) { bloom_ = mem.prefix_bloom_.get(); - iter_.reset(mem.table_->GetDynamicPrefixIterator()); + iter_ = mem.table_->GetDynamicPrefixIterator(arena); } else { - iter_.reset(mem.table_->GetIterator()); + iter_ = mem.table_->GetIterator(arena); + } + } + + ~MemTableIterator() { + if (arena_mode_) { + iter_->~Iterator(); + } else { + delete iter_; } } @@ -228,8 +238,9 @@ class MemTableIterator: public Iterator { private: DynamicBloom* bloom_; const SliceTransform* const prefix_extractor_; - std::unique_ptr iter_; + MemTableRep::Iterator* iter_; bool valid_; + bool arena_mode_; // No copying allowed MemTableIterator(const MemTableIterator&); @@ -237,8 +248,14 @@ class MemTableIterator: public Iterator { }; Iterator* MemTable::NewIterator(const ReadOptions& options, - bool enforce_total_order) { - return new MemTableIterator(*this, options, enforce_total_order); + bool enforce_total_order, Arena* arena) { + if (arena == nullptr) { + return new MemTableIterator(*this, options, 
enforce_total_order, nullptr); + } else { + auto mem = arena->AllocateAligned(sizeof(MemTableIterator)); + return new (mem) + MemTableIterator(*this, options, enforce_total_order, arena); + } } port::RWMutex* MemTable::GetLock(const Slice& key) { diff --git a/db/memtable.h b/db/memtable.h index 7e9af3504..8bad2773a 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -21,6 +21,7 @@ namespace rocksdb { +class Arena; class Mutex; class MemTableIterator; class MergeContext; @@ -77,8 +78,12 @@ class MemTable { // // By default, it returns an iterator for prefix seek if prefix_extractor // is configured in Options. + // arena: If not null, the arena needs to be used to allocate the Iterator. + // Calling ~Iterator of the iterator will destroy all the states but + // those allocated in arena. Iterator* NewIterator(const ReadOptions& options, - bool enforce_total_order = false); + bool enforce_total_order = false, + Arena* arena = nullptr); // Add an entry into memtable that maps key to value at the // specified sequence number and with the specified type. 
diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 235421962..de1a18eee 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -11,6 +11,7 @@ #include "db/version_set.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" +#include "table/merger.h" #include "util/coding.h" #include "util/log_buffer.h" @@ -78,6 +79,14 @@ void MemTableListVersion::AddIterators(const ReadOptions& options, } } +void MemTableListVersion::AddIterators( + const ReadOptions& options, MergeIteratorBuilder* merge_iter_builder) { + for (auto& m : memlist_) { + merge_iter_builder->AddIterator( + m->NewIterator(options, merge_iter_builder->GetArena())); + } +} + uint64_t MemTableListVersion::GetTotalNumEntries() const { uint64_t total_num = 0; for (auto& m : memlist_) { diff --git a/db/memtable_list.h b/db/memtable_list.h index d85380b55..e56710fc9 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -28,6 +28,7 @@ namespace rocksdb { class ColumnFamilyData; class InternalKeyComparator; class Mutex; +class MergeIteratorBuilder; // keeps a list of immutable memtables in a vector. the list is immutable // if refcount is bigger than one. 
It is used as a state for Get() and @@ -49,6 +50,9 @@ class MemTableListVersion { void AddIterators(const ReadOptions& options, std::vector* iterator_list); + void AddIterators(const ReadOptions& options, + MergeIteratorBuilder* merge_iter_builder); + uint64_t GetTotalNumEntries() const; private: diff --git a/db/simple_table_db_test.cc b/db/simple_table_db_test.cc index affa61465..a86ff0a17 100644 --- a/db/simple_table_db_test.cc +++ b/db/simple_table_db_test.cc @@ -83,7 +83,7 @@ public: unique_ptr && file, uint64_t file_size, unique_ptr* table_reader); - Iterator* NewIterator(const ReadOptions&) override; + Iterator* NewIterator(const ReadOptions&, Arena* arena) override; Status Get(const ReadOptions&, const Slice& key, void* arg, bool (*handle_result)(void* arg, const ParsedInternalKey& k, @@ -218,8 +218,14 @@ std::shared_ptr SimpleTableReader::GetTableProperties() return rep_->table_properties; } -Iterator* SimpleTableReader::NewIterator(const ReadOptions& options) { - return new SimpleTableIterator(this); +Iterator* SimpleTableReader::NewIterator(const ReadOptions& options, + Arena* arena) { + if (arena == nullptr) { + return new SimpleTableIterator(this); + } else { + auto mem = arena->AllocateAligned(sizeof(SimpleTableIterator)); + return new (mem) SimpleTableIterator(this); + } } Status SimpleTableReader::GetOffset(const Slice& target, uint64_t* offset) { diff --git a/db/table_cache.cc b/db/table_cache.cc index 2321d035a..f4757cbfe 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -13,6 +13,7 @@ #include "db/version_edit.h" #include "rocksdb/statistics.h" +#include "table/iterator_wrapper.h" #include "table/table_reader.h" #include "util/coding.h" #include "util/stop_watch.h" @@ -102,7 +103,7 @@ Iterator* TableCache::NewIterator(const ReadOptions& options, const InternalKeyComparator& icomparator, const FileMetaData& file_meta, TableReader** table_reader_ptr, - bool for_compaction) { + bool for_compaction, Arena* arena) { if (table_reader_ptr != 
nullptr) { *table_reader_ptr = nullptr; } @@ -113,12 +114,12 @@ Iterator* TableCache::NewIterator(const ReadOptions& options, s = FindTable(toptions, icomparator, file_meta.number, file_meta.file_size, &handle, nullptr, options.read_tier == kBlockCacheTier); if (!s.ok()) { - return NewErrorIterator(s); + return NewErrorIterator(s, arena); } table_reader = GetTableReaderFromHandle(handle); } - Iterator* result = table_reader->NewIterator(options); + Iterator* result = table_reader->NewIterator(options, arena); if (handle != nullptr) { result->RegisterCleanup(&UnrefEntry, cache_, handle); } diff --git a/db/table_cache.h b/db/table_cache.h index e8cd7ea2e..1aa61db01 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -23,6 +23,7 @@ namespace rocksdb { class Env; +class Arena; struct FileMetaData; // TODO(sdong): try to come up with a better API to pass the file information @@ -44,7 +45,7 @@ class TableCache { const InternalKeyComparator& internal_comparator, const FileMetaData& file_meta, TableReader** table_reader_ptr = nullptr, - bool for_compaction = false); + bool for_compaction = false, Arena* arena = nullptr); // If a seek to internal key "k" in specified file finds an entry, // call (*handle_result)(arg, found_key, found_value) repeatedly until diff --git a/db/version_set.cc b/db/version_set.cc index 5327cf55f..c6a9e6ab1 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -332,6 +332,32 @@ void Version::AddIterators(const ReadOptions& read_options, } } +void Version::AddIterators(const ReadOptions& read_options, + const EnvOptions& soptions, + MergeIteratorBuilder* merge_iter_builder) { + // Merge all level zero files together since they may overlap + for (const FileMetaData* file : files_[0]) { + merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator( + read_options, soptions, cfd_->internal_comparator(), *file, nullptr, + false, merge_iter_builder->GetArena())); + } + + // For levels > 0, we can use a concatenating iterator that 
sequentially + // walks through the non-overlapping files in the level, opening them + // lazily. + for (int level = 1; level < num_levels_; level++) { + if (!files_[level].empty()) { + merge_iter_builder->AddIterator(NewTwoLevelIterator( + new LevelFileIteratorState( + cfd_->table_cache(), read_options, soptions, + cfd_->internal_comparator(), false /* for_compaction */, + cfd_->options()->prefix_extractor != nullptr), + new LevelFileNumIterator(cfd_->internal_comparator(), &files_[level]), + merge_iter_builder->GetArena())); + } + } +} + // Callback from TableCache::Get() namespace { enum SaverState { diff --git a/db/version_set.h b/db/version_set.h index 852c0323b..7c8d7146e 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -51,6 +51,7 @@ class MergeContext; class ColumnFamilyData; class ColumnFamilySet; class TableCache; +class MergeIteratorBuilder; // Return the smallest index i such that files[i]->largest >= key. // Return files.size() if there is no such file. @@ -80,6 +81,9 @@ class Version { void AddIterators(const ReadOptions&, const EnvOptions& soptions, std::vector* iters); + void AddIterators(const ReadOptions&, const EnvOptions& soptions, + MergeIteratorBuilder* merger_iter_builder); + // Lookup the value for key. If found, store it in *val and // return OK. Else return a non-OK status. Fills *stats. // Uses *operands to store merge_operator operations to apply later diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index ac376d747..6134fd166 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -142,16 +142,28 @@ class MemTableRep { }; // Return an iterator over the keys in this representation. - virtual Iterator* GetIterator() = 0; + // arena: If not null, the arena needs to be used to allocate the Iterator. + // When destroying the iterator, the caller will not call "delete" + // but Iterator::~Iterator() directly. 
The destructor needs to destroy + // all the states but those allocated in arena. + virtual Iterator* GetIterator(Arena* arena = nullptr) = 0; // Return an iterator over at least the keys with the specified user key. The // iterator may also allow access to other keys, but doesn't have to. Default: // GetIterator(). - virtual Iterator* GetIterator(const Slice& user_key) { return GetIterator(); } + virtual Iterator* GetIterator(const Slice& user_key) { + return GetIterator(nullptr); + } // Return an iterator that has a special Seek semantics. The result of // a Seek might only include keys with the same prefix as the target key. - virtual Iterator* GetDynamicPrefixIterator() { return GetIterator(); } + // arena: If not null, the arena needs to be used to allocate the Iterator. + // When destroying the iterator, the caller will not call "delete" + // but Iterator::~Iterator() directly. The destructor needs to destroy + // all the states but those allocated in arena. + virtual Iterator* GetDynamicPrefixIterator(Arena* arena = nullptr) { + return GetIterator(arena); + } // Return true if the current MemTableRep supports merge operator. 
// Default: true diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 4fcdfd29e..71fff659a 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -980,10 +980,11 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_key) { return may_match; } -Iterator* BlockBasedTable::NewIterator(const ReadOptions& read_options) { - return NewTwoLevelIterator(new BlockEntryIteratorState(this, read_options, - nullptr), - NewIndexIterator(read_options)); +Iterator* BlockBasedTable::NewIterator(const ReadOptions& read_options, + Arena* arena) { + return NewTwoLevelIterator( + new BlockEntryIteratorState(this, read_options, nullptr), + NewIndexIterator(read_options), arena); } Status BlockBasedTable::Get( diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index cfb1d0af0..ba6a10c3e 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -68,7 +68,7 @@ class BlockBasedTable : public TableReader { // Returns a new iterator over the table contents. // The result of NewIterator() is initially invalid (caller must // call one of the Seek methods on the iterator before using it). - Iterator* NewIterator(const ReadOptions&) override; + Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr) override; Status Get(const ReadOptions& readOptions, const Slice& key, void* handle_context, diff --git a/table/iterator.cc b/table/iterator.cc index a3d4f638d..4c360205a 100644 --- a/table/iterator.cc +++ b/table/iterator.cc @@ -8,6 +8,8 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. 
#include "rocksdb/iterator.h" +#include "table/iterator_wrapper.h" +#include "util/arena.h" namespace rocksdb { @@ -65,8 +67,26 @@ Iterator* NewEmptyIterator() { return new EmptyIterator(Status::OK()); } +Iterator* NewEmptyIterator(Arena* arena) { + if (arena == nullptr) { + return NewEmptyIterator(); + } else { + auto mem = arena->AllocateAligned(sizeof(EmptyIterator)); + return new (mem) EmptyIterator(Status::OK()); + } +} + Iterator* NewErrorIterator(const Status& status) { return new EmptyIterator(status); } +Iterator* NewErrorIterator(const Status& status, Arena* arena) { + if (arena == nullptr) { + return NewErrorIterator(status); + } else { + auto mem = arena->AllocateAligned(sizeof(EmptyIterator)); + return new (mem) EmptyIterator(status); + } +} + } // namespace rocksdb diff --git a/table/iterator_wrapper.h b/table/iterator_wrapper.h index 117c9fdc4..502cacb3e 100644 --- a/table/iterator_wrapper.h +++ b/table/iterator_wrapper.h @@ -23,14 +23,7 @@ class IteratorWrapper { explicit IteratorWrapper(Iterator* iter): iter_(nullptr) { Set(iter); } - IteratorWrapper(const IteratorWrapper&) { - // Iterator wrapper exclusively owns iter_ so it cannot be copied. - // Didn't delete the function because vector requires - // this function to compile. - assert(false); - } - void operator=(const IteratorWrapper&) = delete; - ~IteratorWrapper() { delete iter_; } + ~IteratorWrapper() {} Iterator* iter() const { return iter_; } // Takes ownership of "iter" and will delete it when destroyed, or @@ -45,6 +38,14 @@ class IteratorWrapper { } } + void DeleteIter(bool is_arena_mode) { + if (!is_arena_mode) { + delete iter_; + } else { + iter_->~Iterator(); + } + } + // Iterator interface methods bool Valid() const { return valid_; } Slice key() const { assert(Valid()); return key_; } @@ -70,4 +71,11 @@ class IteratorWrapper { Slice key_; }; +class Arena; +// Return an empty iterator (yields nothing) allocated from arena. 
+extern Iterator* NewEmptyIterator(Arena* arena); + +// Return an empty iterator with the specified status, allocated arena. +extern Iterator* NewErrorIterator(const Status& status, Arena* arena); + } // namespace rocksdb diff --git a/table/merger.cc b/table/merger.cc index 6c51f7fa1..9aab33ed3 100644 --- a/table/merger.cc +++ b/table/merger.cc @@ -17,13 +17,13 @@ #include "rocksdb/options.h" #include "table/iter_heap.h" #include "table/iterator_wrapper.h" +#include "util/arena.h" #include "util/stop_watch.h" #include "util/perf_context_imp.h" #include "util/autovector.h" namespace rocksdb { namespace { - typedef std::priority_queue< IteratorWrapper*, std::vector, @@ -43,13 +43,16 @@ MaxIterHeap NewMaxIterHeap(const Comparator* comparator) { MinIterHeap NewMinIterHeap(const Comparator* comparator) { return MinIterHeap(MinIteratorComparator(comparator)); } +} // namespace const size_t kNumIterReserve = 4; class MergingIterator : public Iterator { public: - MergingIterator(const Comparator* comparator, Iterator** children, int n) - : comparator_(comparator), + MergingIterator(const Comparator* comparator, Iterator** children, int n, + bool is_arena_mode) + : is_arena_mode_(is_arena_mode), + comparator_(comparator), current_(nullptr), use_heap_(true), direction_(kForward), @@ -66,7 +69,20 @@ class MergingIterator : public Iterator { } } - virtual ~MergingIterator() { } + virtual void AddIterator(Iterator* iter) { + assert(direction_ == kForward); + children_.emplace_back(iter); + auto new_wrapper = children_.back(); + if (new_wrapper.Valid()) { + minHeap_.push(&new_wrapper); + } + } + + virtual ~MergingIterator() { + for (auto& child : children_) { + child.DeleteIter(is_arena_mode_); + } + } virtual bool Valid() const { return (current_ != nullptr); @@ -242,6 +258,7 @@ class MergingIterator : public Iterator { void FindLargest(); void ClearHeaps(); + bool is_arena_mode_; const Comparator* comparator_; autovector children_; IteratorWrapper* current_; @@ -288,16 +305,51 
@@ void MergingIterator::ClearHeaps() { maxHeap_ = NewMaxIterHeap(comparator_); minHeap_ = NewMinIterHeap(comparator_); } -} // namespace -Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n) { +Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n, + Arena* arena) { assert(n >= 0); if (n == 0) { - return NewEmptyIterator(); + return NewEmptyIterator(arena); } else if (n == 1) { return list[0]; } else { - return new MergingIterator(cmp, list, n); + if (arena == nullptr) { + return new MergingIterator(cmp, list, n, false); + } else { + auto mem = arena->AllocateAligned(sizeof(MergingIterator)); + return new (mem) MergingIterator(cmp, list, n, true); + } + } +} + +MergeIteratorBuilder::MergeIteratorBuilder(const Comparator* comparator, + Arena* a) + : first_iter(nullptr), use_merging_iter(false), arena(a) { + // merge_iter is always built inside the arena, so a must not be null. + auto mem = arena->AllocateAligned(sizeof(MergingIterator)); + merge_iter = new (mem) MergingIterator(comparator, nullptr, 0, true); +} + +void MergeIteratorBuilder::AddIterator(Iterator* iter) { + if (!use_merging_iter && first_iter != nullptr) { // second child: hand the deferred first child to merge_iter + merge_iter->AddIterator(first_iter); + use_merging_iter = true; + } + if (use_merging_iter) { + merge_iter->AddIterator(iter); + } else { + first_iter = iter; + } +} + +Iterator* MergeIteratorBuilder::Finish() { + if (!use_merging_iter) { + return first_iter; // zero or one child: returned as-is (nullptr if none); merge_iter is left unused + } else { + auto ret = merge_iter; + merge_iter = nullptr; + return ret; } } diff --git a/table/merger.h b/table/merger.h index 3a1a4feb8..7dcf2afe7 100644 --- a/table/merger.h +++ b/table/merger.h @@ -9,11 +9,14 @@ #pragma once +#include "rocksdb/types.h" + namespace rocksdb { class Comparator; class Iterator; class Env; +class Arena; // Return an iterator that provided the union of the data in // children[0,n-1].
Takes ownership of the child iterators and @@ -24,6 +27,34 @@ class Env; // // REQUIRES: n >= 0 extern Iterator* NewMergingIterator(const Comparator* comparator, - Iterator** children, int n); + Iterator** children, int n, + Arena* arena = nullptr); + +class MergingIterator; + +// A builder class to build a merging iterator by adding iterators one by one. +class MergeIteratorBuilder { + public: + // comparator: the comparator used by the merging iterator + // arena: where the merging iterator is allocated from; must not be null. + explicit MergeIteratorBuilder(const Comparator* comparator, Arena* arena); + ~MergeIteratorBuilder() {} + + // Add iter to the merging iterator. + void AddIterator(Iterator* iter); + + // Get arena used to build the merging iterator. It is called when a child + // iterator needs to be allocated. + Arena* GetArena() { return arena; } + + // Return the resulting iterator: the merging iterator, or the lone child. + Iterator* Finish(); + + private: + MergingIterator* merge_iter; + Iterator* first_iter; + bool use_merging_iter; + Arena* arena; +}; } // namespace rocksdb diff --git a/table/plain_table_reader.cc b/table/plain_table_reader.cc index 43daaa9a9..22968ef6b 100644 --- a/table/plain_table_reader.cc +++ b/table/plain_table_reader.cc @@ -156,8 +156,15 @@ Status PlainTableReader::Open(const Options& options, void PlainTableReader::SetupForCompaction() { } -Iterator* PlainTableReader::NewIterator(const ReadOptions& options) { - return new PlainTableIterator(this, options_.prefix_extractor != nullptr); +Iterator* PlainTableReader::NewIterator(const ReadOptions& options, + Arena* arena) { + if (arena == nullptr) { + return new PlainTableIterator(this, options_.prefix_extractor != nullptr); + } else { + auto mem = arena->AllocateAligned(sizeof(PlainTableIterator)); + return new (mem) + PlainTableIterator(this, options_.prefix_extractor != nullptr); + } } struct PlainTableReader::IndexRecord { diff --git a/table/plain_table_reader.h b/table/plain_table_reader.h index e6373dc82..62239beb3
100644 --- a/table/plain_table_reader.h +++ b/table/plain_table_reader.h @@ -55,7 +55,7 @@ class PlainTableReader: public TableReader { const int bloom_bits_per_key, double hash_table_ratio, size_t index_sparseness, size_t huge_page_tlb_size); - Iterator* NewIterator(const ReadOptions&); + Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr) override; Status Get(const ReadOptions&, const Slice& key, void* arg, bool (*result_handler)(void* arg, const ParsedInternalKey& k, diff --git a/table/table_reader.h b/table/table_reader.h index 02a2d16dc..9238b880c 100644 --- a/table/table_reader.h +++ b/table/table_reader.h @@ -15,6 +15,7 @@ namespace rocksdb { class Iterator; struct ParsedInternalKey; class Slice; +class Arena; struct ReadOptions; struct TableProperties; @@ -28,7 +29,11 @@ class TableReader { // Returns a new iterator over the table contents. // The result of NewIterator() is initially invalid (caller must // call one of the Seek methods on the iterator before using it). - virtual Iterator* NewIterator(const ReadOptions&) = 0; + // arena: If not null, the arena needs to be used to allocate the Iterator. + // When destroying the iterator, the caller will not call "delete" + // but Iterator::~Iterator() directly. The destructor needs to destroy + // all the states but those allocated in arena. 
+ virtual Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr) = 0; // Given a key, return an approximate byte offset in the file where // the data for that key begins (or would begin if the key were diff --git a/table/two_level_iterator.cc b/table/two_level_iterator.cc index 990f18184..6af48f58c 100644 --- a/table/two_level_iterator.cc +++ b/table/two_level_iterator.cc @@ -13,6 +13,7 @@ #include "rocksdb/table.h" #include "table/block.h" #include "table/format.h" +#include "util/arena.h" namespace rocksdb { @@ -23,7 +24,10 @@ class TwoLevelIterator: public Iterator { explicit TwoLevelIterator(TwoLevelIteratorState* state, Iterator* first_level_iter); - virtual ~TwoLevelIterator() {} + virtual ~TwoLevelIterator() { + first_level_iter_.DeleteIter(false); // false: child iterators are never arena-allocated, even when this iterator is + second_level_iter_.DeleteIter(false); + } virtual void Seek(const Slice& target); virtual void SeekToFirst(); @@ -183,8 +187,13 @@ void TwoLevelIterator::InitDataBlock() { } // namespace Iterator* NewTwoLevelIterator(TwoLevelIteratorState* state, - Iterator* first_level_iter) { - return new TwoLevelIterator(state, first_level_iter); + Iterator* first_level_iter, Arena* arena) { + if (arena == nullptr) { + return new TwoLevelIterator(state, first_level_iter); + } else { + auto mem = arena->AllocateAligned(sizeof(TwoLevelIterator)); + return new (mem) TwoLevelIterator(state, first_level_iter); + } } } // namespace rocksdb diff --git a/table/two_level_iterator.h b/table/two_level_iterator.h index b8083385b..d955dd763 100644 --- a/table/two_level_iterator.h +++ b/table/two_level_iterator.h @@ -16,6 +16,7 @@ namespace rocksdb { struct ReadOptions; class InternalKeyComparator; +class Arena; struct TwoLevelIteratorState { explicit TwoLevelIteratorState(bool check_prefix_may_match) @@ -39,7 +40,11 @@ struct TwoLevelIteratorState { // // Uses a supplied function to convert an index_iter value into // an iterator over the contents of the corresponding block.
+// arena: If not null, the arena is used to allocate the Iterator. +// When destroying the iterator, the destructor will destroy +// all the states but those allocated in arena. extern Iterator* NewTwoLevelIterator(TwoLevelIteratorState* state, - Iterator* first_level_iter); + Iterator* first_level_iter, + Arena* arena = nullptr); } // namespace rocksdb diff --git a/util/arena.h b/util/arena.h index 53acd2270..0855c205c 100644 --- a/util/arena.h +++ b/util/arena.h @@ -23,6 +23,8 @@ namespace rocksdb { class Logger; +const size_t kInlineSize = 2048; + class Arena { public: // No copying allowed diff --git a/util/hash_cuckoo_rep.cc b/util/hash_cuckoo_rep.cc index a8864692f..e2d2c38e6 100644 --- a/util/hash_cuckoo_rep.cc +++ b/util/hash_cuckoo_rep.cc @@ -251,7 +251,7 @@ class HashCuckooRep : public MemTableRep { // are sorted according to the user specified KeyComparator. Note that // any insert after this function call may affect the sorted nature of // the returned iterator. - virtual MemTableRep::Iterator* GetIterator() override { + virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override { std::vector compact_buckets; for (unsigned int bid = 0; bid < bucket_count_; ++bid) { const char* bucket = cuckoo_array_[bid].load(std::memory_order_relaxed); @@ -266,10 +266,18 @@ class HashCuckooRep : public MemTableRep { compact_buckets.push_back(iter->key()); } } - return new Iterator( - std::shared_ptr>( - new std::vector(std::move(compact_buckets))), - compare_); + if (arena == nullptr) { + return new Iterator( + std::shared_ptr>( + new std::vector(std::move(compact_buckets))), + compare_); + } else { // only the Iterator object is placed in the arena; the key vector is still heap-allocated + auto mem = arena->AllocateAligned(sizeof(Iterator)); + return new (mem) Iterator( + std::shared_ptr>( + new std::vector(std::move(compact_buckets))), + compare_); + } } }; diff --git a/util/hash_linklist_rep.cc b/util/hash_linklist_rep.cc index 80edad505..60f245b5f 100644 --- a/util/hash_linklist_rep.cc +++ b/util/hash_linklist_rep.cc @@ -70,11 +70,12 @@ class
HashLinkListRep : public MemTableRep { virtual ~HashLinkListRep(); - virtual MemTableRep::Iterator* GetIterator() override; + virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override; virtual MemTableRep::Iterator* GetIterator(const Slice& slice) override; - virtual MemTableRep::Iterator* GetDynamicPrefixIterator() override; + virtual MemTableRep::Iterator* GetDynamicPrefixIterator( + Arena* arena = nullptr) override; private: friend class DynamicIterator; @@ -411,7 +412,7 @@ void HashLinkListRep::Get(const LookupKey& k, void* callback_args, } } -MemTableRep::Iterator* HashLinkListRep::GetIterator() { +MemTableRep::Iterator* HashLinkListRep::GetIterator(Arena* alloc_arena) { // allocate a new arena of similar size to the one currently in use Arena* new_arena = new Arena(arena_->BlockSize()); auto list = new FullList(compare_, new_arena); @@ -424,7 +425,12 @@ MemTableRep::Iterator* HashLinkListRep::GetIterator() { } } } - return new FullListIterator(list, new_arena); + if (alloc_arena == nullptr) { + return new FullListIterator(list, new_arena); + } else { // only the iterator object goes into alloc_arena; list and new_arena stay heap-allocated + auto mem = alloc_arena->AllocateAligned(sizeof(FullListIterator)); + return new (mem) FullListIterator(list, new_arena); + } } MemTableRep::Iterator* HashLinkListRep::GetIterator(const Slice& slice) { @@ -435,8 +441,14 @@ MemTableRep::Iterator* HashLinkListRep::GetIterator(const Slice& slice) { return new Iterator(this, bucket); } -MemTableRep::Iterator* HashLinkListRep::GetDynamicPrefixIterator() { - return new DynamicIterator(*this); +MemTableRep::Iterator* HashLinkListRep::GetDynamicPrefixIterator( + Arena* alloc_arena) { // alloc_arena: optional storage for the iterator object; nullptr means heap + if (alloc_arena == nullptr) { + return new DynamicIterator(*this); + } else { + auto mem = alloc_arena->AllocateAligned(sizeof(DynamicIterator)); + return new (mem) DynamicIterator(*this); + } } bool HashLinkListRep::BucketContains(Node* head, const Slice& user_key) const { diff --git a/util/hash_skiplist_rep.cc b/util/hash_skiplist_rep.cc index 1f03874d1..baee12ad5
100644 --- a/util/hash_skiplist_rep.cc +++ b/util/hash_skiplist_rep.cc @@ -38,11 +38,12 @@ class HashSkipListRep : public MemTableRep { virtual ~HashSkipListRep(); - virtual MemTableRep::Iterator* GetIterator() override; + virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override; virtual MemTableRep::Iterator* GetIterator(const Slice& slice) override; - virtual MemTableRep::Iterator* GetDynamicPrefixIterator() override; + virtual MemTableRep::Iterator* GetDynamicPrefixIterator( + Arena* arena = nullptr) override; private: friend class DynamicIterator; @@ -288,7 +289,7 @@ void HashSkipListRep::Get(const LookupKey& k, void* callback_args, } } -MemTableRep::Iterator* HashSkipListRep::GetIterator() { +MemTableRep::Iterator* HashSkipListRep::GetIterator(Arena* arena) { // allocate a new arena of similar size to the one currently in use Arena* new_arena = new Arena(arena_->BlockSize()); auto list = new Bucket(compare_, new_arena); @@ -301,7 +302,12 @@ MemTableRep::Iterator* HashSkipListRep::GetIterator() { } } } - return new Iterator(list, true, new_arena); + if (arena == nullptr) { + return new Iterator(list, true, new_arena); + } else { // arena (the parameter, not the member arena_) holds only the iterator object; list and new_arena stay heap-allocated + auto mem = arena->AllocateAligned(sizeof(Iterator)); + return new (mem) Iterator(list, true, new_arena); + } } MemTableRep::Iterator* HashSkipListRep::GetIterator(const Slice& slice) { @@ -312,8 +318,13 @@ MemTableRep::Iterator* HashSkipListRep::GetIterator(const Slice& slice) { return new Iterator(bucket, false); } -MemTableRep::Iterator* HashSkipListRep::GetDynamicPrefixIterator() { - return new DynamicIterator(*this); +MemTableRep::Iterator* HashSkipListRep::GetDynamicPrefixIterator(Arena* arena) { + if (arena == nullptr) { + return new DynamicIterator(*this); + } else { // placement-new into caller-supplied arena storage + auto mem = arena->AllocateAligned(sizeof(DynamicIterator)); + return new (mem) DynamicIterator(*this); + } } } // anon namespace diff --git a/util/skiplistrep.cc b/util/skiplistrep.cc index f36edf28d..895343001 100644 --- a/util/skiplistrep.cc
+++ b/util/skiplistrep.cc @@ -6,6 +6,7 @@ #include "rocksdb/memtablerep.h" #include "db/memtable.h" #include "db/skiplist.h" +#include "util/arena.h" namespace rocksdb { namespace { @@ -108,8 +109,13 @@ public: // Unhide default implementations of GetIterator using MemTableRep::GetIterator; - virtual MemTableRep::Iterator* GetIterator() override { - return new SkipListRep::Iterator(&skip_list_); + virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override { + if (arena == nullptr) { + return new SkipListRep::Iterator(&skip_list_); + } else { + auto mem = arena->AllocateAligned(sizeof(SkipListRep::Iterator)); + return new (mem) SkipListRep::Iterator(&skip_list_); + } } }; } diff --git a/util/vectorrep.cc b/util/vectorrep.cc index 00e5c7450..cf8bad5c4 100644 --- a/util/vectorrep.cc +++ b/util/vectorrep.cc @@ -95,7 +95,7 @@ class VectorRep : public MemTableRep { using MemTableRep::GetIterator; // Return an iterator over the keys in this representation. - virtual MemTableRep::Iterator* GetIterator() override; + virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override; private: friend class Iterator; @@ -259,16 +259,28 @@ void VectorRep::Get(const LookupKey& k, void* callback_args, } } -MemTableRep::Iterator* VectorRep::GetIterator() { +MemTableRep::Iterator* VectorRep::GetIterator(Arena* arena) { + char* mem = nullptr; // arena storage for the iterator, reserved before rwlock_ is taken + if (arena != nullptr) { + mem = arena->AllocateAligned(sizeof(Iterator)); + } ReadLock l(&rwlock_); // Do not sort here. The sorting would be done the first time // a Seek is performed on the iterator.
if (immutable_) { - return new Iterator(this, bucket_, compare_); + if (arena == nullptr) { + return new Iterator(this, bucket_, compare_); + } else { + return new (mem) Iterator(this, bucket_, compare_); + } } else { std::shared_ptr tmp; tmp.reset(new Bucket(*bucket_)); // make a copy - return new Iterator(nullptr, tmp, compare_); + if (arena == nullptr) { + return new Iterator(nullptr, tmp, compare_); + } else { + return new (mem) Iterator(nullptr, tmp, compare_); + } } } } // anon namespace