In DB::NewIterator(), try to allocate the whole iterator tree in an arena

Summary:
In this patch, try to allocate the whole iterator tree starting from DBIter from an arena
1. ArenaWrappedDBIter is created when serves as the entry point of an iterator tree, with an arena in it.
2. Add an option to create iterator from arena for following iterators: DBIter, MergingIterator, MemtableIterator, all mem table's iterators, all table reader's iterators and two level iterator.
3. MergeIteratorBuilder is created to incrementally build the tree of internal iterators. It is passed to mem table list and version set and add iterators to it.

Limitations:
(1) Only DB::NewIterator() without tailing uses the arena. Other cases, including readonly DB and compactions are still from malloc
(2) Two level iterator itself is allocated in arena, but not iterators inside it.

Test Plan: make all check

Reviewers: ljin, haobo

Reviewed By: haobo

Subscribers: leveldb, dhruba, yhchiang, igor

Differential Revision: https://reviews.facebook.net/D18513
main
sdong 10 years ago
parent 462796697c
commit df9069d23f
  1. 105
      db/db_impl.cc
  2. 4
      db/db_impl.h
  3. 71
      db/db_iter.cc
  4. 46
      db/db_iter.h
  5. 31
      db/memtable.cc
  6. 7
      db/memtable.h
  7. 9
      db/memtable_list.cc
  8. 4
      db/memtable_list.h
  9. 12
      db/simple_table_db_test.cc
  10. 7
      db/table_cache.cc
  11. 3
      db/table_cache.h
  12. 26
      db/version_set.cc
  13. 4
      db/version_set.h
  14. 18
      include/rocksdb/memtablerep.h
  15. 9
      table/block_based_table_reader.cc
  16. 2
      table/block_based_table_reader.h
  17. 20
      table/iterator.cc
  18. 24
      table/iterator_wrapper.h
  19. 68
      table/merger.cc
  20. 33
      table/merger.h
  21. 11
      table/plain_table_reader.cc
  22. 2
      table/plain_table_reader.h
  23. 7
      table/table_reader.h
  24. 15
      table/two_level_iterator.cc
  25. 7
      table/two_level_iterator.h
  26. 2
      util/arena.h
  27. 18
      util/hash_cuckoo_rep.cc
  28. 24
      util/hash_linklist_rep.cc
  29. 23
      util/hash_skiplist_rep.cc
  30. 10
      util/skiplistrep.cc
  31. 20
      util/vectorrep.cc

