diff --git a/build_tools/regression_build_test.sh b/build_tools/regression_build_test.sh index 58766f5df..82e3380fa 100755 --- a/build_tools/regression_build_test.sh +++ b/build_tools/regression_build_test.sh @@ -308,9 +308,9 @@ function send_benchmark_to_ods { file="$3" QPS=$(grep $bench $file | awk '{print $5}') - P50_MICROS=$(grep $bench $file -A 4 | tail -n1 | awk '{print $3}' ) - P75_MICROS=$(grep $bench $file -A 4 | tail -n1 | awk '{print $5}' ) - P99_MICROS=$(grep $bench $file -A 4 | tail -n1 | awk '{print $7}' ) + P50_MICROS=$(grep $bench $file -A 6 | grep "Percentiles" | awk '{print $3}' ) + P75_MICROS=$(grep $bench $file -A 6 | grep "Percentiles" | awk '{print $5}' ) + P99_MICROS=$(grep $bench $file -A 6 | grep "Percentiles" | awk '{print $7}' ) send_to_ods rocksdb.build.$bench_key.qps $QPS send_to_ods rocksdb.build.$bench_key.p50_micros $P50_MICROS diff --git a/db/column_family.cc b/db/column_family.cc index 9cf0c0d49..45a3e9a82 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -506,6 +506,10 @@ void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) { max_column_family_ = std::max(new_max_column_family, max_column_family_); } +size_t ColumnFamilySet::NumberOfColumnFamilies() const { + return column_families_.size(); +} + // under a DB mutex ColumnFamilyData* ColumnFamilySet::CreateColumnFamily( const std::string& name, uint32_t id, Version* dummy_versions, diff --git a/db/column_family.h b/db/column_family.h index d306f4e66..991bb0112 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -180,7 +180,7 @@ class ColumnFamilyData { void SetCurrent(Version* current); void CreateNewMemtable(); - TableCache* table_cache() { return table_cache_.get(); } + TableCache* table_cache() const { return table_cache_.get(); } // See documentation in compaction_picker.h Compaction* PickCompaction(LogBuffer* log_buffer); @@ -302,8 +302,8 @@ class ColumnFamilyData { // family might get dropped when the DB mutex is released // * GetDefault() -- thread safe // * GetColumnFamily() -- either inside of DB mutex or call Lock() <-> Unlock() -// * GetNextColumnFamilyID(), GetMaxColumnFamily(), UpdateMaxColumnFamily() -- -// inside of DB mutex +// * GetNextColumnFamilyID(), GetMaxColumnFamily(), UpdateMaxColumnFamily(), +// NumberOfColumnFamilies -- inside of DB mutex class ColumnFamilySet { public: // ColumnFamilySet supports iteration @@ -342,6 +342,7 @@ class ColumnFamilySet { uint32_t GetNextColumnFamilyID(); uint32_t GetMaxColumnFamily(); void UpdateMaxColumnFamily(uint32_t new_max_column_family); + size_t NumberOfColumnFamilies() const; ColumnFamilyData* CreateColumnFamily(const std::string& name, uint32_t id, Version* dummy_version, diff --git a/db/db_bench.cc b/db/db_bench.cc index dbd8cecd9..b8e4b3213 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -465,6 +465,8 @@ static auto FLAGS_compaction_fadvice_e = DEFINE_bool(use_tailing_iterator, false, "Use tailing iterator to access a series of keys instead of get"); +DEFINE_int64(iter_refresh_interval_us, -1, + "How often to refresh iterators. 
Disable refresh when -1"); DEFINE_bool(use_adaptive_mutex, rocksdb::Options().use_adaptive_mutex, "Use adaptive mutex"); @@ -1926,7 +1928,7 @@ class Benchmark { } char msg[100]; - snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", + snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n", found, read); thread->stats.AddMessage(msg); @@ -2009,12 +2011,31 @@ class Benchmark { multi_iters.push_back(db->NewIterator(options)); } } + uint64_t last_refresh = FLAGS_env->NowMicros(); Slice key = AllocateKey(); std::unique_ptr key_guard(key.data()); Duration duration(FLAGS_duration, reads_); while (!duration.Done(1)) { + if (!FLAGS_use_tailing_iterator && FLAGS_iter_refresh_interval_us >= 0) { + uint64_t now = FLAGS_env->NowMicros(); + if (now - last_refresh > (uint64_t)FLAGS_iter_refresh_interval_us) { + if (db_ != nullptr) { + delete single_iter; + single_iter = db_->NewIterator(options); + } else { + for (auto iter : multi_iters) { + delete iter; + } + multi_iters.clear(); + for (DB* db : multi_dbs_) { + multi_iters.push_back(db->NewIterator(options)); + } + } + } + last_refresh = now; + } // Pick a Iterator to use Iterator* iter_to_use = single_iter; if (single_iter == nullptr) { @@ -2035,9 +2056,12 @@ class Benchmark { } char msg[100]; - snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", + snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n", found, read); thread->stats.AddMessage(msg); + if (FLAGS_perf_level > 0) { + thread->stats.AddMessage(perf_context.ToString()); + } } void SeekRandomWhileWriting(ThreadState* thread) { diff --git a/db/db_impl.cc b/db/db_impl.cc index f87442767..5301fa599 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -36,6 +36,7 @@ #include "db/table_cache.h" #include "db/table_properties_collector.h" #include "db/tailing_iter.h" +#include "db/forward_iterator.h" #include "db/transaction_log_impl.h" #include "db/version_set.h" #include "db/write_batch_internal.h" @@ -2067,7 +2068,15 @@ void DBImpl::BackgroundCallCompaction() { if (madeProgress || bg_schedule_needed_) { MaybeScheduleFlushOrCompaction(); } - bg_cv_.SignalAll(); + if (madeProgress || bg_compaction_scheduled_ == 0 || bg_manual_only_ > 0) { + // signal if + // * madeProgress -- need to wakeup MakeRoomForWrite + // * bg_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl + // * bg_manual_only_ > 0 -- need to wakeup RunManualCompaction + // If none of this is true, there is no need to signal since nobody is + // waiting for it + bg_cv_.SignalAll(); + } // IMPORTANT: there should be no code after calling SignalAll. This call may // signal the DB destructor that it's OK to proceed with destruction. 
In // that case, all DB variables will be dealloacated and referencing them @@ -2578,7 +2587,7 @@ Status DBImpl::ProcessKeyValueCompaction( cfd->user_comparator()->Compare(ikey.user_key, current_user_key.GetKey()) != 0) { // First occurrence of this user key - current_user_key.SetUserKey(ikey.user_key); + current_user_key.SetKey(ikey.user_key); has_current_user_key = true; last_sequence_for_key = kMaxSequenceNumber; visible_in_snapshot = kMaxSequenceNumber; @@ -3170,18 +3179,34 @@ static void CleanupIteratorState(void* arg1, void* arg2) { Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, ColumnFamilyData* cfd, - SuperVersion* super_version) { - std::vector iterator_list; - // Collect iterator for mutable mem - iterator_list.push_back(super_version->mem->NewIterator(options)); - // Collect all needed child iterators for immutable memtables - super_version->imm->AddIterators(options, &iterator_list); - // Collect iterators for files in L0 - Ln - super_version->current->AddIterators(options, storage_options_, - &iterator_list); - Iterator* internal_iter = NewMergingIterator( - &cfd->internal_comparator(), &iterator_list[0], iterator_list.size()); - + SuperVersion* super_version, + Arena* arena) { + Iterator* internal_iter; + if (arena != nullptr) { + // Need to create internal iterator from the arena. + MergeIteratorBuilder merge_iter_builder(&cfd->internal_comparator(), arena); + // Collect iterator for mutable mem + merge_iter_builder.AddIterator( + super_version->mem->NewIterator(options, false, arena)); + // Collect all needed child iterators for immutable memtables + super_version->imm->AddIterators(options, &merge_iter_builder); + // Collect iterators for files in L0 - Ln + super_version->current->AddIterators(options, storage_options_, + &merge_iter_builder); + internal_iter = merge_iter_builder.Finish(); + } else { + // Need to create internal iterator using malloc. + std::vector iterator_list; + // Collect iterator for mutable mem + iterator_list.push_back(super_version->mem->NewIterator(options)); + // Collect all needed child iterators for immutable memtables + super_version->imm->AddIterators(options, &iterator_list); + // Collect iterators for files in L0 - Ln + super_version->current->AddIterators(options, storage_options_, + &iterator_list); + internal_iter = NewMergingIterator(&cfd->internal_comparator(), + &iterator_list[0], iterator_list.size()); + } IterState* cleanup = new IterState(this, &mutex_, super_version); internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr); @@ -3532,30 +3557,77 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options, auto cfh = reinterpret_cast(column_family); auto cfd = cfh->cfd(); - Iterator* iter; if (options.tailing) { #ifdef ROCKSDB_LITE // not supported in lite version return nullptr; #else - iter = new TailingIterator(env_, this, options, cfd); + // TODO(ljin): remove tailing iterator + auto iter = new ForwardIterator(this, options, cfd); + return NewDBIterator(env_, *cfd->options(), cfd->user_comparator(), iter, + kMaxSequenceNumber); +// return new TailingIterator(env_, this, options, cfd); #endif } else { SequenceNumber latest_snapshot = versions_->LastSequence(); SuperVersion* sv = nullptr; sv = cfd->GetReferencedSuperVersion(&mutex_); - iter = NewInternalIterator(options, cfd, sv); - auto snapshot = options.snapshot != nullptr ? 
reinterpret_cast(options.snapshot)->number_ : latest_snapshot; - iter = NewDBIterator(env_, *cfd->options(), - cfd->user_comparator(), iter, snapshot); - } - return iter; + // Try to generate a DB iterator tree in continuous memory area to be + // cache friendly. Here is an example of result: + // +-------------------------------+ + // | | + // | ArenaWrappedDBIter | + // | + | + // | +---> Inner Iterator ------------+ + // | | | | + // | | +-- -- -- -- -- -- -- --+ | + // | +--- | Arena | | + // | | | | + // | Allocated Memory: | | + // | | +-------------------+ | + // | | | DBIter | <---+ + // | | + | + // | | | +-> iter_ ------------+ + // | | | | | + // | | +-------------------+ | + // | | | MergingIterator | <---+ + // | | + | + // | | | +->child iter1 ------------+ + // | | | | | | + // | | +->child iter2 ----------+ | + // | | | | | | | + // | | | +->child iter3 --------+ | | + // | | | | | | + // | | +-------------------+ | | | + // | | | Iterator1 | <--------+ + // | | +-------------------+ | | + // | | | Iterator2 | <------+ + // | | +-------------------+ | + // | | | Iterator3 | <----+ + // | | +-------------------+ + // | | | + // +-------+-----------------------+ + // + // ArenaWrappedDBIter inlines an arena area where all the iterartor in the + // the iterator tree is allocated in the order of being accessed when + // querying. + // Laying out the iterators in the order of being accessed makes it more + // likely that any iterator pointer is close to the iterator it points to so + // that they are likely to be in the same cache line and/or page. + ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( + env_, *cfd->options(), cfd->user_comparator(), snapshot); + Iterator* internal_iter = + NewInternalIterator(options, cfd, sv, db_iter->GetArena()); + db_iter->SetIterUnderDBIter(internal_iter); + + return db_iter; + } } Status DBImpl::NewIterators( @@ -3679,15 +3751,17 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { uint64_t flush_column_family_if_log_file = 0; uint64_t max_total_wal_size = (options_.max_total_wal_size == 0) - ? 2 * max_total_in_memory_state_ + ? 4 * max_total_in_memory_state_ : options_.max_total_wal_size; - if (alive_log_files_.begin()->getting_flushed == false && + if (versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1 && + alive_log_files_.begin()->getting_flushed == false && total_log_size_ > max_total_wal_size) { flush_column_family_if_log_file = alive_log_files_.begin()->number; alive_log_files_.begin()->getting_flushed = true; Log(options_.info_log, - "Flushing all column families with data in WAL number %" PRIu64, - flush_column_family_if_log_file); + "Flushing all column families with data in WAL number %" PRIu64 + ". 
Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64, + flush_column_family_if_log_file, total_log_size_, max_total_wal_size); } Status status; @@ -3923,6 +3997,10 @@ Status DBImpl::MakeRoomForWrite( uint64_t rate_limit_delay_millis = 0; Status s; double score; + // Once we schedule background work, we shouldn't schedule it again, since it + // might generate a tight feedback loop, constantly scheduling more background + // work, even if additional background work is not needed + bool schedule_background_work = true; while (true) { if (!bg_error_.ok()) { @@ -3966,7 +4044,10 @@ Status DBImpl::MakeRoomForWrite( DelayLoggingAndReset(); Log(options_.info_log, "[%s] wait for memtable flush...\n", cfd->GetName().c_str()); - MaybeScheduleFlushOrCompaction(); + if (schedule_background_work) { + MaybeScheduleFlushOrCompaction(); + schedule_background_work = false; + } uint64_t stall; { StopWatch sw(env_, options_.statistics.get(), diff --git a/db/db_impl.h b/db/db_impl.h index cc59cfd7c..6049db8d6 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -39,6 +39,7 @@ class Version; class VersionEdit; class VersionSet; class CompactionFilterV2; +class Arena; class DBImpl : public DB { public: @@ -278,13 +279,15 @@ class DBImpl : public DB { const DBOptions options_; Iterator* NewInternalIterator(const ReadOptions&, ColumnFamilyData* cfd, - SuperVersion* super_version); + SuperVersion* super_version, + Arena* arena = nullptr); private: friend class DB; friend class InternalStats; #ifndef ROCKSDB_LITE friend class TailingIterator; + friend class ForwardIterator; #endif friend struct SuperVersion; struct CompactionState; @@ -449,7 +452,16 @@ class DBImpl : public DB { // State below is protected by mutex_ port::Mutex mutex_; port::AtomicPointer shutting_down_; - port::CondVar bg_cv_; // Signalled when background work finishes + // This condition variable is signaled on these conditions: + // * whenever bg_compaction_scheduled_ goes down to 0 + // * if bg_manual_only_ > 0, whenever a compaction finishes, even if it hasn't + // made any progress + // * whenever a compaction made any progress + // * whenever bg_flush_scheduled_ value decreases (i.e. whenever a flush is + // done, even if it didn't make any progress) + // * whenever there is an error in background flush or compaction + // * whenever bg_logstats_scheduled_ turns to false + port::CondVar bg_cv_; uint64_t logfile_number_; unique_ptr log_; bool log_empty_; diff --git a/db/db_iter.cc b/db/db_iter.cc index e7ff2ba25..1f49b7aa7 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -18,6 +18,7 @@ #include "rocksdb/iterator.h" #include "rocksdb/merge_operator.h" #include "port/port.h" +#include "util/arena.h" #include "util/logging.h" #include "util/mutexlock.h" #include "util/perf_context_imp.h" @@ -37,8 +38,6 @@ static void DumpInternalIter(Iterator* iter) { } #endif -namespace { - // Memtables and sstables that make the DB representation contain // (userkey,seq,type) => uservalue entries. 
DBIter // combines multiple entries for the same userkey found in the DB @@ -57,9 +56,10 @@ class DBIter: public Iterator { kReverse }; - DBIter(Env* env, const Options& options, - const Comparator* cmp, Iterator* iter, SequenceNumber s) - : env_(env), + DBIter(Env* env, const Options& options, const Comparator* cmp, + Iterator* iter, SequenceNumber s, bool arena_mode) + : arena_mode_(arena_mode), + env_(env), logger_(options.info_log.get()), user_comparator_(cmp), user_merge_operator_(options.merge_operator.get()), @@ -74,7 +74,15 @@ class DBIter: public Iterator { } virtual ~DBIter() { RecordTick(statistics_, NO_ITERATORS, -1); - delete iter_; + if (!arena_mode_) { + delete iter_; + } else { + iter_->~Iterator(); + } + } + virtual void SetIter(Iterator* iter) { + assert(iter_ == nullptr); + iter_ = iter; } virtual bool Valid() const { return valid_; } virtual Slice key() const { @@ -116,11 +124,12 @@ class DBIter: public Iterator { } } + bool arena_mode_; Env* const env_; Logger* logger_; const Comparator* const user_comparator_; const MergeOperator* const user_merge_operator_; - Iterator* const iter_; + Iterator* iter_; SequenceNumber const sequence_; Status status_; @@ -211,18 +220,18 @@ void DBIter::FindNextUserEntryInternal(bool skipping) { case kTypeDeletion: // Arrange to skip all upcoming entries for this key since // they are hidden by this deletion. - saved_key_.SetUserKey(ikey.user_key); + saved_key_.SetKey(ikey.user_key); skipping = true; num_skipped = 0; PERF_COUNTER_ADD(internal_delete_skipped_count, 1); break; case kTypeValue: valid_ = true; - saved_key_.SetUserKey(ikey.user_key); + saved_key_.SetKey(ikey.user_key); return; case kTypeMerge: // By now, we are sure the current ikey is going to yield a value - saved_key_.SetUserKey(ikey.user_key); + saved_key_.SetKey(ikey.user_key); current_entry_is_merged_ = true; valid_ = true; MergeValuesNewToOld(); // Go to a different state machine @@ -331,7 +340,7 @@ void DBIter::Prev() { // iter_ is pointing at the current entry. Scan backwards until // the key changes so we can use the normal reverse scanning code. 
assert(iter_->Valid()); // Otherwise valid_ would have been false - saved_key_.SetUserKey(ExtractUserKey(iter_->key())); + saved_key_.SetKey(ExtractUserKey(iter_->key())); while (true) { iter_->Prev(); if (!iter_->Valid()) { @@ -377,7 +386,7 @@ void DBIter::FindPrevUserEntry() { std::string empty; swap(empty, saved_value_); } - saved_key_.SetUserKey(ExtractUserKey(iter_->key())); + saved_key_.SetKey(ExtractUserKey(iter_->key())); saved_value_.assign(raw_value.data(), raw_value.size()); } } else { @@ -461,16 +470,48 @@ void DBIter::SeekToLast() { FindPrevUserEntry(); } -} // anonymous namespace +Iterator* NewDBIterator(Env* env, const Options& options, + const Comparator* user_key_comparator, + Iterator* internal_iter, + const SequenceNumber& sequence) { + return new DBIter(env, options, user_key_comparator, internal_iter, sequence, + false); +} + +ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); } + +void ArenaWrappedDBIter::SetDBIter(DBIter* iter) { db_iter_ = iter; } + +void ArenaWrappedDBIter::SetIterUnderDBIter(Iterator* iter) { + static_cast(db_iter_)->SetIter(iter); +} + +inline bool ArenaWrappedDBIter::Valid() const { return db_iter_->Valid(); } +inline void ArenaWrappedDBIter::SeekToFirst() { db_iter_->SeekToFirst(); } +inline void ArenaWrappedDBIter::SeekToLast() { db_iter_->SeekToLast(); } +inline void ArenaWrappedDBIter::Seek(const Slice& target) { + db_iter_->Seek(target); +} +inline void ArenaWrappedDBIter::Next() { db_iter_->Next(); } +inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); } +inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); } +inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); } +inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); } +void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1, + void* arg2) { + db_iter_->RegisterCleanup(function, arg1, arg2); +} -Iterator* NewDBIterator( - Env* env, - const Options& options, - const Comparator *user_key_comparator, - Iterator* internal_iter, +ArenaWrappedDBIter* NewArenaWrappedDbIterator( + Env* env, const Options& options, const Comparator* user_key_comparator, const SequenceNumber& sequence) { - return new DBIter(env, options, user_key_comparator, - internal_iter, sequence); + ArenaWrappedDBIter* iter = new ArenaWrappedDBIter(); + Arena* arena = iter->GetArena(); + auto mem = arena->AllocateAligned(sizeof(DBIter)); + DBIter* db_iter = new (mem) + DBIter(env, options, user_key_comparator, nullptr, sequence, true); + iter->SetDBIter(db_iter); + return iter; } } // namespace rocksdb diff --git a/db/db_iter.h b/db/db_iter.h index d8a3bad51..cb9840324 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -11,9 +11,14 @@ #include #include "rocksdb/db.h" #include "db/dbformat.h" +#include "util/arena.h" +#include "util/autovector.h" namespace rocksdb { +class Arena; +class DBIter; + // Return a new iterator that converts internal keys (yielded by // "*internal_iter") that were live at the specified "sequence" number // into appropriate user keys. @@ -24,4 +29,45 @@ extern Iterator* NewDBIterator( Iterator* internal_iter, const SequenceNumber& sequence); +// A wrapper iterator which wraps DB Iterator and the arena, with which the DB +// iterator is supposed be allocated. This class is used as an entry point of +// a iterator hierarchy whose memory can be allocated inline. In that way, +// accessing the iterator tree can be more cache friendly. It is also faster +// to allocate. 
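// Illustration (hypothetical sketch, not from this patch): the arena
// placement-new lifetime that ArenaWrappedDBIter and the other arena-aware
// iterators in this change rely on. Objects are constructed in place inside
// memory carved from an Arena and torn down with an explicit destructor call
// instead of delete, because the Arena owns the memory. "SketchIter" is a
// stand-in type that exists only for this example.
#include <new>
#include "util/arena.h"

namespace {
struct SketchIter {
  explicit SketchIter(int v) : value(v) {}
  int value;
};

void ArenaLifetimeSketch() {
  rocksdb::Arena arena;
  // Aligned raw memory from the arena, then in-place construction.
  void* mem = arena.AllocateAligned(sizeof(SketchIter));
  SketchIter* it = new (mem) SketchIter(42);
  // Destructor only -- no delete. The memory is reclaimed when 'arena'
  // itself goes away.
  it->~SketchIter();
}
}  // namespace
// (end of illustration)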
+class ArenaWrappedDBIter : public Iterator { + public: + virtual ~ArenaWrappedDBIter(); + + // Get the arena to be used to allocate memory for DBIter to be wrapped, + // as well as child iterators in it. + virtual Arena* GetArena() { return &arena_; } + + // Set the DB Iterator to be wrapped + + virtual void SetDBIter(DBIter* iter); + + // Set the internal iterator wrapped inside the DB Iterator. Usually it is + // a merging iterator. + virtual void SetIterUnderDBIter(Iterator* iter); + virtual bool Valid() const override; + virtual void SeekToFirst() override; + virtual void SeekToLast() override; + virtual void Seek(const Slice& target) override; + virtual void Next() override; + virtual void Prev() override; + virtual Slice key() const override; + virtual Slice value() const override; + virtual Status status() const override; + void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2); + + private: + DBIter* db_iter_; + Arena arena_; +}; + +// Generate the arena wrapped iterator class. +extern ArenaWrappedDBIter* NewArenaWrappedDbIterator( + Env* env, const Options& options, const Comparator* user_key_comparator, + const SequenceNumber& sequence); + } // namespace rocksdb diff --git a/db/dbformat.h b/db/dbformat.h index 4356dd934..9640372d7 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -256,10 +256,10 @@ class IterKey { void Clear() { key_size_ = 0; } - void SetUserKey(const Slice& user_key) { - size_t size = user_key.size(); + void SetKey(const Slice& key) { + size_t size = key.size(); EnlargeBufferIfNeeded(size); - memcpy(key_, user_key.data(), size); + memcpy(key_, key.data(), size); key_size_ = size; } diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc new file mode 100644 index 000000000..35a31ddc5 --- /dev/null +++ b/db/forward_iterator.cc @@ -0,0 +1,383 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. 
+ +#ifndef ROCKSDB_LITE +#include "db/forward_iterator.h" + +#include +#include +#include +#include "db/db_impl.h" +#include "db/db_iter.h" +#include "db/column_family.h" +#include "rocksdb/env.h" +#include "rocksdb/slice.h" +#include "rocksdb/slice_transform.h" +#include "table/merger.h" +#include "db/dbformat.h" + +namespace rocksdb { + +// Usage: +// LevelIterator iter; +// iter.SetFileIndex(file_index); +// iter.Seek(target); +// iter.Next() +class LevelIterator : public Iterator { + public: + LevelIterator(const ColumnFamilyData* const cfd, + const ReadOptions& read_options, + const std::vector& files) + : cfd_(cfd), read_options_(read_options), files_(files), valid_(false), + file_index_(std::numeric_limits::max()) {} + + void SetFileIndex(uint32_t file_index) { + assert(file_index < files_.size()); + if (file_index != file_index_) { + file_index_ = file_index; + file_iter_.reset(cfd_->table_cache()->NewIterator( + read_options_, *(cfd_->soptions()), cfd_->internal_comparator(), + *(files_[file_index_]), nullptr /* table_reader_ptr */, false)); + } + valid_ = false; + } + void SeekToLast() override { + status_ = Status::NotSupported("LevelIterator::SeekToLast()"); + valid_ = false; + } + void Prev() { + status_ = Status::NotSupported("LevelIterator::Prev()"); + valid_ = false; + } + bool Valid() const override { + return valid_; + } + void SeekToFirst() override { + SetFileIndex(0); + file_iter_->SeekToFirst(); + valid_ = file_iter_->Valid(); + } + void Seek(const Slice& internal_key) override { + assert(file_iter_ != nullptr); + file_iter_->Seek(internal_key); + valid_ = file_iter_->Valid(); + assert(valid_); + } + void Next() override { + assert(valid_); + file_iter_->Next(); + while (!file_iter_->Valid()) { + if (file_index_ + 1 >= files_.size()) { + valid_ = false; + return; + } + SetFileIndex(file_index_ + 1); + file_iter_->SeekToFirst(); + } + valid_ = file_iter_->Valid(); + } + Slice key() const override { + assert(valid_); + return file_iter_->key(); + } + Slice value() const override { + assert(valid_); + return file_iter_->value(); + } + Status status() const override { + return status_; + } + + private: + const ColumnFamilyData* const cfd_; + const ReadOptions& read_options_; + const std::vector& files_; + + bool valid_; + uint32_t file_index_; + Status status_; + std::unique_ptr file_iter_; +}; + +ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options, + ColumnFamilyData* cfd) + : db_(db), + read_options_(read_options), + cfd_(cfd), + prefix_extractor_(cfd->options()->prefix_extractor.get()), + user_comparator_(cfd->user_comparator()), + immutable_min_heap_(MinIterComparator(&cfd_->internal_comparator())), + sv_(nullptr), + mutable_iter_(nullptr), + current_(nullptr), + valid_(false), + is_prev_set_(false) {} + +ForwardIterator::~ForwardIterator() { + Cleanup(); +} + +void ForwardIterator::Cleanup() { + delete mutable_iter_; + for (auto* m : imm_iters_) { + delete m; + } + imm_iters_.clear(); + for (auto* f : l0_iters_) { + delete f; + } + l0_iters_.clear(); + for (auto* l : level_iters_) { + delete l; + } + level_iters_.clear(); + + if (sv_ != nullptr && sv_->Unref()) { + DBImpl::DeletionState deletion_state; + db_->mutex_.Lock(); + sv_->Cleanup(); + db_->FindObsoleteFiles(deletion_state, false, true); + db_->mutex_.Unlock(); + delete sv_; + if (deletion_state.HaveSomethingToDelete()) { + db_->PurgeObsoleteFiles(deletion_state); + } + } +} + +bool ForwardIterator::Valid() const { + return valid_; +} + +void ForwardIterator::SeekToFirst() { + if (sv_ 
== nullptr || + sv_ ->version_number != cfd_->GetSuperVersionNumber()) { + RebuildIterators(); + } + SeekInternal(Slice(), true); +} + +void ForwardIterator::Seek(const Slice& internal_key) { + if (sv_ == nullptr || + sv_ ->version_number != cfd_->GetSuperVersionNumber()) { + RebuildIterators(); + } + SeekInternal(internal_key, false); +} + +void ForwardIterator::SeekInternal(const Slice& internal_key, + bool seek_to_first) { + // mutable + seek_to_first ? mutable_iter_->SeekToFirst() : + mutable_iter_->Seek(internal_key); + + // immutable + // TODO(ljin): NeedToSeekImmutable has negative impact on performance + // if it turns to need to seek immutable often. We probably want to have + // an option to turn it off. + if (seek_to_first || NeedToSeekImmutable(internal_key)) { + { + auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator())); + immutable_min_heap_.swap(tmp); + } + for (auto* m : imm_iters_) { + seek_to_first ? m->SeekToFirst() : m->Seek(internal_key); + if (m->Valid()) { + immutable_min_heap_.push(m); + } + } + + auto* files = sv_->current->files_; + for (uint32_t i = 0; i < files[0].size(); ++i) { + if (seek_to_first) { + l0_iters_[i]->SeekToFirst(); + } else { + // If the target key passes over the larget key, we are sure Next() + // won't go over this file. + if (user_comparator_->Compare(ExtractUserKey(internal_key), + files[0][i]->largest.user_key()) > 0) { + continue; + } + l0_iters_[i]->Seek(internal_key); + } + if (l0_iters_[i]->Valid()) { + immutable_min_heap_.push(l0_iters_[i]); + } + } + for (int32_t level = 1; level < sv_->current->NumberLevels(); ++level) { + if (files[level].empty()) { + continue; + } + assert(level_iters_[level - 1] != nullptr); + uint32_t f_idx = 0; + if (!seek_to_first) { + f_idx = FindFileInRange( + files[level], internal_key, 0, files[level].size()); + } + if (f_idx < files[level].size()) { + level_iters_[level - 1]->SetFileIndex(f_idx); + seek_to_first ? 
level_iters_[level - 1]->SeekToFirst() : + level_iters_[level - 1]->Seek(internal_key); + if (level_iters_[level - 1]->Valid()) { + immutable_min_heap_.push(level_iters_[level - 1]); + } + } + } + + if (seek_to_first || immutable_min_heap_.empty()) { + is_prev_set_ = false; + } else { + prev_key_.SetKey(internal_key); + is_prev_set_ = true; + } + } + + UpdateCurrent(); +} + +void ForwardIterator::Next() { + assert(valid_); + + if (sv_ == nullptr || + sv_ ->version_number != cfd_->GetSuperVersionNumber()) { + std::string current_key = key().ToString(); + Slice old_key(current_key.data(), current_key.size()); + + RebuildIterators(); + SeekInternal(old_key, false); + if (!valid_ || key().compare(old_key) != 0) { + return; + } + } else if (current_ != mutable_iter_) { + // It is going to advance immutable iterator + prev_key_.SetKey(current_->key()); + is_prev_set_ = true; + } + + current_->Next(); + if (current_->Valid() && current_ != mutable_iter_) { + immutable_min_heap_.push(current_); + } + UpdateCurrent(); +} + +Slice ForwardIterator::key() const { + assert(valid_); + return current_->key(); +} + +Slice ForwardIterator::value() const { + assert(valid_); + return current_->value(); +} + +Status ForwardIterator::status() const { + if (!status_.ok()) { + return status_; + } else if (!mutable_iter_->status().ok()) { + return mutable_iter_->status(); + } + return Status::OK(); +} + +void ForwardIterator::RebuildIterators() { + // Clean up + Cleanup(); + // New + sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_)); + mutable_iter_ = sv_->mem->NewIterator(read_options_); + sv_->imm->AddIterators(read_options_, &imm_iters_); + const auto& l0_files = sv_->current->files_[0]; + l0_iters_.reserve(l0_files.size()); + for (const auto* l0 : l0_files) { + l0_iters_.push_back(cfd_->table_cache()->NewIterator( + read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0)); + } + level_iters_.reserve(sv_->current->NumberLevels() - 1); + for (int32_t level = 1; level < sv_->current->NumberLevels(); ++level) { + if (sv_->current->files_[level].empty()) { + level_iters_.push_back(nullptr); + } else { + level_iters_.push_back(new LevelIterator(cfd_, read_options_, + sv_->current->files_[level])); + } + } + + current_ = nullptr; + is_prev_set_ = false; +} + +void ForwardIterator::UpdateCurrent() { + if (immutable_min_heap_.empty() && !mutable_iter_->Valid()) { + current_ = nullptr; + } else if (immutable_min_heap_.empty()) { + current_ = mutable_iter_; + } else if (!mutable_iter_->Valid()) { + current_ = immutable_min_heap_.top(); + immutable_min_heap_.pop(); + } else { + current_ = immutable_min_heap_.top(); + assert(current_ != nullptr); + assert(current_->Valid()); + int cmp = cfd_->internal_comparator().InternalKeyComparator::Compare( + mutable_iter_->key(), current_->key()) > 0; + assert(cmp != 0); + if (cmp > 0) { + immutable_min_heap_.pop(); + } else { + current_ = mutable_iter_; + } + } + valid_ = (current_ != nullptr); + if (!status_.ok()) { + status_ = Status::OK(); + } +} + +bool ForwardIterator::NeedToSeekImmutable(const Slice& target) { + if (!is_prev_set_) { + return true; + } + Slice prev_key = prev_key_.GetKey(); + if (prefix_extractor_ && prefix_extractor_->Transform(target).compare( + prefix_extractor_->Transform(prev_key)) != 0) { + return true; + } + if (cfd_->internal_comparator().InternalKeyComparator::Compare( + prev_key, target) >= 0) { + return true; + } + if (immutable_min_heap_.empty() || + cfd_->internal_comparator().InternalKeyComparator::Compare( + target, current_ == 
mutable_iter_ ? immutable_min_heap_.top()->key() + : current_->key()) > 0) { + return true; + } + return false; +} + +uint32_t ForwardIterator::FindFileInRange( + const std::vector& files, const Slice& internal_key, + uint32_t left, uint32_t right) { + while (left < right) { + uint32_t mid = (left + right) / 2; + const FileMetaData* f = files[mid]; + if (cfd_->internal_comparator().InternalKeyComparator::Compare( + f->largest.Encode(), internal_key) < 0) { + // Key at "mid.largest" is < "target". Therefore all + // files at or before "mid" are uninteresting. + left = mid + 1; + } else { + // Key at "mid.largest" is >= "target". Therefore all files + // after "mid" are uninteresting. + right = mid; + } + } + return right; +} + +} // namespace rocksdb + +#endif // ROCKSDB_LITE diff --git a/db/forward_iterator.h b/db/forward_iterator.h new file mode 100644 index 000000000..d539ae3c7 --- /dev/null +++ b/db/forward_iterator.h @@ -0,0 +1,105 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +#pragma once + +#ifndef ROCKSDB_LITE + +#include +#include +#include + +#include "rocksdb/db.h" +#include "rocksdb/iterator.h" +#include "rocksdb/options.h" +#include "db/dbformat.h" + +namespace rocksdb { + +class DBImpl; +class Env; +struct SuperVersion; +class ColumnFamilyData; +class LevelIterator; +struct FileMetaData; + +class MinIterComparator { + public: + explicit MinIterComparator(const Comparator* comparator) : + comparator_(comparator) {} + + bool operator()(Iterator* a, Iterator* b) { + return comparator_->Compare(a->key(), b->key()) > 0; + } + private: + const Comparator* comparator_; +}; + +typedef std::priority_queue, + MinIterComparator> MinIterHeap; + +/** + * ForwardIterator is a special type of iterator that only supports Seek() + * and Next(). It is expected to perform better than TailingIterator by + * removing the encapsulation and making all information accessible within + * the iterator. At the current implementation, snapshot is taken at the + * time Seek() is called. The Next() followed do not see new values after. 
+ */ +class ForwardIterator : public Iterator { + public: + ForwardIterator(DBImpl* db, const ReadOptions& read_options, + ColumnFamilyData* cfd); + virtual ~ForwardIterator(); + + void SeekToLast() override { + status_ = Status::NotSupported("ForwardIterator::SeekToLast()"); + valid_ = false; + } + void Prev() { + status_ = Status::NotSupported("ForwardIterator::Prev"); + valid_ = false; + } + + virtual bool Valid() const override; + void SeekToFirst() override; + virtual void Seek(const Slice& target) override; + virtual void Next() override; + virtual Slice key() const override; + virtual Slice value() const override; + virtual Status status() const override; + + private: + void Cleanup(); + void RebuildIterators(); + void SeekInternal(const Slice& internal_key, bool seek_to_first); + void UpdateCurrent(); + bool NeedToSeekImmutable(const Slice& internal_key); + uint32_t FindFileInRange( + const std::vector& files, const Slice& internal_key, + uint32_t left, uint32_t right); + + DBImpl* const db_; + const ReadOptions read_options_; + ColumnFamilyData* const cfd_; + const SliceTransform* const prefix_extractor_; + const Comparator* user_comparator_; + MinIterHeap immutable_min_heap_; + + SuperVersion* sv_; + Iterator* mutable_iter_; + std::vector imm_iters_; + std::vector l0_iters_; + std::vector level_iters_; + Iterator* current_; + // internal iterator status + Status status_; + bool valid_; + + IterKey prev_key_; + bool is_prev_set_; +}; + +} // namespace rocksdb +#endif // ROCKSDB_LITE diff --git a/db/memtable.cc b/db/memtable.cc index 45f58b979..c6b915b99 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -20,6 +20,7 @@ #include "rocksdb/iterator.h" #include "rocksdb/merge_operator.h" #include "rocksdb/slice_transform.h" +#include "table/merger.h" #include "util/arena.h" #include "util/coding.h" #include "util/murmurhash.h" @@ -173,15 +174,24 @@ const char* EncodeKey(std::string* scratch, const Slice& target) { class MemTableIterator: public Iterator { public: MemTableIterator(const MemTable& mem, const ReadOptions& options, - bool enforce_total_order) + bool enforce_total_order, Arena* arena) : bloom_(nullptr), prefix_extractor_(mem.prefix_extractor_), - valid_(false) { + valid_(false), + arena_mode_(arena != nullptr) { if (prefix_extractor_ != nullptr && !enforce_total_order) { bloom_ = mem.prefix_bloom_.get(); - iter_.reset(mem.table_->GetDynamicPrefixIterator()); + iter_ = mem.table_->GetDynamicPrefixIterator(arena); } else { - iter_.reset(mem.table_->GetIterator()); + iter_ = mem.table_->GetIterator(arena); + } + } + + ~MemTableIterator() { + if (arena_mode_) { + iter_->~Iterator(); + } else { + delete iter_; } } @@ -228,8 +238,9 @@ class MemTableIterator: public Iterator { private: DynamicBloom* bloom_; const SliceTransform* const prefix_extractor_; - std::unique_ptr iter_; + MemTableRep::Iterator* iter_; bool valid_; + bool arena_mode_; // No copying allowed MemTableIterator(const MemTableIterator&); @@ -237,8 +248,14 @@ class MemTableIterator: public Iterator { }; Iterator* MemTable::NewIterator(const ReadOptions& options, - bool enforce_total_order) { - return new MemTableIterator(*this, options, enforce_total_order); + bool enforce_total_order, Arena* arena) { + if (arena == nullptr) { + return new MemTableIterator(*this, options, enforce_total_order, nullptr); + } else { + auto mem = arena->AllocateAligned(sizeof(MemTableIterator)); + return new (mem) + MemTableIterator(*this, options, enforce_total_order, arena); + } } port::RWMutex* MemTable::GetLock(const Slice& key) 
{ diff --git a/db/memtable.h b/db/memtable.h index 7e9af3504..8bad2773a 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -21,6 +21,7 @@ namespace rocksdb { +class Arena; class Mutex; class MemTableIterator; class MergeContext; @@ -77,8 +78,12 @@ class MemTable { // // By default, it returns an iterator for prefix seek if prefix_extractor // is configured in Options. + // arena: If not null, the arena needs to be used to allocate the Iterator. + // Calling ~Iterator of the iterator will destroy all the states but + // those allocated in arena. Iterator* NewIterator(const ReadOptions& options, - bool enforce_total_order = false); + bool enforce_total_order = false, + Arena* arena = nullptr); // Add an entry into memtable that maps key to value at the // specified sequence number and with the specified type. diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 235421962..de1a18eee 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -11,6 +11,7 @@ #include "db/version_set.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" +#include "table/merger.h" #include "util/coding.h" #include "util/log_buffer.h" @@ -78,6 +79,14 @@ void MemTableListVersion::AddIterators(const ReadOptions& options, } } +void MemTableListVersion::AddIterators( + const ReadOptions& options, MergeIteratorBuilder* merge_iter_builder) { + for (auto& m : memlist_) { + merge_iter_builder->AddIterator( + m->NewIterator(options, merge_iter_builder->GetArena())); + } +} + uint64_t MemTableListVersion::GetTotalNumEntries() const { uint64_t total_num = 0; for (auto& m : memlist_) { diff --git a/db/memtable_list.h b/db/memtable_list.h index d85380b55..e56710fc9 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -28,6 +28,7 @@ namespace rocksdb { class ColumnFamilyData; class InternalKeyComparator; class Mutex; +class MergeIteratorBuilder; // keeps a list of immutable memtables in a vector. the list is immutable // if refcount is bigger than one. 
It is used as a state for Get() and @@ -49,6 +50,9 @@ class MemTableListVersion { void AddIterators(const ReadOptions& options, std::vector* iterator_list); + void AddIterators(const ReadOptions& options, + MergeIteratorBuilder* merge_iter_builder); + uint64_t GetTotalNumEntries() const; private: diff --git a/db/simple_table_db_test.cc b/db/simple_table_db_test.cc index affa61465..a86ff0a17 100644 --- a/db/simple_table_db_test.cc +++ b/db/simple_table_db_test.cc @@ -83,7 +83,7 @@ public: unique_ptr && file, uint64_t file_size, unique_ptr* table_reader); - Iterator* NewIterator(const ReadOptions&) override; + Iterator* NewIterator(const ReadOptions&, Arena* arena) override; Status Get(const ReadOptions&, const Slice& key, void* arg, bool (*handle_result)(void* arg, const ParsedInternalKey& k, @@ -218,8 +218,14 @@ std::shared_ptr SimpleTableReader::GetTableProperties() return rep_->table_properties; } -Iterator* SimpleTableReader::NewIterator(const ReadOptions& options) { - return new SimpleTableIterator(this); +Iterator* SimpleTableReader::NewIterator(const ReadOptions& options, + Arena* arena) { + if (arena == nullptr) { + return new SimpleTableIterator(this); + } else { + auto mem = arena->AllocateAligned(sizeof(SimpleTableIterator)); + return new (mem) SimpleTableIterator(this); + } } Status SimpleTableReader::GetOffset(const Slice& target, uint64_t* offset) { diff --git a/db/table_cache.cc b/db/table_cache.cc index 2321d035a..f4757cbfe 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -13,6 +13,7 @@ #include "db/version_edit.h" #include "rocksdb/statistics.h" +#include "table/iterator_wrapper.h" #include "table/table_reader.h" #include "util/coding.h" #include "util/stop_watch.h" @@ -102,7 +103,7 @@ Iterator* TableCache::NewIterator(const ReadOptions& options, const InternalKeyComparator& icomparator, const FileMetaData& file_meta, TableReader** table_reader_ptr, - bool for_compaction) { + bool for_compaction, Arena* arena) { if (table_reader_ptr != nullptr) { *table_reader_ptr = nullptr; } @@ -113,12 +114,12 @@ Iterator* TableCache::NewIterator(const ReadOptions& options, s = FindTable(toptions, icomparator, file_meta.number, file_meta.file_size, &handle, nullptr, options.read_tier == kBlockCacheTier); if (!s.ok()) { - return NewErrorIterator(s); + return NewErrorIterator(s, arena); } table_reader = GetTableReaderFromHandle(handle); } - Iterator* result = table_reader->NewIterator(options); + Iterator* result = table_reader->NewIterator(options, arena); if (handle != nullptr) { result->RegisterCleanup(&UnrefEntry, cache_, handle); } diff --git a/db/table_cache.h b/db/table_cache.h index e8cd7ea2e..1aa61db01 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -23,6 +23,7 @@ namespace rocksdb { class Env; +class Arena; struct FileMetaData; // TODO(sdong): try to come up with a better API to pass the file information @@ -44,7 +45,7 @@ class TableCache { const InternalKeyComparator& internal_comparator, const FileMetaData& file_meta, TableReader** table_reader_ptr = nullptr, - bool for_compaction = false); + bool for_compaction = false, Arena* arena = nullptr); // If a seek to internal key "k" in specified file finds an entry, // call (*handle_result)(arg, found_key, found_value) repeatedly until diff --git a/db/version_set.cc b/db/version_set.cc index 5327cf55f..c6a9e6ab1 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -332,6 +332,32 @@ void Version::AddIterators(const ReadOptions& read_options, } } +void Version::AddIterators(const ReadOptions& read_options, + const 
EnvOptions& soptions, + MergeIteratorBuilder* merge_iter_builder) { + // Merge all level zero files together since they may overlap + for (const FileMetaData* file : files_[0]) { + merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator( + read_options, soptions, cfd_->internal_comparator(), *file, nullptr, + false, merge_iter_builder->GetArena())); + } + + // For levels > 0, we can use a concatenating iterator that sequentially + // walks through the non-overlapping files in the level, opening them + // lazily. + for (int level = 1; level < num_levels_; level++) { + if (!files_[level].empty()) { + merge_iter_builder->AddIterator(NewTwoLevelIterator( + new LevelFileIteratorState( + cfd_->table_cache(), read_options, soptions, + cfd_->internal_comparator(), false /* for_compaction */, + cfd_->options()->prefix_extractor != nullptr), + new LevelFileNumIterator(cfd_->internal_comparator(), &files_[level]), + merge_iter_builder->GetArena())); + } + } +} + // Callback from TableCache::Get() namespace { enum SaverState { diff --git a/db/version_set.h b/db/version_set.h index ffadb5813..7c8d7146e 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -51,6 +51,7 @@ class MergeContext; class ColumnFamilyData; class ColumnFamilySet; class TableCache; +class MergeIteratorBuilder; // Return the smallest index i such that files[i]->largest >= key. // Return files.size() if there is no such file. @@ -80,6 +81,9 @@ class Version { void AddIterators(const ReadOptions&, const EnvOptions& soptions, std::vector* iters); + void AddIterators(const ReadOptions&, const EnvOptions& soptions, + MergeIteratorBuilder* merger_iter_builder); + // Lookup the value for key. If found, store it in *val and // return OK. Else return a non-OK status. Fills *stats. // Uses *operands to store merge_operator operations to apply later @@ -218,6 +222,7 @@ class Version { friend class LevelCompactionPicker; friend class UniversalCompactionPicker; friend class FIFOCompactionPicker; + friend class ForwardIterator; class LevelFileNumIterator; class LevelFileIteratorState; diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index ac376d747..6134fd166 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -142,16 +142,28 @@ class MemTableRep { }; // Return an iterator over the keys in this representation. - virtual Iterator* GetIterator() = 0; + // arena: If not null, the arena needs to be used to allocate the Iterator. + // When destroying the iterator, the caller will not call "delete" + // but Iterator::~Iterator() directly. The destructor needs to destroy + // all the states but those allocated in arena. + virtual Iterator* GetIterator(Arena* arena = nullptr) = 0; // Return an iterator over at least the keys with the specified user key. The // iterator may also allow access to other keys, but doesn't have to. Default: // GetIterator(). - virtual Iterator* GetIterator(const Slice& user_key) { return GetIterator(); } + virtual Iterator* GetIterator(const Slice& user_key) { + return GetIterator(nullptr); + } // Return an iterator that has a special Seek semantics. The result of // a Seek might only include keys with the same prefix as the target key. - virtual Iterator* GetDynamicPrefixIterator() { return GetIterator(); } + // arena: If not null, the arena needs to be used to allocate the Iterator. + // When destroying the iterator, the caller will not call "delete" + // but Iterator::~Iterator() directly. 
The destructor needs to destroy + // all the states but those allocated in arena. + virtual Iterator* GetDynamicPrefixIterator(Arena* arena = nullptr) { + return GetIterator(arena); + } // Return true if the current MemTableRep supports merge operator. // Default: true diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 9ba6a522c..ded76a3ab 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -547,12 +547,9 @@ struct ColumnFamilyOptions { // Control locality of bloom filter probes to improve cache miss rate. // This option only applies to memtable prefix bloom and plaintable - // prefix bloom. It essentially limits the max number of cache lines each - // bloom filter check can touch. - // This optimization is turned off when set to 0. The number should never - // be greater than number of probes. This option can boost performance - // for in-memory workload but should use with care since it can cause - // higher false positive rate. + // prefix bloom. It essentially limits every bloom checking to one cache line. + // This optimization is turned off when set to 0, and positive number to turn + // it on. // Default: 0 uint32_t bloom_locality; diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 4fcdfd29e..71fff659a 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -980,10 +980,11 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_key) { return may_match; } -Iterator* BlockBasedTable::NewIterator(const ReadOptions& read_options) { - return NewTwoLevelIterator(new BlockEntryIteratorState(this, read_options, - nullptr), - NewIndexIterator(read_options)); +Iterator* BlockBasedTable::NewIterator(const ReadOptions& read_options, + Arena* arena) { + return NewTwoLevelIterator( + new BlockEntryIteratorState(this, read_options, nullptr), + NewIndexIterator(read_options), arena); } Status BlockBasedTable::Get( diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index cfb1d0af0..ba6a10c3e 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -68,7 +68,7 @@ class BlockBasedTable : public TableReader { // Returns a new iterator over the table contents. // The result of NewIterator() is initially invalid (caller must // call one of the Seek methods on the iterator before using it). - Iterator* NewIterator(const ReadOptions&) override; + Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr) override; Status Get(const ReadOptions& readOptions, const Slice& key, void* handle_context, diff --git a/table/iterator.cc b/table/iterator.cc index a3d4f638d..4c360205a 100644 --- a/table/iterator.cc +++ b/table/iterator.cc @@ -8,6 +8,8 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. 
#include "rocksdb/iterator.h" +#include "table/iterator_wrapper.h" +#include "util/arena.h" namespace rocksdb { @@ -65,8 +67,26 @@ Iterator* NewEmptyIterator() { return new EmptyIterator(Status::OK()); } +Iterator* NewEmptyIterator(Arena* arena) { + if (arena == nullptr) { + return NewEmptyIterator(); + } else { + auto mem = arena->AllocateAligned(sizeof(EmptyIterator)); + return new (mem) EmptyIterator(Status::OK()); + } +} + Iterator* NewErrorIterator(const Status& status) { return new EmptyIterator(status); } +Iterator* NewErrorIterator(const Status& status, Arena* arena) { + if (arena == nullptr) { + return NewErrorIterator(status); + } else { + auto mem = arena->AllocateAligned(sizeof(EmptyIterator)); + return new (mem) EmptyIterator(status); + } +} + } // namespace rocksdb diff --git a/table/iterator_wrapper.h b/table/iterator_wrapper.h index 6793bfa0a..502cacb3e 100644 --- a/table/iterator_wrapper.h +++ b/table/iterator_wrapper.h @@ -8,6 +8,9 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once + +#include "rocksdb/iterator.h" + namespace rocksdb { // A internal wrapper class with an interface similar to Iterator that @@ -20,14 +23,7 @@ class IteratorWrapper { explicit IteratorWrapper(Iterator* iter): iter_(nullptr) { Set(iter); } - IteratorWrapper(const IteratorWrapper&) { - // Iterator wrapper exclusively owns iter_ so it cannot be copied. - // Didn't delete the function because vector requires - // this function to compile. - assert(false); - } - void operator=(const IteratorWrapper&) = delete; - ~IteratorWrapper() { delete iter_; } + ~IteratorWrapper() {} Iterator* iter() const { return iter_; } // Takes ownership of "iter" and will delete it when destroyed, or @@ -42,6 +38,14 @@ class IteratorWrapper { } } + void DeleteIter(bool is_arena_mode) { + if (!is_arena_mode) { + delete iter_; + } else { + iter_->~Iterator(); + } + } + // Iterator interface methods bool Valid() const { return valid_; } Slice key() const { assert(Valid()); return key_; } @@ -67,4 +71,11 @@ class IteratorWrapper { Slice key_; }; +class Arena; +// Return an empty iterator (yields nothing) allocated from arena. +extern Iterator* NewEmptyIterator(Arena* arena); + +// Return an empty iterator with the specified status, allocated arena. 
+extern Iterator* NewErrorIterator(const Status& status, Arena* arena); + } // namespace rocksdb diff --git a/table/merger.cc b/table/merger.cc index 6c51f7fa1..9aab33ed3 100644 --- a/table/merger.cc +++ b/table/merger.cc @@ -17,13 +17,13 @@ #include "rocksdb/options.h" #include "table/iter_heap.h" #include "table/iterator_wrapper.h" +#include "util/arena.h" #include "util/stop_watch.h" #include "util/perf_context_imp.h" #include "util/autovector.h" namespace rocksdb { namespace { - typedef std::priority_queue< IteratorWrapper*, std::vector, @@ -43,13 +43,16 @@ MaxIterHeap NewMaxIterHeap(const Comparator* comparator) { MinIterHeap NewMinIterHeap(const Comparator* comparator) { return MinIterHeap(MinIteratorComparator(comparator)); } +} // namespace const size_t kNumIterReserve = 4; class MergingIterator : public Iterator { public: - MergingIterator(const Comparator* comparator, Iterator** children, int n) - : comparator_(comparator), + MergingIterator(const Comparator* comparator, Iterator** children, int n, + bool is_arena_mode) + : is_arena_mode_(is_arena_mode), + comparator_(comparator), current_(nullptr), use_heap_(true), direction_(kForward), @@ -66,7 +69,20 @@ class MergingIterator : public Iterator { } } - virtual ~MergingIterator() { } + virtual void AddIterator(Iterator* iter) { + assert(direction_ == kForward); + children_.emplace_back(iter); + auto new_wrapper = children_.back(); + if (new_wrapper.Valid()) { + minHeap_.push(&new_wrapper); + } + } + + virtual ~MergingIterator() { + for (auto& child : children_) { + child.DeleteIter(is_arena_mode_); + } + } virtual bool Valid() const { return (current_ != nullptr); @@ -242,6 +258,7 @@ class MergingIterator : public Iterator { void FindLargest(); void ClearHeaps(); + bool is_arena_mode_; const Comparator* comparator_; autovector children_; IteratorWrapper* current_; @@ -288,16 +305,51 @@ void MergingIterator::ClearHeaps() { maxHeap_ = NewMaxIterHeap(comparator_); minHeap_ = NewMinIterHeap(comparator_); } -} // namespace -Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n) { +Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n, + Arena* arena) { assert(n >= 0); if (n == 0) { - return NewEmptyIterator(); + return NewEmptyIterator(arena); } else if (n == 1) { return list[0]; } else { - return new MergingIterator(cmp, list, n); + if (arena == nullptr) { + return new MergingIterator(cmp, list, n, false); + } else { + auto mem = arena->AllocateAligned(sizeof(MergingIterator)); + return new (mem) MergingIterator(cmp, list, n, true); + } + } +} + +MergeIteratorBuilder::MergeIteratorBuilder(const Comparator* comparator, + Arena* a) + : first_iter(nullptr), use_merging_iter(false), arena(a) { + + auto mem = arena->AllocateAligned(sizeof(MergingIterator)); + merge_iter = new (mem) MergingIterator(comparator, nullptr, 0, true); +} + +void MergeIteratorBuilder::AddIterator(Iterator* iter) { + if (!use_merging_iter && first_iter != nullptr) { + merge_iter->AddIterator(first_iter); + use_merging_iter = true; + } + if (use_merging_iter) { + merge_iter->AddIterator(iter); + } else { + first_iter = iter; + } +} + +Iterator* MergeIteratorBuilder::Finish() { + if (!use_merging_iter) { + return first_iter; + } else { + auto ret = merge_iter; + merge_iter = nullptr; + return ret; } } diff --git a/table/merger.h b/table/merger.h index 3a1a4feb8..7dcf2afe7 100644 --- a/table/merger.h +++ b/table/merger.h @@ -9,11 +9,14 @@ #pragma once +#include "rocksdb/types.h" + namespace rocksdb { class Comparator; class 
Iterator; class Env; +class Arena; // Return an iterator that provided the union of the data in // children[0,n-1]. Takes ownership of the child iterators and @@ -24,6 +27,34 @@ class Env; // // REQUIRES: n >= 0 extern Iterator* NewMergingIterator(const Comparator* comparator, - Iterator** children, int n); + Iterator** children, int n, + Arena* arena = nullptr); + +class MergingIterator; + +// A builder class to build a merging iterator by adding iterators one by one. +class MergeIteratorBuilder { + public: + // comparator: the comparator used in merging comparator + // arena: where the merging iterator needs to be allocated from. + explicit MergeIteratorBuilder(const Comparator* comparator, Arena* arena); + ~MergeIteratorBuilder() {} + + // Add iter to the merging iterator. + void AddIterator(Iterator* iter); + + // Get arena used to build the merging iterator. It is called one a child + // iterator needs to be allocated. + Arena* GetArena() { return arena; } + + // Return the result merging iterator. + Iterator* Finish(); + + private: + MergingIterator* merge_iter; + Iterator* first_iter; + bool use_merging_iter; + Arena* arena; +}; } // namespace rocksdb diff --git a/table/plain_table_reader.cc b/table/plain_table_reader.cc index 43daaa9a9..22968ef6b 100644 --- a/table/plain_table_reader.cc +++ b/table/plain_table_reader.cc @@ -156,8 +156,15 @@ Status PlainTableReader::Open(const Options& options, void PlainTableReader::SetupForCompaction() { } -Iterator* PlainTableReader::NewIterator(const ReadOptions& options) { - return new PlainTableIterator(this, options_.prefix_extractor != nullptr); +Iterator* PlainTableReader::NewIterator(const ReadOptions& options, + Arena* arena) { + if (arena == nullptr) { + return new PlainTableIterator(this, options_.prefix_extractor != nullptr); + } else { + auto mem = arena->AllocateAligned(sizeof(PlainTableIterator)); + return new (mem) + PlainTableIterator(this, options_.prefix_extractor != nullptr); + } } struct PlainTableReader::IndexRecord { diff --git a/table/plain_table_reader.h b/table/plain_table_reader.h index e6373dc82..62239beb3 100644 --- a/table/plain_table_reader.h +++ b/table/plain_table_reader.h @@ -55,7 +55,7 @@ class PlainTableReader: public TableReader { const int bloom_bits_per_key, double hash_table_ratio, size_t index_sparseness, size_t huge_page_tlb_size); - Iterator* NewIterator(const ReadOptions&); + Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr) override; Status Get(const ReadOptions&, const Slice& key, void* arg, bool (*result_handler)(void* arg, const ParsedInternalKey& k, diff --git a/table/table_reader.h b/table/table_reader.h index 02a2d16dc..9238b880c 100644 --- a/table/table_reader.h +++ b/table/table_reader.h @@ -15,6 +15,7 @@ namespace rocksdb { class Iterator; struct ParsedInternalKey; class Slice; +class Arena; struct ReadOptions; struct TableProperties; @@ -28,7 +29,11 @@ class TableReader { // Returns a new iterator over the table contents. // The result of NewIterator() is initially invalid (caller must // call one of the Seek methods on the iterator before using it). - virtual Iterator* NewIterator(const ReadOptions&) = 0; + // arena: If not null, the arena needs to be used to allocate the Iterator. + // When destroying the iterator, the caller will not call "delete" + // but Iterator::~Iterator() directly. The destructor needs to destroy + // all the states but those allocated in arena. 
+ virtual Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr) = 0; // Given a key, return an approximate byte offset in the file where // the data for that key begins (or would begin if the key were diff --git a/table/two_level_iterator.cc b/table/two_level_iterator.cc index 990f18184..6af48f58c 100644 --- a/table/two_level_iterator.cc +++ b/table/two_level_iterator.cc @@ -13,6 +13,7 @@ #include "rocksdb/table.h" #include "table/block.h" #include "table/format.h" +#include "util/arena.h" namespace rocksdb { @@ -23,7 +24,10 @@ class TwoLevelIterator: public Iterator { explicit TwoLevelIterator(TwoLevelIteratorState* state, Iterator* first_level_iter); - virtual ~TwoLevelIterator() {} + virtual ~TwoLevelIterator() { + first_level_iter_.DeleteIter(false); + second_level_iter_.DeleteIter(false); + } virtual void Seek(const Slice& target); virtual void SeekToFirst(); @@ -183,8 +187,13 @@ void TwoLevelIterator::InitDataBlock() { } // namespace Iterator* NewTwoLevelIterator(TwoLevelIteratorState* state, - Iterator* first_level_iter) { - return new TwoLevelIterator(state, first_level_iter); + Iterator* first_level_iter, Arena* arena) { + if (arena == nullptr) { + return new TwoLevelIterator(state, first_level_iter); + } else { + auto mem = arena->AllocateAligned(sizeof(TwoLevelIterator)); + return new (mem) TwoLevelIterator(state, first_level_iter); + } } } // namespace rocksdb diff --git a/table/two_level_iterator.h b/table/two_level_iterator.h index b8083385b..d955dd763 100644 --- a/table/two_level_iterator.h +++ b/table/two_level_iterator.h @@ -16,6 +16,7 @@ namespace rocksdb { struct ReadOptions; class InternalKeyComparator; +class Arena; struct TwoLevelIteratorState { explicit TwoLevelIteratorState(bool check_prefix_may_match) @@ -39,7 +40,11 @@ struct TwoLevelIteratorState { // // Uses a supplied function to convert an index_iter value into // an iterator over the contents of the corresponding block. +// arena: If not null, the arena is used to allocate the Iterator. +// When destroying the iterator, the destructor will destroy +// all the states but those allocated in arena. extern Iterator* NewTwoLevelIterator(TwoLevelIteratorState* state, - Iterator* first_level_iter); + Iterator* first_level_iter, + Arena* arena = nullptr); } // namespace rocksdb diff --git a/tools/db_stress.cc b/tools/db_stress.cc index e9d759f32..7e3101c07 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -162,6 +162,14 @@ DEFINE_int32(max_background_compactions, "The maximum number of concurrent background compactions " "that can occur in parallel."); +DEFINE_int32(compaction_thread_pool_adjust_interval, 0, + "The interval (in milliseconds) to adjust compaction thread pool " + "size. 
Don't change it periodically if the value is 0."); + +DEFINE_int32(compaction_thread_pool_varations, 2, + "Range of bakground thread pool size variations when adjusted " + "periodically."); + DEFINE_int32(max_background_flushes, rocksdb::Options().max_background_flushes, "The maximum number of concurrent background flushes " "that can occur in parallel."); @@ -567,6 +575,8 @@ class SharedState { num_done_(0), start_(false), start_verify_(false), + should_stop_bg_thread_(false), + bg_thread_finished_(false), stress_test_(stress_test), verification_failure_(false) { if (FLAGS_test_batches_snapshots) { @@ -694,6 +704,14 @@ class SharedState { uint32_t GetSeed() const { return seed_; } + void SetShouldStopBgThread() { should_stop_bg_thread_ = true; } + + bool ShoudStopBgThread() { return should_stop_bg_thread_; } + + void SetBgThreadFinish() { bg_thread_finished_ = true; } + + bool BgThreadFinished() const { return bg_thread_finished_; } + private: port::Mutex mu_; port::CondVar cv_; @@ -707,6 +725,8 @@ class SharedState { long num_done_; bool start_; bool start_verify_; + bool should_stop_bg_thread_; + bool bg_thread_finished_; StressTest* stress_test_; std::atomic verification_failure_; @@ -777,6 +797,11 @@ class StressTest { threads[i] = new ThreadState(i, &shared); FLAGS_env->StartThread(ThreadBody, threads[i]); } + ThreadState bg_thread(0, &shared); + if (FLAGS_compaction_thread_pool_adjust_interval > 0) { + FLAGS_env->StartThread(PoolSizeChangeThread, &bg_thread); + } + // Each thread goes through the following states: // initializing -> wait for others to init -> read/populate/depopulate // wait for others to operate -> verify -> done @@ -829,6 +854,14 @@ class StressTest { } PrintStatistics(); + if (FLAGS_compaction_thread_pool_adjust_interval > 0) { + MutexLock l(shared.GetMutex()); + shared.SetShouldStopBgThread(); + while (!shared.BgThreadFinished()) { + shared.GetCondVar()->Wait(); + } + } + if (shared.HasVerificationFailedYet()) { printf("Verification failed :(\n"); return false; @@ -879,6 +912,38 @@ class StressTest { } + static void PoolSizeChangeThread(void* v) { + assert(FLAGS_compaction_thread_pool_adjust_interval > 0); + ThreadState* thread = reinterpret_cast(v); + SharedState* shared = thread->shared; + + while (true) { + { + MutexLock l(shared->GetMutex()); + if (shared->ShoudStopBgThread()) { + shared->SetBgThreadFinish(); + shared->GetCondVar()->SignalAll(); + return; + } + } + + auto thread_pool_size_base = FLAGS_max_background_compactions; + auto thread_pool_size_var = FLAGS_compaction_thread_pool_varations; + int new_thread_pool_size = + thread_pool_size_base - thread_pool_size_var + + thread->rand.Next() % (thread_pool_size_var * 2 + 1); + if (new_thread_pool_size < 1) { + new_thread_pool_size = 1; + } + FLAGS_env->SetBackgroundThreads(new_thread_pool_size); + // Sleep up to 3 seconds + FLAGS_env->SleepForMicroseconds( + thread->rand.Next() % FLAGS_compaction_thread_pool_adjust_interval * + 1000 + + 1); + } + } + // Given a key K and value V, this puts ("0"+K, "0"+V), ("1"+K, "1"+V), ... // ("9"+K, "9"+V) in DB atomically i.e in a single batch. // Also refer MultiGet. 
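Before moving on to the util/ changes, it may help to spell out the allocation idiom that the table/ hunks above and the memtable-rep hunks below keep repeating: if the caller passes an Arena, the iterator is constructed with placement new inside arena memory and is later destroyed by calling its destructor directly, never by delete; with no arena, plain new and delete are used. The sketch below is illustrative only; ToyArena, DummyIterator, MakeIterator, and DestroyIterator are invented names, not RocksDB APIs.

// Minimal, self-contained sketch (not part of this patch) of the
// arena-allocation idiom. All names here are made up for illustration.
#include <cassert>
#include <cstddef>
#include <new>
#include <vector>

// Stand-in for rocksdb::Arena: hands out blocks that are all freed together
// when the arena itself goes away.
class ToyArena {
 public:
  ~ToyArena() {
    for (char* p : blocks_) ::operator delete(p);
  }
  char* AllocateAligned(size_t bytes) {
    char* p = static_cast<char*>(::operator new(bytes));
    blocks_.push_back(p);
    return p;
  }

 private:
  std::vector<char*> blocks_;
};

// Stand-in for rocksdb::Iterator.
class DummyIterator {
 public:
  virtual ~DummyIterator() {}
  virtual bool Valid() const { return false; }
};

// The shape used by NewMergingIterator, NewTwoLevelIterator,
// PlainTableReader::NewIterator and the memtable reps: heap-allocate when no
// arena is given, otherwise placement-new into arena memory.
DummyIterator* MakeIterator(ToyArena* arena) {
  if (arena == nullptr) {
    return new DummyIterator();
  }
  char* mem = arena->AllocateAligned(sizeof(DummyIterator));
  return new (mem) DummyIterator();
}

// Mirrors what IteratorWrapper::DeleteIter(is_arena_mode) decides in the
// patch: an arena-allocated iterator must never be passed to delete; only its
// destructor runs, and the memory is reclaimed when the arena is destroyed.
void DestroyIterator(DummyIterator* iter, bool is_arena_mode) {
  if (is_arena_mode) {
    iter->~DummyIterator();
  } else {
    delete iter;
  }
}

int main() {
  ToyArena arena;
  DummyIterator* heap_iter = MakeIterator(nullptr);
  DummyIterator* arena_iter = MakeIterator(&arena);
  assert(!heap_iter->Valid() && !arena_iter->Valid());
  DestroyIterator(heap_iter, /*is_arena_mode=*/false);
  DestroyIterator(arena_iter, /*is_arena_mode=*/true);
  return 0;  // arena memory is released in ~ToyArena()
}

This is the reason the destructor changes above (MergingIterator, TwoLevelIterator) walk their children through DeleteIter(is_arena_mode) instead of an unconditional delete, and why MergeIteratorBuilder carries the Arena in which it placement-constructs the MergingIterator.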
diff --git a/util/arena.h b/util/arena.h index 53acd2270..0855c205c 100644 --- a/util/arena.h +++ b/util/arena.h @@ -23,6 +23,8 @@ namespace rocksdb { class Logger; +const size_t kInlineSize = 2048; + class Arena { public: // No copying allowed diff --git a/util/dynamic_bloom.cc b/util/dynamic_bloom.cc index 09ffe71ec..7173bbb93 100644 --- a/util/dynamic_bloom.cc +++ b/util/dynamic_bloom.cc @@ -17,34 +17,41 @@ namespace { static uint32_t BloomHash(const Slice& key) { return Hash(key.data(), key.size(), 0xbc9f1d34); } + +uint32_t GetNumBlocks(uint32_t total_bits) { + uint32_t num_blocks = (total_bits + CACHE_LINE_SIZE * 8 - 1) / + (CACHE_LINE_SIZE * 8) * (CACHE_LINE_SIZE * 8); + // Make num_blocks an odd number to make sure more bits are involved + // when determining which block. + if (num_blocks % 2 == 0) { + num_blocks++; + } + return num_blocks; +} } -DynamicBloom::DynamicBloom(uint32_t total_bits, uint32_t cl_per_block, +DynamicBloom::DynamicBloom(uint32_t total_bits, uint32_t locality, uint32_t num_probes, uint32_t (*hash_func)(const Slice& key), size_t huge_page_tlb_size, Logger* logger) - : kBlocked(cl_per_block > 0), - kBitsPerBlock(std::min(cl_per_block, num_probes) * CACHE_LINE_SIZE * 8), - kTotalBits((kBlocked ? (total_bits + kBitsPerBlock - 1) / kBitsPerBlock * - kBitsPerBlock - : total_bits + 7) / + : kTotalBits(((locality > 0) ? GetNumBlocks(total_bits) : total_bits + 7) / 8 * 8), - kNumBlocks(kBlocked ? kTotalBits / kBitsPerBlock : 1), + kNumBlocks((locality > 0) ? kTotalBits / (CACHE_LINE_SIZE * 8) : 0), kNumProbes(num_probes), hash_func_(hash_func == nullptr ? &BloomHash : hash_func) { - assert(kBlocked ? kTotalBits > 0 : kTotalBits >= kBitsPerBlock); + assert(kNumBlocks > 0 || kTotalBits > 0); assert(kNumProbes > 0); uint32_t sz = kTotalBits / 8; - if (kBlocked) { + if (kNumBlocks > 0) { sz += CACHE_LINE_SIZE - 1; } raw_ = reinterpret_cast( arena_.AllocateAligned(sz, huge_page_tlb_size, logger)); memset(raw_, 0, sz); - if (kBlocked && (reinterpret_cast(raw_) % CACHE_LINE_SIZE)) { + if (kNumBlocks > 0 && (reinterpret_cast(raw_) % CACHE_LINE_SIZE)) { data_ = raw_ + CACHE_LINE_SIZE - - reinterpret_cast(raw_) % CACHE_LINE_SIZE; + reinterpret_cast(raw_) % CACHE_LINE_SIZE; } else { data_ = raw_; } diff --git a/util/dynamic_bloom.h b/util/dynamic_bloom.h index 73476eb3b..e59134591 100644 --- a/util/dynamic_bloom.h +++ b/util/dynamic_bloom.h @@ -8,6 +8,7 @@ #include #include +#include "port/port.h" #include namespace rocksdb { @@ -19,15 +20,14 @@ class DynamicBloom { public: // total_bits: fixed total bits for the bloom // num_probes: number of hash probes for a single key - // cl_per_block: block size in cache lines. When this is non-zero, a - // query/set is done within a block to improve cache locality. + // locality: If positive, optimize for cache line locality, 0 otherwise. // hash_func: customized hash function // huge_page_tlb_size: if >0, try to allocate bloom bytes from huge page TLB // withi this page size. 
Need to reserve huge pages for // it to be allocated, like: // sysctl -w vm.nr_hugepages=20 // See linux doc Documentation/vm/hugetlbpage.txt - explicit DynamicBloom(uint32_t total_bits, uint32_t cl_per_block = 0, + explicit DynamicBloom(uint32_t total_bits, uint32_t locality = 0, uint32_t num_probes = 6, uint32_t (*hash_func)(const Slice& key) = nullptr, size_t huge_page_tlb_size = 0, @@ -48,8 +48,6 @@ class DynamicBloom { bool MayContainHash(uint32_t hash); private: - const bool kBlocked; - const uint32_t kBitsPerBlock; const uint32_t kTotalBits; const uint32_t kNumBlocks; const uint32_t kNumProbes; @@ -69,13 +67,18 @@ inline bool DynamicBloom::MayContain(const Slice& key) { inline bool DynamicBloom::MayContainHash(uint32_t h) { const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits - if (kBlocked) { - uint32_t b = ((h >> 11 | (h << 21)) % kNumBlocks) * kBitsPerBlock; + if (kNumBlocks != 0) { + uint32_t b = ((h >> 11 | (h << 21)) % kNumBlocks) * (CACHE_LINE_SIZE * 8); for (uint32_t i = 0; i < kNumProbes; ++i) { - const uint32_t bitpos = b + h % kBitsPerBlock; + // Since CACHE_LINE_SIZE is defined as 2^n, this line will be optimized + // to a simple and operation by compiler. + const uint32_t bitpos = b + (h % (CACHE_LINE_SIZE * 8)); if (((data_[bitpos / 8]) & (1 << (bitpos % 8))) == 0) { return false; } + // Rotate h so that we don't reuse the same bytes. + h = h / (CACHE_LINE_SIZE * 8) + + (h % (CACHE_LINE_SIZE * 8)) * (0x20000000U / CACHE_LINE_SIZE); h += delta; } } else { @@ -92,11 +95,16 @@ inline bool DynamicBloom::MayContainHash(uint32_t h) { inline void DynamicBloom::AddHash(uint32_t h) { const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits - if (kBlocked) { - uint32_t b = ((h >> 11 | (h << 21)) % kNumBlocks) * kBitsPerBlock; + if (kNumBlocks != 0) { + uint32_t b = ((h >> 11 | (h << 21)) % kNumBlocks) * (CACHE_LINE_SIZE * 8); for (uint32_t i = 0; i < kNumProbes; ++i) { - const uint32_t bitpos = b + h % kBitsPerBlock; + // Since CACHE_LINE_SIZE is defined as 2^n, this line will be optimized + // to a simple and operation by compiler. + const uint32_t bitpos = b + (h % (CACHE_LINE_SIZE * 8)); data_[bitpos / 8] |= (1 << (bitpos % 8)); + // Rotate h so that we don't reuse the same bytes. 
+ h = h / (CACHE_LINE_SIZE * 8) + + (h % (CACHE_LINE_SIZE * 8)) * (0x20000000U / CACHE_LINE_SIZE); h += delta; } } else { diff --git a/util/dynamic_bloom_test.cc b/util/dynamic_bloom_test.cc index b72ab24c5..d345addba 100644 --- a/util/dynamic_bloom_test.cc +++ b/util/dynamic_bloom_test.cc @@ -91,17 +91,16 @@ TEST(DynamicBloomTest, VaryingLengths) { fprintf(stderr, "bits_per_key: %d num_probes: %d\n", FLAGS_bits_per_key, num_probes); - for (uint32_t cl_per_block = 0; cl_per_block < num_probes; - ++cl_per_block) { + for (uint32_t enable_locality = 0; enable_locality < 2; ++enable_locality) { for (uint32_t num = 1; num <= 10000; num = NextNum(num)) { uint32_t bloom_bits = 0; - if (cl_per_block == 0) { + if (enable_locality == 0) { bloom_bits = std::max(num * FLAGS_bits_per_key, 64U); } else { bloom_bits = std::max(num * FLAGS_bits_per_key, - cl_per_block * CACHE_LINE_SIZE * 8); + enable_locality * CACHE_LINE_SIZE * 8); } - DynamicBloom bloom(bloom_bits, cl_per_block, num_probes); + DynamicBloom bloom(bloom_bits, enable_locality, num_probes); for (uint64_t i = 0; i < num; i++) { bloom.Add(Key(i, buffer)); ASSERT_TRUE(bloom.MayContain(Key(i, buffer))); @@ -123,8 +122,10 @@ TEST(DynamicBloomTest, VaryingLengths) { } double rate = result / 10000.0; - fprintf(stderr, "False positives: %5.2f%% @ num = %6u, bloom_bits = %6u, " - "cl per block = %u\n", rate*100.0, num, bloom_bits, cl_per_block); + fprintf(stderr, + "False positives: %5.2f%% @ num = %6u, bloom_bits = %6u, " + "enable locality?%u\n", + rate * 100.0, num, bloom_bits, enable_locality); if (rate > 0.0125) mediocre_filters++; // Allowed, but not too often @@ -173,20 +174,20 @@ TEST(DynamicBloomTest, perf) { elapsed / count); ASSERT_TRUE(count == num_keys); - for (uint32_t cl_per_block = 1; cl_per_block <= num_probes; - ++cl_per_block) { - DynamicBloom blocked_bloom(num_keys * 10, cl_per_block, num_probes); + // Locality enabled version + DynamicBloom blocked_bloom(num_keys * 10, 1, num_probes); timer.Start(); for (uint64_t i = 1; i <= num_keys; ++i) { blocked_bloom.Add(Slice(reinterpret_cast(&i), 8)); } - uint64_t elapsed = timer.ElapsedNanos(); - fprintf(stderr, "blocked bloom(%d), avg add latency %" PRIu64 "\n", - cl_per_block, elapsed / num_keys); + elapsed = timer.ElapsedNanos(); + fprintf(stderr, + "blocked bloom(enable locality), avg add latency %" PRIu64 "\n", + elapsed / num_keys); - uint64_t count = 0; + count = 0; timer.Start(); for (uint64_t i = 1; i <= num_keys; ++i) { if (blocked_bloom.MayContain( @@ -196,11 +197,11 @@ TEST(DynamicBloomTest, perf) { } elapsed = timer.ElapsedNanos(); - fprintf(stderr, "blocked bloom(%d), avg query latency %" PRIu64 "\n", - cl_per_block, elapsed / count); + fprintf(stderr, + "blocked bloom(enable locality), avg query latency %" PRIu64 "\n", + elapsed / count); ASSERT_TRUE(count == num_keys); } - } } } // namespace rocksdb diff --git a/util/hash_cuckoo_rep.cc b/util/hash_cuckoo_rep.cc index 0161690dd..e6426a91c 100644 --- a/util/hash_cuckoo_rep.cc +++ b/util/hash_cuckoo_rep.cc @@ -250,7 +250,7 @@ class HashCuckooRep : public MemTableRep { // are sorted according to the user specified KeyComparator. Note that // any insert after this function call may affect the sorted nature of // the returned iterator. 
- virtual MemTableRep::Iterator* GetIterator() override { + virtual MemTableRep::Iterator* GetIterator(Arena* arena) override { std::vector compact_buckets; for (unsigned int bid = 0; bid < bucket_count_; ++bid) { const char* bucket = cuckoo_array_[bid].load(std::memory_order_relaxed); @@ -265,10 +265,18 @@ class HashCuckooRep : public MemTableRep { compact_buckets.push_back(iter->key()); } } - return new Iterator( - std::shared_ptr>( - new std::vector(std::move(compact_buckets))), - compare_); + if (arena == nullptr) { + return new Iterator( + std::shared_ptr>( + new std::vector(std::move(compact_buckets))), + compare_); + } else { + auto mem = arena->AllocateAligned(sizeof(Iterator)); + return new (mem) Iterator( + std::shared_ptr>( + new std::vector(std::move(compact_buckets))), + compare_); + } } }; diff --git a/util/hash_linklist_rep.cc b/util/hash_linklist_rep.cc index 80edad505..60f245b5f 100644 --- a/util/hash_linklist_rep.cc +++ b/util/hash_linklist_rep.cc @@ -70,11 +70,12 @@ class HashLinkListRep : public MemTableRep { virtual ~HashLinkListRep(); - virtual MemTableRep::Iterator* GetIterator() override; + virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override; virtual MemTableRep::Iterator* GetIterator(const Slice& slice) override; - virtual MemTableRep::Iterator* GetDynamicPrefixIterator() override; + virtual MemTableRep::Iterator* GetDynamicPrefixIterator( + Arena* arena = nullptr) override; private: friend class DynamicIterator; @@ -411,7 +412,7 @@ void HashLinkListRep::Get(const LookupKey& k, void* callback_args, } } -MemTableRep::Iterator* HashLinkListRep::GetIterator() { +MemTableRep::Iterator* HashLinkListRep::GetIterator(Arena* alloc_arena) { // allocate a new arena of similar size to the one currently in use Arena* new_arena = new Arena(arena_->BlockSize()); auto list = new FullList(compare_, new_arena); @@ -424,7 +425,12 @@ MemTableRep::Iterator* HashLinkListRep::GetIterator() { } } } - return new FullListIterator(list, new_arena); + if (alloc_arena == nullptr) { + return new FullListIterator(list, new_arena); + } else { + auto mem = alloc_arena->AllocateAligned(sizeof(FullListIterator)); + return new (mem) FullListIterator(list, new_arena); + } } MemTableRep::Iterator* HashLinkListRep::GetIterator(const Slice& slice) { @@ -435,8 +441,14 @@ MemTableRep::Iterator* HashLinkListRep::GetIterator(const Slice& slice) { return new Iterator(this, bucket); } -MemTableRep::Iterator* HashLinkListRep::GetDynamicPrefixIterator() { - return new DynamicIterator(*this); +MemTableRep::Iterator* HashLinkListRep::GetDynamicPrefixIterator( + Arena* alloc_arena) { + if (alloc_arena == nullptr) { + return new DynamicIterator(*this); + } else { + auto mem = alloc_arena->AllocateAligned(sizeof(DynamicIterator)); + return new (mem) DynamicIterator(*this); + } } bool HashLinkListRep::BucketContains(Node* head, const Slice& user_key) const { diff --git a/util/hash_skiplist_rep.cc b/util/hash_skiplist_rep.cc index 1f03874d1..baee12ad5 100644 --- a/util/hash_skiplist_rep.cc +++ b/util/hash_skiplist_rep.cc @@ -38,11 +38,12 @@ class HashSkipListRep : public MemTableRep { virtual ~HashSkipListRep(); - virtual MemTableRep::Iterator* GetIterator() override; + virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override; virtual MemTableRep::Iterator* GetIterator(const Slice& slice) override; - virtual MemTableRep::Iterator* GetDynamicPrefixIterator() override; + virtual MemTableRep::Iterator* GetDynamicPrefixIterator( + Arena* arena = nullptr) override; private: friend 
class DynamicIterator; @@ -288,7 +289,7 @@ void HashSkipListRep::Get(const LookupKey& k, void* callback_args, } } -MemTableRep::Iterator* HashSkipListRep::GetIterator() { +MemTableRep::Iterator* HashSkipListRep::GetIterator(Arena* arena) { // allocate a new arena of similar size to the one currently in use Arena* new_arena = new Arena(arena_->BlockSize()); auto list = new Bucket(compare_, new_arena); @@ -301,7 +302,12 @@ MemTableRep::Iterator* HashSkipListRep::GetIterator() { } } } - return new Iterator(list, true, new_arena); + if (arena == nullptr) { + return new Iterator(list, true, new_arena); + } else { + auto mem = arena->AllocateAligned(sizeof(Iterator)); + return new (mem) Iterator(list, true, new_arena); + } } MemTableRep::Iterator* HashSkipListRep::GetIterator(const Slice& slice) { @@ -312,8 +318,13 @@ MemTableRep::Iterator* HashSkipListRep::GetIterator(const Slice& slice) { return new Iterator(bucket, false); } -MemTableRep::Iterator* HashSkipListRep::GetDynamicPrefixIterator() { - return new DynamicIterator(*this); +MemTableRep::Iterator* HashSkipListRep::GetDynamicPrefixIterator(Arena* arena) { + if (arena == nullptr) { + return new DynamicIterator(*this); + } else { + auto mem = arena->AllocateAligned(sizeof(DynamicIterator)); + return new (mem) DynamicIterator(*this); + } } } // anon namespace diff --git a/util/options.cc b/util/options.cc index 4fe8b219e..c4d3e5e98 100644 --- a/util/options.cc +++ b/util/options.cc @@ -490,7 +490,9 @@ ColumnFamilyOptions* ColumnFamilyOptions::OptimizeForPointLookup() { BlockBasedTableOptions block_based_options; block_based_options.index_type = BlockBasedTableOptions::kBinarySearch; table_factory.reset(new BlockBasedTableFactory(block_based_options)); +#ifndef ROCKSDB_LITE memtable_factory.reset(NewHashLinkListRepFactory()); +#endif return this; } diff --git a/util/skiplistrep.cc b/util/skiplistrep.cc index f36edf28d..895343001 100644 --- a/util/skiplistrep.cc +++ b/util/skiplistrep.cc @@ -6,6 +6,7 @@ #include "rocksdb/memtablerep.h" #include "db/memtable.h" #include "db/skiplist.h" +#include "util/arena.h" namespace rocksdb { namespace { @@ -108,8 +109,13 @@ public: // Unhide default implementations of GetIterator using MemTableRep::GetIterator; - virtual MemTableRep::Iterator* GetIterator() override { - return new SkipListRep::Iterator(&skip_list_); + virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override { + if (arena == nullptr) { + return new SkipListRep::Iterator(&skip_list_); + } else { + auto mem = arena->AllocateAligned(sizeof(SkipListRep::Iterator)); + return new (mem) SkipListRep::Iterator(&skip_list_); + } } }; } diff --git a/util/vectorrep.cc b/util/vectorrep.cc index 00e5c7450..cf8bad5c4 100644 --- a/util/vectorrep.cc +++ b/util/vectorrep.cc @@ -95,7 +95,7 @@ class VectorRep : public MemTableRep { using MemTableRep::GetIterator; // Return an iterator over the keys in this representation. - virtual MemTableRep::Iterator* GetIterator() override; + virtual MemTableRep::Iterator* GetIterator(Arena* arena) override; private: friend class Iterator; @@ -259,16 +259,28 @@ void VectorRep::Get(const LookupKey& k, void* callback_args, } } -MemTableRep::Iterator* VectorRep::GetIterator() { +MemTableRep::Iterator* VectorRep::GetIterator(Arena* arena) { + char* mem = nullptr; + if (arena != nullptr) { + mem = arena->AllocateAligned(sizeof(Iterator)); + } ReadLock l(&rwlock_); // Do not sort here. The sorting would be done the first time // a Seek is performed on the iterator. 
if (immutable_) { - return new Iterator(this, bucket_, compare_); + if (arena == nullptr) { + return new Iterator(this, bucket_, compare_); + } else { + return new (mem) Iterator(this, bucket_, compare_); + } } else { std::shared_ptr tmp; tmp.reset(new Bucket(*bucket_)); // make a copy - return new Iterator(nullptr, tmp, compare_); + if (arena == nullptr) { + return new Iterator(nullptr, tmp, compare_); + } else { + return new (mem) Iterator(nullptr, tmp, compare_); + } } } } // anon namespace diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index e6874fe5d..b68f1c65b 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -918,7 +918,6 @@ TEST(BackupableDBTest, RateLimiting) { auto rate_limited_backup_time = (bytes_written * kMicrosPerSec) / backupable_options_->backup_rate_limit; ASSERT_GT(backup_time, 0.9 * rate_limited_backup_time); - ASSERT_LT(backup_time, 2.5 * rate_limited_backup_time); CloseBackupableDB();
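A closing note on the util/dynamic_bloom.* hunks above: replacing cl_per_block with a simple locality switch means that, when locality > 0, the filter is carved into blocks of CACHE_LINE_SIZE * 8 bits, one block is chosen from the hash, and every probe for a key stays inside that block, so a negative lookup touches a single cache line. The standalone sketch below shows only that addressing; it assumes a 64-byte cache line and omits the patch's extra remixing of h between probes and its odd-block-count adjustment, so it is not the patch's exact code, and ToyBlockedBloom is an invented name.

// Illustrative sketch of cache-line-local bloom probing (not RocksDB code).
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <vector>

// Assumed cache line size for the sketch; RocksDB takes CACHE_LINE_SIZE from
// port/port.h.
constexpr uint32_t kCacheLineSize = 64;
constexpr uint32_t kBitsPerBlock = kCacheLineSize * 8;  // 512 bits per block

struct ToyBlockedBloom {
  std::vector<uint8_t> data;
  uint32_t num_blocks;
  uint32_t num_probes;

  ToyBlockedBloom(uint32_t total_bits, uint32_t probes) : num_probes(probes) {
    // Round the bit count up to whole cache-line-sized blocks.
    num_blocks = (total_bits + kBitsPerBlock - 1) / kBitsPerBlock;
    data.assign(num_blocks * kCacheLineSize, 0);
  }

  void AddHash(uint32_t h) {
    const uint32_t delta = (h >> 17) | (h << 15);             // rotate, as in the patch
    const uint32_t b = ((h >> 11) | (h << 21)) % num_blocks;  // pick one block
    for (uint32_t i = 0; i < num_probes; ++i) {
      // Every probe lands inside block b, i.e. inside one cache line.
      const uint32_t bitpos = b * kBitsPerBlock + (h % kBitsPerBlock);
      data[bitpos / 8] |= static_cast<uint8_t>(1u << (bitpos % 8));
      h += delta;
    }
  }

  bool MayContainHash(uint32_t h) const {
    const uint32_t delta = (h >> 17) | (h << 15);
    const uint32_t b = ((h >> 11) | (h << 21)) % num_blocks;
    for (uint32_t i = 0; i < num_probes; ++i) {
      const uint32_t bitpos = b * kBitsPerBlock + (h % kBitsPerBlock);
      if ((data[bitpos / 8] & (1u << (bitpos % 8))) == 0) return false;
      h += delta;
    }
    return true;
  }
};

int main() {
  ToyBlockedBloom bloom(/*total_bits=*/10 * 1024, /*probes=*/6);
  for (uint32_t k = 0; k < 1000; ++k) bloom.AddHash(k * 2654435761u);
  for (uint32_t k = 0; k < 1000; ++k) assert(bloom.MayContainHash(k * 2654435761u));
  printf("no false negatives on inserted keys\n");
  return 0;
}

Because kBitsPerBlock is a power of two, the modulo and division above compile down to masks and shifts, which is what the new comments in dynamic_bloom.h are pointing out.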