@ -3179,18 +3179,34 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
ColumnFamilyData* cfd,
SuperVersion* super_version) {
std::vector<Iterator*> iterator_list;
// Collect iterator for mutable mem
iterator_list.push_back(super_version->mem->NewIterator(options));
// Collect all needed child iterators for immutable memtables
super_version->imm->AddIterators(options, &iterator_list);
// Collect iterators for files in L0 - Ln
super_version->current->AddIterators(options, storage_options_,
&iterator_list);
Iterator* internal_iter = NewMergingIterator(
&cfd->internal_comparator(), &iterator_list[0], iterator_list.size());
SuperVersion* super_version,
Arena* arena) {
Iterator* internal_iter;
if (arena != nullptr) {
// Need to create internal iterator from the arena.
MergeIteratorBuilder merge_iter_builder(&cfd->internal_comparator(), arena);
// Collect iterator for mutable mem
merge_iter_builder.AddIterator(
super_version->mem->NewIterator(options, false, arena));
// Collect all needed child iterators for immutable memtables
super_version->imm->AddIterators(options, &merge_iter_builder);
// Collect iterators for files in L0 - Ln
super_version->current->AddIterators(options, storage_options_,
&merge_iter_builder);
internal_iter = merge_iter_builder.Finish();
} else {
// Need to create internal iterator using malloc.
std::vector<Iterator*> iterator_list;
// Collect iterator for mutable mem
iterator_list.push_back(super_version->mem->NewIterator(options));
// Collect all needed child iterators for immutable memtables
super_version->imm->AddIterators(options, &iterator_list);
// Collect iterators for files in L0 - Ln
super_version->current->AddIterators(options, storage_options_,
&iterator_list);
internal_iter = NewMergingIterator(&cfd->internal_comparator(),
&iterator_list[0], iterator_list.size());
}
IterState* cleanup = new IterState(this, &mutex_, super_version);
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
@ -3541,34 +3557,77 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options,
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
Iterator* iter;
if (options.tailing) {
#ifdef ROCKSDB_LITE
// not supported in lite version
return nullptr;
#else
// TODO(ljin): remove tailing iterator
iter = new ForwardIterator(env_, this, options, cfd);
iter = NewDBIterator(env_, *cfd->options(),
cfd->user_comparator(), iter, kMaxSequenceNumber);
//iter = new TailingIterator(env_, this, options, cfd);
auto iter = new ForwardIterator(env_, this, options, cfd);
return NewDBIterator(env_, *cfd->options(), cfd->user_comparator(), iter,
kMaxSequenceNumber);
// return new TailingIterator(env_, this, options, cfd);
#endif
} else {
SequenceNumber latest_snapshot = versions_->LastSequence();
SuperVersion* sv = nullptr;
sv = cfd->GetReferencedSuperVersion(&mutex_);
iter = NewInternalIterator(options, cfd, sv);
auto snapshot =
options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot;
iter = NewDBIterator(env_, *cfd->options(),
cfd->user_comparator(), iter, snapshot);
}
return iter;
// Try to generate a DB iterator tree in continuous memory area to be
// cache friendly. Here is an example of result:
// +-------------------------------+
// | |
// | ArenaWrappedDBIter |
// | + |
// | +---> Inner Iterator ------------+
// | | | |
// | | +-- -- -- -- -- -- -- --+ |
// | +--- | Arena | |
// | | | |
// | Allocated Memory: | |
// | | +-------------------+ |
// | | | DBIter | <---+
// | | + |
// | | | +-> iter_ ------------+
// | | | | |
// | | +-------------------+ |
// | | | MergingIterator | <---+
// | | + |
// | | | +->child iter1 ------------+
// | | | | | |
// | | +->child iter2 ----------+ |
// | | | | | | |
// | | | +->child iter3 --------+ | |
// | | | | | |
// | | +-------------------+ | | |
// | | | Iterator1 | <--------+
// | | +-------------------+ | |
// | | | Iterator2 | <------+
// | | +-------------------+ |
// | | | Iterator3 | <----+
// | | +-------------------+
// | | |
// +-------+-----------------------+
//
// ArenaWrappedDBIter inlines an arena area where all the iterartor in the
// the iterator tree is allocated in the order of being accessed when
// querying.
// Laying out the iterators in the order of being accessed makes it more
// likely that any iterator pointer is close to the iterator it points to so
// that they are likely to be in the same cache line and/or page.
ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
env_, *cfd->options(), cfd->user_comparator(), snapshot);
Iterator* internal_iter =
NewInternalIterator(options, cfd, sv, db_iter->GetArena());
db_iter->SetIterUnderDBIter(internal_iter);
return db_iter;
}
}
Status DBImpl::NewIterators(

@ -39,6 +39,7 @@ class Version;
class VersionEdit;
class VersionSet;
class CompactionFilterV2;
class Arena;
class DBImpl : public DB {
public:
@ -278,7 +279,8 @@ class DBImpl : public DB {
const DBOptions options_;
Iterator* NewInternalIterator(const ReadOptions&, ColumnFamilyData* cfd,
SuperVersion* super_version);
SuperVersion* super_version,
Arena* arena = nullptr);
private:
friend class DB;

@ -18,6 +18,7 @@
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
#include "port/port.h"
#include "util/arena.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/perf_context_imp.h"
@ -37,8 +38,6 @@ static void DumpInternalIter(Iterator* iter) {
}
#endif
namespace {
// Memtables and sstables that make the DB representation contain
// (userkey,seq,type) => uservalue entries. DBIter
// combines multiple entries for the same userkey found in the DB
@ -57,9 +56,10 @@ class DBIter: public Iterator {
kReverse
};
DBIter(Env* env, const Options& options,
const Comparator* cmp, Iterator* iter, SequenceNumber s)
: env_(env),
DBIter(Env* env, const Options& options, const Comparator* cmp,
Iterator* iter, SequenceNumber s, bool arena_mode)
: arena_mode_(arena_mode),
env_(env),
logger_(options.info_log.get()),
user_comparator_(cmp),
user_merge_operator_(options.merge_operator.get()),
@ -74,7 +74,15 @@ class DBIter: public Iterator {
}
virtual ~DBIter() {
RecordTick(statistics_, NO_ITERATORS, -1);
delete iter_;
if (!arena_mode_) {
delete iter_;
} else {
iter_->~Iterator();
}
}
virtual void SetIter(Iterator* iter) {
assert(iter_ == nullptr);
iter_ = iter;
}
virtual bool Valid() const { return valid_; }
virtual Slice key() const {
@ -116,11 +124,12 @@ class DBIter: public Iterator {
}
}
bool arena_mode_;
Env* const env_;
Logger* logger_;
const Comparator* const user_comparator_;
const MergeOperator* const user_merge_operator_;
Iterator* const iter_;
Iterator* iter_;
SequenceNumber const sequence_;
Status status_;
@ -461,16 +470,48 @@ void DBIter::SeekToLast() {
FindPrevUserEntry();
}
} // anonymous namespace
Iterator* NewDBIterator(Env* env, const Options& options,
const Comparator* user_key_comparator,
Iterator* internal_iter,
const SequenceNumber& sequence) {
return new DBIter(env, options, user_key_comparator, internal_iter, sequence,
false);
}
ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~Iterator(); }
void ArenaWrappedDBIter::SetDBIter(DBIter* iter) { db_iter_ = iter; }
void ArenaWrappedDBIter::SetIterUnderDBIter(Iterator* iter) {
static_cast<DBIter*>(db_iter_)->SetIter(iter);
}
inline bool ArenaWrappedDBIter::Valid() const { return db_iter_->Valid(); }
inline void ArenaWrappedDBIter::SeekToFirst() { db_iter_->SeekToFirst(); }
inline void ArenaWrappedDBIter::SeekToLast() { db_iter_->SeekToLast(); }
inline void ArenaWrappedDBIter::Seek(const Slice& target) {
db_iter_->Seek(target);
}
inline void ArenaWrappedDBIter::Next() { db_iter_->Next(); }
inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); }
inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); }
inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); }
inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); }
void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1,
void* arg2) {
db_iter_->RegisterCleanup(function, arg1, arg2);
}
Iterator* NewDBIterator(
Env* env,
const Options& options,
const Comparator *user_key_comparator,
Iterator* internal_iter,
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const Options& options, const Comparator* user_key_comparator,
const SequenceNumber& sequence) {
return new DBIter(env, options, user_key_comparator,
internal_iter, sequence);
ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
Arena* arena = iter->GetArena();
auto mem = arena->AllocateAligned(sizeof(DBIter));
DBIter* db_iter = new (mem)
DBIter(env, options, user_key_comparator, nullptr, sequence, true);
iter->SetDBIter(db_iter);
return iter;
}
} // namespace rocksdb

@ -11,9 +11,14 @@
#include <stdint.h>
#include "rocksdb/db.h"
#include "db/dbformat.h"
#include "util/arena.h"
#include "util/autovector.h"
namespace rocksdb {
class Arena;
class DBIter;
// Return a new iterator that converts internal keys (yielded by
// "*internal_iter") that were live at the specified "sequence" number
// into appropriate user keys.
@ -24,4 +29,45 @@ extern Iterator* NewDBIterator(
Iterator* internal_iter,
const SequenceNumber& sequence);
// A wrapper iterator which wraps DB Iterator and the arena, with which the DB
// iterator is supposed be allocated. This class is used as an entry point of
// a iterator hierarchy whose memory can be allocated inline. In that way,
// accessing the iterator tree can be more cache friendly. It is also faster
// to allocate.
class ArenaWrappedDBIter : public Iterator {
public:
virtual ~ArenaWrappedDBIter();
// Get the arena to be used to allocate memory for DBIter to be wrapped,
// as well as child iterators in it.
virtual Arena* GetArena() { return &arena_; }
// Set the DB Iterator to be wrapped
virtual void SetDBIter(DBIter* iter);
// Set the internal iterator wrapped inside the DB Iterator. Usually it is
// a merging iterator.
virtual void SetIterUnderDBIter(Iterator* iter);
virtual bool Valid() const override;
virtual void SeekToFirst() override;
virtual void SeekToLast() override;
virtual void Seek(const Slice& target) override;
virtual void Next() override;
virtual void Prev() override;
virtual Slice key() const override;
virtual Slice value() const override;
virtual Status status() const override;
void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2);
private:
DBIter* db_iter_;
Arena arena_;
};
// Generate the arena wrapped iterator class.
extern ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const Options& options, const Comparator* user_key_comparator,
const SequenceNumber& sequence);
} // namespace rocksdb

@ -20,6 +20,7 @@
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice_transform.h"
#include "table/merger.h"
#include "util/arena.h"
#include "util/coding.h"
#include "util/murmurhash.h"
@ -173,15 +174,24 @@ const char* EncodeKey(std::string* scratch, const Slice& target) {
class MemTableIterator: public Iterator {
public:
MemTableIterator(const MemTable& mem, const ReadOptions& options,
bool enforce_total_order)
bool enforce_total_order, Arena* arena)
: bloom_(nullptr),
prefix_extractor_(mem.prefix_extractor_),
valid_(false) {
valid_(false),
arena_mode_(arena != nullptr) {
if (prefix_extractor_ != nullptr && !enforce_total_order) {
bloom_ = mem.prefix_bloom_.get();
iter_.reset(mem.table_->GetDynamicPrefixIterator());
iter_ = mem.table_->GetDynamicPrefixIterator(arena);
} else {
iter_.reset(mem.table_->GetIterator());
iter_ = mem.table_->GetIterator(arena);
}
}
~MemTableIterator() {
if (arena_mode_) {
iter_->~Iterator();
} else {
delete iter_;
}
}
@ -228,8 +238,9 @@ class MemTableIterator: public Iterator {
private:
DynamicBloom* bloom_;
const SliceTransform* const prefix_extractor_;
std::unique_ptr<MemTableRep::Iterator> iter_;
MemTableRep::Iterator* iter_;
bool valid_;
bool arena_mode_;
// No copying allowed
MemTableIterator(const MemTableIterator&);
@ -237,8 +248,14 @@ class MemTableIterator: public Iterator {
};
Iterator* MemTable::NewIterator(const ReadOptions& options,
bool enforce_total_order) {
return new MemTableIterator(*this, options, enforce_total_order);
bool enforce_total_order, Arena* arena) {
if (arena == nullptr) {
return new MemTableIterator(*this, options, enforce_total_order, nullptr);
} else {
auto mem = arena->AllocateAligned(sizeof(MemTableIterator));
return new (mem)
MemTableIterator(*this, options, enforce_total_order, arena);
}
}
port::RWMutex* MemTable::GetLock(const Slice& key) {

@ -21,6 +21,7 @@
namespace rocksdb {
class Arena;
class Mutex;
class MemTableIterator;
class MergeContext;
@ -77,8 +78,12 @@ class MemTable {
//
// By default, it returns an iterator for prefix seek if prefix_extractor
// is configured in Options.
// arena: If not null, the arena needs to be used to allocate the Iterator.
// Calling ~Iterator of the iterator will destroy all the states but
// those allocated in arena.
Iterator* NewIterator(const ReadOptions& options,
bool enforce_total_order = false);
bool enforce_total_order = false,
Arena* arena = nullptr);
// Add an entry into memtable that maps key to value at the
// specified sequence number and with the specified type.

@ -11,6 +11,7 @@
#include "db/version_set.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "table/merger.h"
#include "util/coding.h"
#include "util/log_buffer.h"
@ -78,6 +79,14 @@ void MemTableListVersion::AddIterators(const ReadOptions& options,
}
}
void MemTableListVersion::AddIterators(
const ReadOptions& options, MergeIteratorBuilder* merge_iter_builder) {
for (auto& m : memlist_) {
merge_iter_builder->AddIterator(
m->NewIterator(options, merge_iter_builder->GetArena()));
}
}
uint64_t MemTableListVersion::GetTotalNumEntries() const {
uint64_t total_num = 0;
for (auto& m : memlist_) {

@ -28,6 +28,7 @@ namespace rocksdb {
class ColumnFamilyData;
class InternalKeyComparator;
class Mutex;
class MergeIteratorBuilder;
// keeps a list of immutable memtables in a vector. the list is immutable
// if refcount is bigger than one. It is used as a state for Get() and
@ -49,6 +50,9 @@ class MemTableListVersion {
void AddIterators(const ReadOptions& options,
std::vector<Iterator*>* iterator_list);
void AddIterators(const ReadOptions& options,
MergeIteratorBuilder* merge_iter_builder);
uint64_t GetTotalNumEntries() const;
private:

@ -83,7 +83,7 @@ public:
unique_ptr<RandomAccessFile> && file, uint64_t file_size,
unique_ptr<TableReader>* table_reader);
Iterator* NewIterator(const ReadOptions&) override;
Iterator* NewIterator(const ReadOptions&, Arena* arena) override;
Status Get(const ReadOptions&, const Slice& key, void* arg,
bool (*handle_result)(void* arg, const ParsedInternalKey& k,
@ -218,8 +218,14 @@ std::shared_ptr<const TableProperties> SimpleTableReader::GetTableProperties()
return rep_->table_properties;
}
Iterator* SimpleTableReader::NewIterator(const ReadOptions& options) {
return new SimpleTableIterator(this);
Iterator* SimpleTableReader::NewIterator(const ReadOptions& options,
Arena* arena) {
if (arena == nullptr) {
return new SimpleTableIterator(this);
} else {
auto mem = arena->AllocateAligned(sizeof(SimpleTableIterator));
return new (mem) SimpleTableIterator(this);
}
}
Status SimpleTableReader::GetOffset(const Slice& target, uint64_t* offset) {

@ -13,6 +13,7 @@
#include "db/version_edit.h"
#include "rocksdb/statistics.h"
#include "table/iterator_wrapper.h"
#include "table/table_reader.h"
#include "util/coding.h"
#include "util/stop_watch.h"
@ -102,7 +103,7 @@ Iterator* TableCache::NewIterator(const ReadOptions& options,
const InternalKeyComparator& icomparator,
const FileMetaData& file_meta,
TableReader** table_reader_ptr,
bool for_compaction) {
bool for_compaction, Arena* arena) {
if (table_reader_ptr != nullptr) {
*table_reader_ptr = nullptr;
}
@ -113,12 +114,12 @@ Iterator* TableCache::NewIterator(const ReadOptions& options,
s = FindTable(toptions, icomparator, file_meta.number, file_meta.file_size,
&handle, nullptr, options.read_tier == kBlockCacheTier);
if (!s.ok()) {
return NewErrorIterator(s);
return NewErrorIterator(s, arena);
}
table_reader = GetTableReaderFromHandle(handle);
}
Iterator* result = table_reader->NewIterator(options);
Iterator* result = table_reader->NewIterator(options, arena);
if (handle != nullptr) {
result->RegisterCleanup(&UnrefEntry, cache_, handle);
}

@ -23,6 +23,7 @@
namespace rocksdb {
class Env;
class Arena;
struct FileMetaData;
// TODO(sdong): try to come up with a better API to pass the file information
@ -44,7 +45,7 @@ class TableCache {
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta,
TableReader** table_reader_ptr = nullptr,
bool for_compaction = false);
bool for_compaction = false, Arena* arena = nullptr);
// If a seek to internal key "k" in specified file finds an entry,
// call (*handle_result)(arg, found_key, found_value) repeatedly until

@ -332,6 +332,32 @@ void Version::AddIterators(const ReadOptions& read_options,
}
}
void Version::AddIterators(const ReadOptions& read_options,
const EnvOptions& soptions,
MergeIteratorBuilder* merge_iter_builder) {
// Merge all level zero files together since they may overlap
for (const FileMetaData* file : files_[0]) {
merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
read_options, soptions, cfd_->internal_comparator(), *file, nullptr,
false, merge_iter_builder->GetArena()));
}
// For levels > 0, we can use a concatenating iterator that sequentially
// walks through the non-overlapping files in the level, opening them
// lazily.
for (int level = 1; level < num_levels_; level++) {
if (!files_[level].empty()) {
merge_iter_builder->AddIterator(NewTwoLevelIterator(
new LevelFileIteratorState(
cfd_->table_cache(), read_options, soptions,
cfd_->internal_comparator(), false /* for_compaction */,
cfd_->options()->prefix_extractor != nullptr),
new LevelFileNumIterator(cfd_->internal_comparator(), &files_[level]),
merge_iter_builder->GetArena()));
}
}
}
// Callback from TableCache::Get()
namespace {
enum SaverState {

@ -51,6 +51,7 @@ class MergeContext;
class ColumnFamilyData;
class ColumnFamilySet;
class TableCache;
class MergeIteratorBuilder;
// Return the smallest index i such that files[i]->largest >= key.
// Return files.size() if there is no such file.
@ -80,6 +81,9 @@ class Version {
void AddIterators(const ReadOptions&, const EnvOptions& soptions,
std::vector<Iterator*>* iters);
void AddIterators(const ReadOptions&, const EnvOptions& soptions,
MergeIteratorBuilder* merger_iter_builder);
// Lookup the value for key. If found, store it in *val and
// return OK. Else return a non-OK status. Fills *stats.
// Uses *operands to store merge_operator operations to apply later

@ -142,16 +142,28 @@ class MemTableRep {
};
// Return an iterator over the keys in this representation.
virtual Iterator* GetIterator() = 0;
// arena: If not null, the arena needs to be used to allocate the Iterator.
// When destroying the iterator, the caller will not call "delete"
// but Iterator::~Iterator() directly. The destructor needs to destroy
// all the states but those allocated in arena.
virtual Iterator* GetIterator(Arena* arena = nullptr) = 0;
// Return an iterator over at least the keys with the specified user key. The
// iterator may also allow access to other keys, but doesn't have to. Default:
// GetIterator().
virtual Iterator* GetIterator(const Slice& user_key) { return GetIterator(); }
virtual Iterator* GetIterator(const Slice& user_key) {
return GetIterator(nullptr);
}
// Return an iterator that has a special Seek semantics. The result of
// a Seek might only include keys with the same prefix as the target key.
virtual Iterator* GetDynamicPrefixIterator() { return GetIterator(); }
// arena: If not null, the arena needs to be used to allocate the Iterator.
// When destroying the iterator, the caller will not call "delete"
// but Iterator::~Iterator() directly. The destructor needs to destroy
// all the states but those allocated in arena.
virtual Iterator* GetDynamicPrefixIterator(Arena* arena = nullptr) {
return GetIterator(arena);
}
// Return true if the current MemTableRep supports merge operator.
// Default: true

@ -980,10 +980,11 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_key) {
return may_match;
}
Iterator* BlockBasedTable::NewIterator(const ReadOptions& read_options) {
return NewTwoLevelIterator(new BlockEntryIteratorState(this, read_options,
nullptr),
NewIndexIterator(read_options));
Iterator* BlockBasedTable::NewIterator(const ReadOptions& read_options,
Arena* arena) {
return NewTwoLevelIterator(
new BlockEntryIteratorState(this, read_options, nullptr),
NewIndexIterator(read_options), arena);
}
Status BlockBasedTable::Get(

@ -68,7 +68,7 @@ class BlockBasedTable : public TableReader {
// Returns a new iterator over the table contents.
// The result of NewIterator() is initially invalid (caller must
// call one of the Seek methods on the iterator before using it).
Iterator* NewIterator(const ReadOptions&) override;
Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr) override;
Status Get(const ReadOptions& readOptions, const Slice& key,
void* handle_context,

@ -8,6 +8,8 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "rocksdb/iterator.h"
#include "table/iterator_wrapper.h"
#include "util/arena.h"
namespace rocksdb {
@ -65,8 +67,26 @@ Iterator* NewEmptyIterator() {
return new EmptyIterator(Status::OK());
}
Iterator* NewEmptyIterator(Arena* arena) {
if (arena == nullptr) {
return NewEmptyIterator();
} else {
auto mem = arena->AllocateAligned(sizeof(EmptyIterator));
return new (mem) EmptyIterator(Status::OK());
}
}
Iterator* NewErrorIterator(const Status& status) {
return new EmptyIterator(status);
}
Iterator* NewErrorIterator(const Status& status, Arena* arena) {
if (arena == nullptr) {
return NewErrorIterator(status);
} else {
auto mem = arena->AllocateAligned(sizeof(EmptyIterator));
return new (mem) EmptyIterator(status);
}
}
} // namespace rocksdb

@ -23,14 +23,7 @@ class IteratorWrapper {
explicit IteratorWrapper(Iterator* iter): iter_(nullptr) {
Set(iter);
}
IteratorWrapper(const IteratorWrapper&) {
// Iterator wrapper exclusively owns iter_ so it cannot be copied.
// Didn't delete the function because vector<IteratorWrapper> requires
// this function to compile.
assert(false);
}
void operator=(const IteratorWrapper&) = delete;
~IteratorWrapper() { delete iter_; }
~IteratorWrapper() {}
Iterator* iter() const { return iter_; }
// Takes ownership of "iter" and will delete it when destroyed, or
@ -45,6 +38,14 @@ class IteratorWrapper {
}
}
void DeleteIter(bool is_arena_mode) {
if (!is_arena_mode) {
delete iter_;
} else {
iter_->~Iterator();
}
}
// Iterator interface methods
bool Valid() const { return valid_; }
Slice key() const { assert(Valid()); return key_; }
@ -70,4 +71,11 @@ class IteratorWrapper {
Slice key_;
};
class Arena;
// Return an empty iterator (yields nothing) allocated from arena.
extern Iterator* NewEmptyIterator(Arena* arena);
// Return an empty iterator with the specified status, allocated arena.
extern Iterator* NewErrorIterator(const Status& status, Arena* arena);
} // namespace rocksdb

@ -17,13 +17,13 @@
#include "rocksdb/options.h"
#include "table/iter_heap.h"
#include "table/iterator_wrapper.h"
#include "util/arena.h"
#include "util/stop_watch.h"
#include "util/perf_context_imp.h"
#include "util/autovector.h"
namespace rocksdb {
namespace {
typedef std::priority_queue<
IteratorWrapper*,
std::vector<IteratorWrapper*>,
@ -43,13 +43,16 @@ MaxIterHeap NewMaxIterHeap(const Comparator* comparator) {
MinIterHeap NewMinIterHeap(const Comparator* comparator) {
return MinIterHeap(MinIteratorComparator(comparator));
}
} // namespace
const size_t kNumIterReserve = 4;
class MergingIterator : public Iterator {
public:
MergingIterator(const Comparator* comparator, Iterator** children, int n)
: comparator_(comparator),
MergingIterator(const Comparator* comparator, Iterator** children, int n,
bool is_arena_mode)
: is_arena_mode_(is_arena_mode),
comparator_(comparator),
current_(nullptr),
use_heap_(true),
direction_(kForward),
@ -66,7 +69,20 @@ class MergingIterator : public Iterator {
}
}
virtual ~MergingIterator() { }
virtual void AddIterator(Iterator* iter) {
assert(direction_ == kForward);
children_.emplace_back(iter);
auto new_wrapper = children_.back();
if (new_wrapper.Valid()) {
minHeap_.push(&new_wrapper);
}
}
virtual ~MergingIterator() {
for (auto& child : children_) {
child.DeleteIter(is_arena_mode_);
}
}
virtual bool Valid() const {
return (current_ != nullptr);
@ -242,6 +258,7 @@ class MergingIterator : public Iterator {
void FindLargest();
void ClearHeaps();
bool is_arena_mode_;
const Comparator* comparator_;
autovector<IteratorWrapper, kNumIterReserve> children_;
IteratorWrapper* current_;
@ -288,16 +305,51 @@ void MergingIterator::ClearHeaps() {
maxHeap_ = NewMaxIterHeap(comparator_);
minHeap_ = NewMinIterHeap(comparator_);
}
} // namespace
Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n) {
Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n,
Arena* arena) {
assert(n >= 0);
if (n == 0) {
return NewEmptyIterator();
return NewEmptyIterator(arena);
} else if (n == 1) {
return list[0];
} else {
return new MergingIterator(cmp, list, n);
if (arena == nullptr) {
return new MergingIterator(cmp, list, n, false);
} else {
auto mem = arena->AllocateAligned(sizeof(MergingIterator));
return new (mem) MergingIterator(cmp, list, n, true);
}
}
}
MergeIteratorBuilder::MergeIteratorBuilder(const Comparator* comparator,
Arena* a)
: first_iter(nullptr), use_merging_iter(false), arena(a) {
auto mem = arena->AllocateAligned(sizeof(MergingIterator));
merge_iter = new (mem) MergingIterator(comparator, nullptr, 0, true);
}
void MergeIteratorBuilder::AddIterator(Iterator* iter) {
if (!use_merging_iter && first_iter != nullptr) {
merge_iter->AddIterator(first_iter);
use_merging_iter = true;
}
if (use_merging_iter) {
merge_iter->AddIterator(iter);
} else {
first_iter = iter;
}
}
Iterator* MergeIteratorBuilder::Finish() {
if (!use_merging_iter) {
return first_iter;
} else {
auto ret = merge_iter;
merge_iter = nullptr;
return ret;
}
}

@ -9,11 +9,14 @@
#pragma once
#include "rocksdb/types.h"
namespace rocksdb {
class Comparator;
class Iterator;
class Env;
class Arena;
// Return an iterator that provided the union of the data in
// children[0,n-1]. Takes ownership of the child iterators and
@ -24,6 +27,34 @@ class Env;
//
// REQUIRES: n >= 0
extern Iterator* NewMergingIterator(const Comparator* comparator,
Iterator** children, int n);
Iterator** children, int n,
Arena* arena = nullptr);
class MergingIterator;
// A builder class to build a merging iterator by adding iterators one by one.
class MergeIteratorBuilder {
public:
// comparator: the comparator used in merging comparator
// arena: where the merging iterator needs to be allocated from.
explicit MergeIteratorBuilder(const Comparator* comparator, Arena* arena);
~MergeIteratorBuilder() {}
// Add iter to the merging iterator.
void AddIterator(Iterator* iter);
// Get arena used to build the merging iterator. It is called one a child
// iterator needs to be allocated.
Arena* GetArena() { return arena; }
// Return the result merging iterator.
Iterator* Finish();
private:
MergingIterator* merge_iter;
Iterator* first_iter;
bool use_merging_iter;
Arena* arena;
};
} // namespace rocksdb

@ -156,8 +156,15 @@ Status PlainTableReader::Open(const Options& options,
void PlainTableReader::SetupForCompaction() {
}
Iterator* PlainTableReader::NewIterator(const ReadOptions& options) {
return new PlainTableIterator(this, options_.prefix_extractor != nullptr);
Iterator* PlainTableReader::NewIterator(const ReadOptions& options,
Arena* arena) {
if (arena == nullptr) {
return new PlainTableIterator(this, options_.prefix_extractor != nullptr);
} else {
auto mem = arena->AllocateAligned(sizeof(PlainTableIterator));
return new (mem)
PlainTableIterator(this, options_.prefix_extractor != nullptr);
}
}
struct PlainTableReader::IndexRecord {

@ -55,7 +55,7 @@ class PlainTableReader: public TableReader {
const int bloom_bits_per_key, double hash_table_ratio,
size_t index_sparseness, size_t huge_page_tlb_size);
Iterator* NewIterator(const ReadOptions&);
Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr) override;
Status Get(const ReadOptions&, const Slice& key, void* arg,
bool (*result_handler)(void* arg, const ParsedInternalKey& k,

@ -15,6 +15,7 @@ namespace rocksdb {
class Iterator;
struct ParsedInternalKey;
class Slice;
class Arena;
struct ReadOptions;
struct TableProperties;
@ -28,7 +29,11 @@ class TableReader {
// Returns a new iterator over the table contents.
// The result of NewIterator() is initially invalid (caller must
// call one of the Seek methods on the iterator before using it).
virtual Iterator* NewIterator(const ReadOptions&) = 0;
// arena: If not null, the arena needs to be used to allocate the Iterator.
// When destroying the iterator, the caller will not call "delete"
// but Iterator::~Iterator() directly. The destructor needs to destroy
// all the states but those allocated in arena.
virtual Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr) = 0;
// Given a key, return an approximate byte offset in the file where
// the data for that key begins (or would begin if the key were

@ -13,6 +13,7 @@
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/format.h"
#include "util/arena.h"
namespace rocksdb {
@ -23,7 +24,10 @@ class TwoLevelIterator: public Iterator {
explicit TwoLevelIterator(TwoLevelIteratorState* state,
Iterator* first_level_iter);
virtual ~TwoLevelIterator() {}
virtual ~TwoLevelIterator() {
first_level_iter_.DeleteIter(false);
second_level_iter_.DeleteIter(false);
}
virtual void Seek(const Slice& target);
virtual void SeekToFirst();
@ -183,8 +187,13 @@ void TwoLevelIterator::InitDataBlock() {
} // namespace
Iterator* NewTwoLevelIterator(TwoLevelIteratorState* state,
Iterator* first_level_iter) {
return new TwoLevelIterator(state, first_level_iter);
Iterator* first_level_iter, Arena* arena) {
if (arena == nullptr) {
return new TwoLevelIterator(state, first_level_iter);
} else {
auto mem = arena->AllocateAligned(sizeof(TwoLevelIterator));
return new (mem) TwoLevelIterator(state, first_level_iter);
}
}
} // namespace rocksdb

@ -16,6 +16,7 @@ namespace rocksdb {
struct ReadOptions;
class InternalKeyComparator;
class Arena;
struct TwoLevelIteratorState {
explicit TwoLevelIteratorState(bool check_prefix_may_match)
@ -39,7 +40,11 @@ struct TwoLevelIteratorState {
//
// Uses a supplied function to convert an index_iter value into
// an iterator over the contents of the corresponding block.
// arena: If not null, the arena is used to allocate the Iterator.
// When destroying the iterator, the destructor will destroy
// all the states but those allocated in arena.
extern Iterator* NewTwoLevelIterator(TwoLevelIteratorState* state,
Iterator* first_level_iter);
Iterator* first_level_iter,
Arena* arena = nullptr);
} // namespace rocksdb

@ -23,6 +23,8 @@ namespace rocksdb {
class Logger;
const size_t kInlineSize = 2048;
class Arena {
public:
// No copying allowed

@ -251,7 +251,7 @@ class HashCuckooRep : public MemTableRep {
// are sorted according to the user specified KeyComparator. Note that
// any insert after this function call may affect the sorted nature of
// the returned iterator.
virtual MemTableRep::Iterator* GetIterator() override {
virtual MemTableRep::Iterator* GetIterator(Arena* arena) override {
std::vector<const char*> compact_buckets;
for (unsigned int bid = 0; bid < bucket_count_; ++bid) {
const char* bucket = cuckoo_array_[bid].load(std::memory_order_relaxed);
@ -266,10 +266,18 @@ class HashCuckooRep : public MemTableRep {
compact_buckets.push_back(iter->key());
}
}
return new Iterator(
std::shared_ptr<std::vector<const char*>>(
new std::vector<const char*>(std::move(compact_buckets))),
compare_);
if (arena == nullptr) {
return new Iterator(
std::shared_ptr<std::vector<const char*>>(
new std::vector<const char*>(std::move(compact_buckets))),
compare_);
} else {
auto mem = arena->AllocateAligned(sizeof(Iterator));
return new (mem) Iterator(
std::shared_ptr<std::vector<const char*>>(
new std::vector<const char*>(std::move(compact_buckets))),
compare_);
}
}
};

@ -70,11 +70,12 @@ class HashLinkListRep : public MemTableRep {
virtual ~HashLinkListRep();
virtual MemTableRep::Iterator* GetIterator() override;
virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override;
virtual MemTableRep::Iterator* GetIterator(const Slice& slice) override;
virtual MemTableRep::Iterator* GetDynamicPrefixIterator() override;
virtual MemTableRep::Iterator* GetDynamicPrefixIterator(
Arena* arena = nullptr) override;
private:
friend class DynamicIterator;
@ -411,7 +412,7 @@ void HashLinkListRep::Get(const LookupKey& k, void* callback_args,
}
}
MemTableRep::Iterator* HashLinkListRep::GetIterator() {
MemTableRep::Iterator* HashLinkListRep::GetIterator(Arena* alloc_arena) {
// allocate a new arena of similar size to the one currently in use
Arena* new_arena = new Arena(arena_->BlockSize());
auto list = new FullList(compare_, new_arena);
@ -424,7 +425,12 @@ MemTableRep::Iterator* HashLinkListRep::GetIterator() {
}
}
}
return new FullListIterator(list, new_arena);
if (alloc_arena == nullptr) {
return new FullListIterator(list, new_arena);
} else {
auto mem = alloc_arena->AllocateAligned(sizeof(FullListIterator));
return new (mem) FullListIterator(list, new_arena);
}
}
MemTableRep::Iterator* HashLinkListRep::GetIterator(const Slice& slice) {
@ -435,8 +441,14 @@ MemTableRep::Iterator* HashLinkListRep::GetIterator(const Slice& slice) {
return new Iterator(this, bucket);
}
MemTableRep::Iterator* HashLinkListRep::GetDynamicPrefixIterator() {
return new DynamicIterator(*this);
MemTableRep::Iterator* HashLinkListRep::GetDynamicPrefixIterator(
Arena* alloc_arena) {
if (alloc_arena == nullptr) {
return new DynamicIterator(*this);
} else {
auto mem = alloc_arena->AllocateAligned(sizeof(DynamicIterator));
return new (mem) DynamicIterator(*this);
}
}
bool HashLinkListRep::BucketContains(Node* head, const Slice& user_key) const {

@ -38,11 +38,12 @@ class HashSkipListRep : public MemTableRep {
virtual ~HashSkipListRep();
virtual MemTableRep::Iterator* GetIterator() override;
virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override;
virtual MemTableRep::Iterator* GetIterator(const Slice& slice) override;
virtual MemTableRep::Iterator* GetDynamicPrefixIterator() override;
virtual MemTableRep::Iterator* GetDynamicPrefixIterator(
Arena* arena = nullptr) override;
private:
friend class DynamicIterator;
@ -288,7 +289,7 @@ void HashSkipListRep::Get(const LookupKey& k, void* callback_args,
}
}
MemTableRep::Iterator* HashSkipListRep::GetIterator() {
MemTableRep::Iterator* HashSkipListRep::GetIterator(Arena* arena) {
// allocate a new arena of similar size to the one currently in use
Arena* new_arena = new Arena(arena_->BlockSize());
auto list = new Bucket(compare_, new_arena);
@ -301,7 +302,12 @@ MemTableRep::Iterator* HashSkipListRep::GetIterator() {
}
}
}
return new Iterator(list, true, new_arena);
if (arena == nullptr) {
return new Iterator(list, true, new_arena);
} else {
auto mem = arena->AllocateAligned(sizeof(Iterator));
return new (mem) Iterator(list, true, new_arena);
}
}
MemTableRep::Iterator* HashSkipListRep::GetIterator(const Slice& slice) {
@ -312,8 +318,13 @@ MemTableRep::Iterator* HashSkipListRep::GetIterator(const Slice& slice) {
return new Iterator(bucket, false);
}
MemTableRep::Iterator* HashSkipListRep::GetDynamicPrefixIterator() {
return new DynamicIterator(*this);
MemTableRep::Iterator* HashSkipListRep::GetDynamicPrefixIterator(Arena* arena) {
if (arena == nullptr) {
return new DynamicIterator(*this);
} else {
auto mem = arena->AllocateAligned(sizeof(DynamicIterator));
return new (mem) DynamicIterator(*this);
}
}
} // anon namespace

@ -6,6 +6,7 @@
#include "rocksdb/memtablerep.h"
#include "db/memtable.h"
#include "db/skiplist.h"
#include "util/arena.h"
namespace rocksdb {
namespace {
@ -108,8 +109,13 @@ public:
// Unhide default implementations of GetIterator
using MemTableRep::GetIterator;
virtual MemTableRep::Iterator* GetIterator() override {
return new SkipListRep::Iterator(&skip_list_);
virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override {
if (arena == nullptr) {
return new SkipListRep::Iterator(&skip_list_);
} else {
auto mem = arena->AllocateAligned(sizeof(SkipListRep::Iterator));
return new (mem) SkipListRep::Iterator(&skip_list_);
}
}
};
}

@ -95,7 +95,7 @@ class VectorRep : public MemTableRep {
using MemTableRep::GetIterator;
// Return an iterator over the keys in this representation.
virtual MemTableRep::Iterator* GetIterator() override;
virtual MemTableRep::Iterator* GetIterator(Arena* arena) override;
private:
friend class Iterator;
@ -259,16 +259,28 @@ void VectorRep::Get(const LookupKey& k, void* callback_args,
}
}
MemTableRep::Iterator* VectorRep::GetIterator() {
MemTableRep::Iterator* VectorRep::GetIterator(Arena* arena) {
char* mem = nullptr;
if (arena != nullptr) {
mem = arena->AllocateAligned(sizeof(Iterator));
}
ReadLock l(&rwlock_);
// Do not sort here. The sorting would be done the first time
// a Seek is performed on the iterator.
if (immutable_) {
return new Iterator(this, bucket_, compare_);
if (arena == nullptr) {
return new Iterator(this, bucket_, compare_);
} else {
return new (mem) Iterator(this, bucket_, compare_);
}
} else {
std::shared_ptr<Bucket> tmp;
tmp.reset(new Bucket(*bucket_)); // make a copy
return new Iterator(nullptr, tmp, compare_);
if (arena == nullptr) {
return new Iterator(nullptr, tmp, compare_);
} else {
return new (mem) Iterator(nullptr, tmp, compare_);
}
}
}
} // anon namespace

Loading…
Cancel
Save