diff --git a/CMakeLists.txt b/CMakeLists.txt index db9e371aa..9076af3ab 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -481,6 +481,7 @@ set(SOURCES cache/clock_cache.cc cache/lru_cache.cc cache/sharded_cache.cc + db/arena_wrapped_db_iter.cc db/builder.cc db/c.cc db/column_family.cc diff --git a/TARGETS b/TARGETS index 0a8388775..b603d027f 100644 --- a/TARGETS +++ b/TARGETS @@ -108,6 +108,7 @@ cpp_library( "cache/clock_cache.cc", "cache/lru_cache.cc", "cache/sharded_cache.cc", + "db/arena_wrapped_db_iter.cc", "db/builder.cc", "db/c.cc", "db/column_family.cc", diff --git a/db/arena_wrapped_db_iter.cc b/db/arena_wrapped_db_iter.cc new file mode 100644 index 000000000..6a1635f62 --- /dev/null +++ b/db/arena_wrapped_db_iter.cc @@ -0,0 +1,106 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/arena_wrapped_db_iter.h" +#include "memory/arena.h" +#include "rocksdb/env.h" +#include "rocksdb/iterator.h" +#include "rocksdb/options.h" +#include "table/internal_iterator.h" +#include "table/iterator_wrapper.h" +#include "util/user_comparator_wrapper.h" + +namespace rocksdb { + +Status ArenaWrappedDBIter::GetProperty(std::string prop_name, + std::string* prop) { + if (prop_name == "rocksdb.iterator.super-version-number") { + // First try to pass the value returned from inner iterator. + if (!db_iter_->GetProperty(prop_name, prop).ok()) { + *prop = ToString(sv_number_); + } + return Status::OK(); + } + return db_iter_->GetProperty(prop_name, prop); +} + +void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options, + const ImmutableCFOptions& cf_options, + const MutableCFOptions& mutable_cf_options, + const SequenceNumber& sequence, + uint64_t max_sequential_skip_in_iteration, + uint64_t version_number, + ReadCallback* read_callback, DBImpl* db_impl, + ColumnFamilyData* cfd, bool allow_blob, + bool allow_refresh) { + auto mem = arena_.AllocateAligned(sizeof(DBIter)); + db_iter_ = new (mem) DBIter(env, read_options, cf_options, mutable_cf_options, + cf_options.user_comparator, nullptr, sequence, + true, max_sequential_skip_in_iteration, + read_callback, db_impl, cfd, allow_blob); + sv_number_ = version_number; + allow_refresh_ = allow_refresh; +} + +Status ArenaWrappedDBIter::Refresh() { + if (cfd_ == nullptr || db_impl_ == nullptr || !allow_refresh_) { + return Status::NotSupported("Creating renew iterator is not allowed."); + } + assert(db_iter_ != nullptr); + // TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the + // correct behavior. Will be corrected automatically when we take a snapshot + // here for the case of WritePreparedTxnDB. + SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber(); + uint64_t cur_sv_number = cfd_->GetSuperVersionNumber(); + if (sv_number_ != cur_sv_number) { + Env* env = db_iter_->env(); + db_iter_->~DBIter(); + arena_.~Arena(); + new (&arena_) Arena(); + + SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex()); + if (read_callback_) { + read_callback_->Refresh(latest_seq); + } + Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options, + latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations, + cur_sv_number, read_callback_, db_impl_, cfd_, allow_blob_, + allow_refresh_); + + InternalIterator* internal_iter = db_impl_->NewInternalIterator( + read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator(), + latest_seq); + SetIterUnderDBIter(internal_iter); + } else { + db_iter_->set_sequence(latest_seq); + db_iter_->set_valid(false); + } + return Status::OK(); +} + +ArenaWrappedDBIter* NewArenaWrappedDbIterator( + Env* env, const ReadOptions& read_options, + const ImmutableCFOptions& cf_options, + const MutableCFOptions& mutable_cf_options, const SequenceNumber& sequence, + uint64_t max_sequential_skip_in_iterations, uint64_t version_number, + ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd, + bool allow_blob, bool allow_refresh) { + ArenaWrappedDBIter* iter = new ArenaWrappedDBIter(); + iter->Init(env, read_options, cf_options, mutable_cf_options, sequence, + max_sequential_skip_in_iterations, version_number, read_callback, + db_impl, cfd, allow_blob, allow_refresh); + if (db_impl != nullptr && cfd != nullptr && allow_refresh) { + iter->StoreRefreshInfo(read_options, db_impl, cfd, read_callback, + allow_blob); + } + + return iter; +} + +} // namespace rocksdb diff --git a/db/arena_wrapped_db_iter.h b/db/arena_wrapped_db_iter.h new file mode 100644 index 000000000..6dbd64521 --- /dev/null +++ b/db/arena_wrapped_db_iter.h @@ -0,0 +1,112 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include +#include +#include "db/db_impl/db_impl.h" +#include "db/db_iter.h" +#include "db/dbformat.h" +#include "db/range_del_aggregator.h" +#include "memory/arena.h" +#include "options/cf_options.h" +#include "rocksdb/db.h" +#include "rocksdb/iterator.h" +#include "util/autovector.h" + +namespace rocksdb { + +class Arena; + +// A wrapper iterator which wraps DB Iterator and the arena, with which the DB +// iterator is supposed to be allocated. This class is used as an entry point of +// a iterator hierarchy whose memory can be allocated inline. In that way, +// accessing the iterator tree can be more cache friendly. It is also faster +// to allocate. +// When using the class's Iterator interface, the behavior is exactly +// the same as the inner DBIter. +class ArenaWrappedDBIter : public Iterator { + public: + virtual ~ArenaWrappedDBIter() { db_iter_->~DBIter(); } + + // Get the arena to be used to allocate memory for DBIter to be wrapped, + // as well as child iterators in it. + virtual Arena* GetArena() { return &arena_; } + virtual ReadRangeDelAggregator* GetRangeDelAggregator() { + return db_iter_->GetRangeDelAggregator(); + } + + // Set the internal iterator wrapped inside the DB Iterator. Usually it is + // a merging iterator. + virtual void SetIterUnderDBIter(InternalIterator* iter) { + static_cast(db_iter_)->SetIter(iter); + } + + virtual bool Valid() const override { return db_iter_->Valid(); } + virtual void SeekToFirst() override { db_iter_->SeekToFirst(); } + virtual void SeekToLast() override { db_iter_->SeekToLast(); } + virtual void Seek(const Slice& target) override { db_iter_->Seek(target); } + virtual void SeekForPrev(const Slice& target) override { + db_iter_->SeekForPrev(target); + } + virtual void Next() override { db_iter_->Next(); } + virtual void Prev() override { db_iter_->Prev(); } + virtual Slice key() const override { return db_iter_->key(); } + virtual Slice value() const override { return db_iter_->value(); } + virtual Status status() const override { return db_iter_->status(); } + bool IsBlob() const { return db_iter_->IsBlob(); } + + virtual Status GetProperty(std::string prop_name, std::string* prop) override; + + virtual Status Refresh() override; + + void Init(Env* env, const ReadOptions& read_options, + const ImmutableCFOptions& cf_options, + const MutableCFOptions& mutable_cf_options, + const SequenceNumber& sequence, + uint64_t max_sequential_skip_in_iterations, uint64_t version_number, + ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd, + bool allow_blob, bool allow_refresh); + + // Store some parameters so we can refresh the iterator at a later point + // with these same params + void StoreRefreshInfo(const ReadOptions& read_options, DBImpl* db_impl, + ColumnFamilyData* cfd, ReadCallback* read_callback, + bool allow_blob) { + read_options_ = read_options; + db_impl_ = db_impl; + cfd_ = cfd; + read_callback_ = read_callback; + allow_blob_ = allow_blob; + } + + private: + DBIter* db_iter_; + Arena arena_; + uint64_t sv_number_; + ColumnFamilyData* cfd_ = nullptr; + DBImpl* db_impl_ = nullptr; + ReadOptions read_options_; + ReadCallback* read_callback_; + bool allow_blob_ = false; + bool allow_refresh_ = true; +}; + +// Generate the arena wrapped iterator class. +// `db_impl` and `cfd` are used for reneweal. If left null, renewal will not +// be supported. +extern ArenaWrappedDBIter* NewArenaWrappedDbIterator( + Env* env, const ReadOptions& read_options, + const ImmutableCFOptions& cf_options, + const MutableCFOptions& mutable_cf_options, const SequenceNumber& sequence, + uint64_t max_sequential_skip_in_iterations, uint64_t version_number, + ReadCallback* read_callback, DBImpl* db_impl = nullptr, + ColumnFamilyData* cfd = nullptr, bool allow_blob = false, + bool allow_refresh = true); +} // namespace rocksdb diff --git a/db/db_blob_index_test.cc b/db/db_blob_index_test.cc index e9618885a..0cdc93dde 100644 --- a/db/db_blob_index_test.cc +++ b/db/db_blob_index_test.cc @@ -12,6 +12,7 @@ #include #include +#include "db/arena_wrapped_db_iter.h" #include "db/column_family.h" #include "db/db_iter.h" #include "db/db_test_util.h" diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index fd1f0b45c..8f104f707 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -25,6 +25,7 @@ #include #include +#include "db/arena_wrapped_db_iter.h" #include "db/builder.h" #include "db/compaction/compaction_job.h" #include "db/db_info_dumper.h" diff --git a/db/db_impl/db_impl_readonly.cc b/db/db_impl/db_impl_readonly.cc index 6db498397..d989bd8c7 100644 --- a/db/db_impl/db_impl_readonly.cc +++ b/db/db_impl/db_impl_readonly.cc @@ -3,6 +3,7 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). +#include "db/arena_wrapped_db_iter.h" #include "db/db_impl/db_impl_readonly.h" #include "db/compacted_db_impl.h" diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index 1a55c328e..da4a1da3a 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -7,7 +7,7 @@ #include -#include "db/db_iter.h" +#include "db/arena_wrapped_db_iter.h" #include "db/merge_context.h" #include "logging/auto_roll_logger.h" #include "monitoring/perf_context_imp.h" diff --git a/db/db_iter.cc b/db/db_iter.cc index 961431611..8af13b7fa 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -46,323 +46,72 @@ static void DumpInternalIter(Iterator* iter) { } #endif -// Memtables and sstables that make the DB representation contain -// (userkey,seq,type) => uservalue entries. DBIter -// combines multiple entries for the same userkey found in the DB -// representation into a single entry while accounting for sequence -// numbers, deletion markers, overwrites, etc. -class DBIter final: public Iterator { - public: - // The following is grossly complicated. TODO: clean it up - // Which direction is the iterator currently moving? - // (1) When moving forward: - // (1a) if current_entry_is_merged_ = false, the internal iterator is - // positioned at the exact entry that yields this->key(), this->value() - // (1b) if current_entry_is_merged_ = true, the internal iterator is - // positioned immediately after the last entry that contributed to the - // current this->value(). That entry may or may not have key equal to - // this->key(). - // (2) When moving backwards, the internal iterator is positioned - // just before all entries whose user key == this->key(). - enum Direction { - kForward, - kReverse - }; - - // LocalStatistics contain Statistics counters that will be aggregated per - // each iterator instance and then will be sent to the global statistics when - // the iterator is destroyed. - // - // The purpose of this approach is to avoid perf regression happening - // when multiple threads bump the atomic counters from a DBIter::Next(). - struct LocalStatistics { - explicit LocalStatistics() { ResetCounters(); } - - void ResetCounters() { - next_count_ = 0; - next_found_count_ = 0; - prev_count_ = 0; - prev_found_count_ = 0; - bytes_read_ = 0; - skip_count_ = 0; - } - - void BumpGlobalStatistics(Statistics* global_statistics) { - RecordTick(global_statistics, NUMBER_DB_NEXT, next_count_); - RecordTick(global_statistics, NUMBER_DB_NEXT_FOUND, next_found_count_); - RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_); - RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_); - RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_); - RecordTick(global_statistics, NUMBER_ITER_SKIP, skip_count_); - PERF_COUNTER_ADD(iter_read_bytes, bytes_read_); - ResetCounters(); - } - - // Map to Tickers::NUMBER_DB_NEXT - uint64_t next_count_; - // Map to Tickers::NUMBER_DB_NEXT_FOUND - uint64_t next_found_count_; - // Map to Tickers::NUMBER_DB_PREV - uint64_t prev_count_; - // Map to Tickers::NUMBER_DB_PREV_FOUND - uint64_t prev_found_count_; - // Map to Tickers::ITER_BYTES_READ - uint64_t bytes_read_; - // Map to Tickers::NUMBER_ITER_SKIP - uint64_t skip_count_; - }; - - DBIter(Env* _env, const ReadOptions& read_options, - const ImmutableCFOptions& cf_options, - const MutableCFOptions& mutable_cf_options, const Comparator* cmp, - InternalIterator* iter, SequenceNumber s, bool arena_mode, - uint64_t max_sequential_skip_in_iterations, - ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd, - bool allow_blob) - : env_(_env), - logger_(cf_options.info_log), - user_comparator_(cmp), - merge_operator_(cf_options.merge_operator), - iter_(iter), - read_callback_(read_callback), - sequence_(s), - statistics_(cf_options.statistics), - num_internal_keys_skipped_(0), - iterate_lower_bound_(read_options.iterate_lower_bound), - iterate_upper_bound_(read_options.iterate_upper_bound), - direction_(kForward), - valid_(false), - current_entry_is_merged_(false), - is_key_seqnum_zero_(false), - prefix_same_as_start_(read_options.prefix_same_as_start), - pin_thru_lifetime_(read_options.pin_data), - total_order_seek_(read_options.total_order_seek), - allow_blob_(allow_blob), - is_blob_(false), - arena_mode_(arena_mode), - range_del_agg_(&cf_options.internal_comparator, s), - db_impl_(db_impl), - cfd_(cfd), - start_seqnum_(read_options.iter_start_seqnum) { - RecordTick(statistics_, NO_ITERATOR_CREATED); - prefix_extractor_ = mutable_cf_options.prefix_extractor.get(); - max_skip_ = max_sequential_skip_in_iterations; - max_skippable_internal_keys_ = read_options.max_skippable_internal_keys; - if (pin_thru_lifetime_) { - pinned_iters_mgr_.StartPinning(); - } - if (iter_.iter()) { - iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_); - } - } - // No copying allowed - DBIter(const DBIter&) = delete; - void operator=(const DBIter&) = delete; - - ~DBIter() override { - // Release pinned data if any - if (pinned_iters_mgr_.PinningEnabled()) { - pinned_iters_mgr_.ReleasePinnedData(); - } - RecordTick(statistics_, NO_ITERATOR_DELETED); - ResetInternalKeysSkippedCounter(); - local_stats_.BumpGlobalStatistics(statistics_); - iter_.DeleteIter(arena_mode_); - } - virtual void SetIter(InternalIterator* iter) { - assert(iter_.iter() == nullptr); - iter_.Set(iter); +DBIter::DBIter(Env* _env, const ReadOptions& read_options, + const ImmutableCFOptions& cf_options, + const MutableCFOptions& mutable_cf_options, const Comparator* cmp, + InternalIterator* iter, SequenceNumber s, bool arena_mode, + uint64_t max_sequential_skip_in_iterations, + ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd, + bool allow_blob) + : env_(_env), + logger_(cf_options.info_log), + user_comparator_(cmp), + merge_operator_(cf_options.merge_operator), + iter_(iter), + read_callback_(read_callback), + sequence_(s), + statistics_(cf_options.statistics), + num_internal_keys_skipped_(0), + iterate_lower_bound_(read_options.iterate_lower_bound), + iterate_upper_bound_(read_options.iterate_upper_bound), + direction_(kForward), + valid_(false), + current_entry_is_merged_(false), + is_key_seqnum_zero_(false), + prefix_same_as_start_(read_options.prefix_same_as_start), + pin_thru_lifetime_(read_options.pin_data), + total_order_seek_(read_options.total_order_seek), + allow_blob_(allow_blob), + is_blob_(false), + arena_mode_(arena_mode), + range_del_agg_(&cf_options.internal_comparator, s), + db_impl_(db_impl), + cfd_(cfd), + start_seqnum_(read_options.iter_start_seqnum) { + RecordTick(statistics_, NO_ITERATOR_CREATED); + prefix_extractor_ = mutable_cf_options.prefix_extractor.get(); + max_skip_ = max_sequential_skip_in_iterations; + max_skippable_internal_keys_ = read_options.max_skippable_internal_keys; + if (pin_thru_lifetime_) { + pinned_iters_mgr_.StartPinning(); + } + if (iter_.iter()) { iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_); } - virtual ReadRangeDelAggregator* GetRangeDelAggregator() { - return &range_del_agg_; - } - - bool Valid() const override { return valid_; } - Slice key() const override { - assert(valid_); - if(start_seqnum_ > 0) { - return saved_key_.GetInternalKey(); - } else { - return saved_key_.GetUserKey(); - } - } - Slice value() const override { - assert(valid_); - if (current_entry_is_merged_) { - // If pinned_value_ is set then the result of merge operator is one of - // the merge operands and we should return it. - return pinned_value_.data() ? pinned_value_ : saved_value_; - } else if (direction_ == kReverse) { - return pinned_value_; - } else { - return iter_.value(); - } - } - Status status() const override { - if (status_.ok()) { - return iter_.status(); - } else { - assert(!valid_); - return status_; - } - } - bool IsBlob() const { - assert(valid_ && (allow_blob_ || !is_blob_)); - return is_blob_; - } - - Status GetProperty(std::string prop_name, std::string* prop) override { - if (prop == nullptr) { - return Status::InvalidArgument("prop is nullptr"); - } - if (prop_name == "rocksdb.iterator.super-version-number") { - // First try to pass the value returned from inner iterator. - return iter_.iter()->GetProperty(prop_name, prop); - } else if (prop_name == "rocksdb.iterator.is-key-pinned") { - if (valid_) { - *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0"; - } else { - *prop = "Iterator is not valid."; - } - return Status::OK(); - } else if (prop_name == "rocksdb.iterator.internal-key") { - *prop = saved_key_.GetUserKey().ToString(); - return Status::OK(); - } - return Status::InvalidArgument("Unidentified property."); - } - - inline void Next() final override; - inline void Prev() final override; - inline void Seek(const Slice& target) final override; - inline void SeekForPrev(const Slice& target) final override; - inline void SeekToFirst() final override; - inline void SeekToLast() final override; - Env* env() { return env_; } - void set_sequence(uint64_t s) { - sequence_ = s; - if (read_callback_) { - read_callback_->Refresh(s); - } - } - void set_valid(bool v) { valid_ = v; } - - private: - // For all methods in this block: - // PRE: iter_->Valid() && status_.ok() - // Return false if there was an error, and status() is non-ok, valid_ = false; - // in this case callers would usually stop what they were doing and return. - bool ReverseToForward(); - bool ReverseToBackward(); - bool FindValueForCurrentKey(); - bool FindValueForCurrentKeyUsingSeek(); - bool FindUserKeyBeforeSavedKey(); - inline bool FindNextUserEntry(bool skipping, bool prefix_check); - inline bool FindNextUserEntryInternal(bool skipping, bool prefix_check); - bool ParseKey(ParsedInternalKey* key); - bool MergeValuesNewToOld(); - - void PrevInternal(); - bool TooManyInternalKeysSkipped(bool increment = true); - inline bool IsVisible(SequenceNumber sequence); - - // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData() - // is called - void TempPinData() { - if (!pin_thru_lifetime_) { - pinned_iters_mgr_.StartPinning(); - } - } +} - // Release blocks pinned by TempPinData() - void ReleaseTempPinnedData() { - if (!pin_thru_lifetime_ && pinned_iters_mgr_.PinningEnabled()) { - pinned_iters_mgr_.ReleasePinnedData(); - } +Status DBIter::GetProperty(std::string prop_name, std::string* prop) { + if (prop == nullptr) { + return Status::InvalidArgument("prop is nullptr"); } - - inline void ClearSavedValue() { - if (saved_value_.capacity() > 1048576) { - std::string empty; - swap(empty, saved_value_); - } else { - saved_value_.clear(); - } - } - - inline void ResetInternalKeysSkippedCounter() { - local_stats_.skip_count_ += num_internal_keys_skipped_; + if (prop_name == "rocksdb.iterator.super-version-number") { + // First try to pass the value returned from inner iterator. + return iter_.iter()->GetProperty(prop_name, prop); + } else if (prop_name == "rocksdb.iterator.is-key-pinned") { if (valid_) { - local_stats_.skip_count_--; + *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0"; + } else { + *prop = "Iterator is not valid."; } - num_internal_keys_skipped_ = 0; + return Status::OK(); + } else if (prop_name == "rocksdb.iterator.internal-key") { + *prop = saved_key_.GetUserKey().ToString(); + return Status::OK(); } + return Status::InvalidArgument("Unidentified property."); +} - const SliceTransform* prefix_extractor_; - Env* const env_; - Logger* logger_; - UserComparatorWrapper user_comparator_; - const MergeOperator* const merge_operator_; - IteratorWrapper iter_; - ReadCallback* read_callback_; - // Max visible sequence number. It is normally the snapshot seq unless we have - // uncommitted data in db as in WriteUnCommitted. - SequenceNumber sequence_; - - IterKey saved_key_; - // Reusable internal key data structure. This is only used inside one function - // and should not be used across functions. Reusing this object can reduce - // overhead of calling construction of the function if creating it each time. - ParsedInternalKey ikey_; - std::string saved_value_; - Slice pinned_value_; - // for prefix seek mode to support prev() - Statistics* statistics_; - uint64_t max_skip_; - uint64_t max_skippable_internal_keys_; - uint64_t num_internal_keys_skipped_; - const Slice* iterate_lower_bound_; - const Slice* iterate_upper_bound_; - - IterKey prefix_start_buf_; - - Status status_; - Slice prefix_start_key_; - Direction direction_; - bool valid_; - bool current_entry_is_merged_; - // True if we know that the current entry's seqnum is 0. - // This information is used as that the next entry will be for another - // user key. - bool is_key_seqnum_zero_; - const bool prefix_same_as_start_; - // Means that we will pin all data blocks we read as long the Iterator - // is not deleted, will be true if ReadOptions::pin_data is true - const bool pin_thru_lifetime_; - const bool total_order_seek_; - bool allow_blob_; - bool is_blob_; - bool arena_mode_; - // List of operands for merge operator. - MergeContext merge_context_; - ReadRangeDelAggregator range_del_agg_; - LocalStatistics local_stats_; - PinnedIteratorsManager pinned_iters_mgr_; -#ifdef ROCKSDB_LITE - ROCKSDB_FIELD_UNUSED -#endif - DBImpl* db_impl_; -#ifdef ROCKSDB_LITE - ROCKSDB_FIELD_UNUSED -#endif - ColumnFamilyData* cfd_; - // for diff snapshots we want the lower bound on the seqnum; - // if this value > 0 iterator will return internal keys - SequenceNumber start_seqnum_; -}; - -inline bool DBIter::ParseKey(ParsedInternalKey* ikey) { +bool DBIter::ParseKey(ParsedInternalKey* ikey) { if (!ParseInternalKey(iter_.key(), ikey)) { status_ = Status::Corruption("corrupted internal key in DBIter"); valid_ = false; @@ -429,13 +178,13 @@ void DBIter::Next() { // keys against the prefix of the seeked key. Set to false when // performing a seek without a key (e.g. SeekToFirst). Set to // prefix_same_as_start_ for other iterations. -inline bool DBIter::FindNextUserEntry(bool skipping, bool prefix_check) { +bool DBIter::FindNextUserEntry(bool skipping, bool prefix_check) { PERF_TIMER_GUARD(find_next_user_entry_time); return FindNextUserEntryInternal(skipping, prefix_check); } // Actual implementation of DBIter::FindNextUserEntry() -inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { +bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { // Loop until we hit an acceptable entry to yield assert(iter_.Valid()); assert(status_.ok()); @@ -1523,114 +1272,4 @@ Iterator* NewDBIterator(Env* env, const ReadOptions& read_options, return db_iter; } -ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); } - -ReadRangeDelAggregator* ArenaWrappedDBIter::GetRangeDelAggregator() { - return db_iter_->GetRangeDelAggregator(); -} - -void ArenaWrappedDBIter::SetIterUnderDBIter(InternalIterator* iter) { - static_cast(db_iter_)->SetIter(iter); -} - -inline bool ArenaWrappedDBIter::Valid() const { return db_iter_->Valid(); } -inline void ArenaWrappedDBIter::SeekToFirst() { db_iter_->SeekToFirst(); } -inline void ArenaWrappedDBIter::SeekToLast() { db_iter_->SeekToLast(); } -inline void ArenaWrappedDBIter::Seek(const Slice& target) { - db_iter_->Seek(target); -} -inline void ArenaWrappedDBIter::SeekForPrev(const Slice& target) { - db_iter_->SeekForPrev(target); -} -inline void ArenaWrappedDBIter::Next() { db_iter_->Next(); } -inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); } -inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); } -inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); } -inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); } -bool ArenaWrappedDBIter::IsBlob() const { return db_iter_->IsBlob(); } -inline Status ArenaWrappedDBIter::GetProperty(std::string prop_name, - std::string* prop) { - if (prop_name == "rocksdb.iterator.super-version-number") { - // First try to pass the value returned from inner iterator. - if (!db_iter_->GetProperty(prop_name, prop).ok()) { - *prop = ToString(sv_number_); - } - return Status::OK(); - } - return db_iter_->GetProperty(prop_name, prop); -} - -void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options, - const ImmutableCFOptions& cf_options, - const MutableCFOptions& mutable_cf_options, - const SequenceNumber& sequence, - uint64_t max_sequential_skip_in_iteration, - uint64_t version_number, - ReadCallback* read_callback, DBImpl* db_impl, - ColumnFamilyData* cfd, bool allow_blob, - bool allow_refresh) { - auto mem = arena_.AllocateAligned(sizeof(DBIter)); - db_iter_ = new (mem) DBIter(env, read_options, cf_options, mutable_cf_options, - cf_options.user_comparator, nullptr, sequence, - true, max_sequential_skip_in_iteration, - read_callback, db_impl, cfd, allow_blob); - sv_number_ = version_number; - allow_refresh_ = allow_refresh; -} - -Status ArenaWrappedDBIter::Refresh() { - if (cfd_ == nullptr || db_impl_ == nullptr || !allow_refresh_) { - return Status::NotSupported("Creating renew iterator is not allowed."); - } - assert(db_iter_ != nullptr); - // TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the - // correct behavior. Will be corrected automatically when we take a snapshot - // here for the case of WritePreparedTxnDB. - SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber(); - uint64_t cur_sv_number = cfd_->GetSuperVersionNumber(); - if (sv_number_ != cur_sv_number) { - Env* env = db_iter_->env(); - db_iter_->~DBIter(); - arena_.~Arena(); - new (&arena_) Arena(); - - SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex()); - if (read_callback_) { - read_callback_->Refresh(latest_seq); - } - Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options, - latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations, - cur_sv_number, read_callback_, db_impl_, cfd_, allow_blob_, - allow_refresh_); - - InternalIterator* internal_iter = db_impl_->NewInternalIterator( - read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator(), - latest_seq); - SetIterUnderDBIter(internal_iter); - } else { - db_iter_->set_sequence(latest_seq); - db_iter_->set_valid(false); - } - return Status::OK(); -} - -ArenaWrappedDBIter* NewArenaWrappedDbIterator( - Env* env, const ReadOptions& read_options, - const ImmutableCFOptions& cf_options, - const MutableCFOptions& mutable_cf_options, const SequenceNumber& sequence, - uint64_t max_sequential_skip_in_iterations, uint64_t version_number, - ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd, - bool allow_blob, bool allow_refresh) { - ArenaWrappedDBIter* iter = new ArenaWrappedDBIter(); - iter->Init(env, read_options, cf_options, mutable_cf_options, sequence, - max_sequential_skip_in_iterations, version_number, read_callback, - db_impl, cfd, allow_blob, allow_refresh); - if (db_impl != nullptr && cfd != nullptr && allow_refresh) { - iter->StoreRefreshInfo(read_options, db_impl, cfd, read_callback, - allow_blob); - } - - return iter; -} - } // namespace rocksdb diff --git a/db/db_iter.h b/db/db_iter.h index 6a4bf8a55..d1b60ca8f 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -17,6 +17,7 @@ #include "options/cf_options.h" #include "rocksdb/db.h" #include "rocksdb/iterator.h" +#include "table/iterator_wrapper.h" #include "util/autovector.h" namespace rocksdb { @@ -46,9 +47,268 @@ namespace rocksdb { // key: BBB value: v1 // key: BBC value: v1 // -class Arena; -class DBIter; +// Memtables and sstables that make the DB representation contain +// (userkey,seq,type) => uservalue entries. DBIter +// combines multiple entries for the same userkey found in the DB +// representation into a single entry while accounting for sequence +// numbers, deletion markers, overwrites, etc. +class DBIter final: public Iterator { + public: + // The following is grossly complicated. TODO: clean it up + // Which direction is the iterator currently moving? + // (1) When moving forward: + // (1a) if current_entry_is_merged_ = false, the internal iterator is + // positioned at the exact entry that yields this->key(), this->value() + // (1b) if current_entry_is_merged_ = true, the internal iterator is + // positioned immediately after the last entry that contributed to the + // current this->value(). That entry may or may not have key equal to + // this->key(). + // (2) When moving backwards, the internal iterator is positioned + // just before all entries whose user key == this->key(). + enum Direction { + kForward, + kReverse + }; + + // LocalStatistics contain Statistics counters that will be aggregated per + // each iterator instance and then will be sent to the global statistics when + // the iterator is destroyed. + // + // The purpose of this approach is to avoid perf regression happening + // when multiple threads bump the atomic counters from a DBIter::Next(). + struct LocalStatistics { + explicit LocalStatistics() { ResetCounters(); } + + void ResetCounters() { + next_count_ = 0; + next_found_count_ = 0; + prev_count_ = 0; + prev_found_count_ = 0; + bytes_read_ = 0; + skip_count_ = 0; + } + + void BumpGlobalStatistics(Statistics* global_statistics) { + RecordTick(global_statistics, NUMBER_DB_NEXT, next_count_); + RecordTick(global_statistics, NUMBER_DB_NEXT_FOUND, next_found_count_); + RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_); + RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_); + RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_); + RecordTick(global_statistics, NUMBER_ITER_SKIP, skip_count_); + PERF_COUNTER_ADD(iter_read_bytes, bytes_read_); + ResetCounters(); + } + + // Map to Tickers::NUMBER_DB_NEXT + uint64_t next_count_; + // Map to Tickers::NUMBER_DB_NEXT_FOUND + uint64_t next_found_count_; + // Map to Tickers::NUMBER_DB_PREV + uint64_t prev_count_; + // Map to Tickers::NUMBER_DB_PREV_FOUND + uint64_t prev_found_count_; + // Map to Tickers::ITER_BYTES_READ + uint64_t bytes_read_; + // Map to Tickers::NUMBER_ITER_SKIP + uint64_t skip_count_; + }; + + DBIter(Env* _env, const ReadOptions& read_options, + const ImmutableCFOptions& cf_options, + const MutableCFOptions& mutable_cf_options, const Comparator* cmp, + InternalIterator* iter, SequenceNumber s, bool arena_mode, + uint64_t max_sequential_skip_in_iterations, + ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd, + bool allow_blob); + + // No copying allowed + DBIter(const DBIter&) = delete; + void operator=(const DBIter&) = delete; + + ~DBIter() override { + // Release pinned data if any + if (pinned_iters_mgr_.PinningEnabled()) { + pinned_iters_mgr_.ReleasePinnedData(); + } + RecordTick(statistics_, NO_ITERATOR_DELETED); + ResetInternalKeysSkippedCounter(); + local_stats_.BumpGlobalStatistics(statistics_); + iter_.DeleteIter(arena_mode_); + } + virtual void SetIter(InternalIterator* iter) { + assert(iter_.iter() == nullptr); + iter_.Set(iter); + iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_); + } + virtual ReadRangeDelAggregator* GetRangeDelAggregator() { + return &range_del_agg_; + } + + bool Valid() const override { return valid_; } + Slice key() const override { + assert(valid_); + if(start_seqnum_ > 0) { + return saved_key_.GetInternalKey(); + } else { + return saved_key_.GetUserKey(); + } + } + Slice value() const override { + assert(valid_); + if (current_entry_is_merged_) { + // If pinned_value_ is set then the result of merge operator is one of + // the merge operands and we should return it. + return pinned_value_.data() ? pinned_value_ : saved_value_; + } else if (direction_ == kReverse) { + return pinned_value_; + } else { + return iter_.value(); + } + } + Status status() const override { + if (status_.ok()) { + return iter_.status(); + } else { + assert(!valid_); + return status_; + } + } + bool IsBlob() const { + assert(valid_ && (allow_blob_ || !is_blob_)); + return is_blob_; + } + + Status GetProperty(std::string prop_name, std::string* prop) override; + + void Next() final override; + void Prev() final override; + void Seek(const Slice& target) final override; + void SeekForPrev(const Slice& target) final override; + void SeekToFirst() final override; + void SeekToLast() final override; + Env* env() { return env_; } + void set_sequence(uint64_t s) { + sequence_ = s; + if (read_callback_) { + read_callback_->Refresh(s); + } + } + void set_valid(bool v) { valid_ = v; } + + private: + // For all methods in this block: + // PRE: iter_->Valid() && status_.ok() + // Return false if there was an error, and status() is non-ok, valid_ = false; + // in this case callers would usually stop what they were doing and return. + bool ReverseToForward(); + bool ReverseToBackward(); + bool FindValueForCurrentKey(); + bool FindValueForCurrentKeyUsingSeek(); + bool FindUserKeyBeforeSavedKey(); + bool FindNextUserEntry(bool skipping, bool prefix_check); + bool FindNextUserEntryInternal(bool skipping, bool prefix_check); + bool ParseKey(ParsedInternalKey* key); + bool MergeValuesNewToOld(); + + void PrevInternal(); + bool TooManyInternalKeysSkipped(bool increment = true); + bool IsVisible(SequenceNumber sequence); + + // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData() + // is called + void TempPinData() { + if (!pin_thru_lifetime_) { + pinned_iters_mgr_.StartPinning(); + } + } + + // Release blocks pinned by TempPinData() + void ReleaseTempPinnedData() { + if (!pin_thru_lifetime_ && pinned_iters_mgr_.PinningEnabled()) { + pinned_iters_mgr_.ReleasePinnedData(); + } + } + + inline void ClearSavedValue() { + if (saved_value_.capacity() > 1048576) { + std::string empty; + swap(empty, saved_value_); + } else { + saved_value_.clear(); + } + } + + inline void ResetInternalKeysSkippedCounter() { + local_stats_.skip_count_ += num_internal_keys_skipped_; + if (valid_) { + local_stats_.skip_count_--; + } + num_internal_keys_skipped_ = 0; + } + + const SliceTransform* prefix_extractor_; + Env* const env_; + Logger* logger_; + UserComparatorWrapper user_comparator_; + const MergeOperator* const merge_operator_; + IteratorWrapper iter_; + ReadCallback* read_callback_; + // Max visible sequence number. It is normally the snapshot seq unless we have + // uncommitted data in db as in WriteUnCommitted. + SequenceNumber sequence_; + + IterKey saved_key_; + // Reusable internal key data structure. This is only used inside one function + // and should not be used across functions. Reusing this object can reduce + // overhead of calling construction of the function if creating it each time. + ParsedInternalKey ikey_; + std::string saved_value_; + Slice pinned_value_; + // for prefix seek mode to support prev() + Statistics* statistics_; + uint64_t max_skip_; + uint64_t max_skippable_internal_keys_; + uint64_t num_internal_keys_skipped_; + const Slice* iterate_lower_bound_; + const Slice* iterate_upper_bound_; + + IterKey prefix_start_buf_; + + Status status_; + Slice prefix_start_key_; + Direction direction_; + bool valid_; + bool current_entry_is_merged_; + // True if we know that the current entry's seqnum is 0. + // This information is used as that the next entry will be for another + // user key. + bool is_key_seqnum_zero_; + const bool prefix_same_as_start_; + // Means that we will pin all data blocks we read as long the Iterator + // is not deleted, will be true if ReadOptions::pin_data is true + const bool pin_thru_lifetime_; + const bool total_order_seek_; + bool allow_blob_; + bool is_blob_; + bool arena_mode_; + // List of operands for merge operator. + MergeContext merge_context_; + ReadRangeDelAggregator range_del_agg_; + LocalStatistics local_stats_; + PinnedIteratorsManager pinned_iters_mgr_; +#ifdef ROCKSDB_LITE + ROCKSDB_FIELD_UNUSED +#endif + DBImpl* db_impl_; +#ifdef ROCKSDB_LITE + ROCKSDB_FIELD_UNUSED +#endif + ColumnFamilyData* cfd_; + // for diff snapshots we want the lower bound on the seqnum; + // if this value > 0 iterator will return internal keys + SequenceNumber start_seqnum_; +}; // Return a new iterator that converts internal keys (yielded by // "*internal_iter") that were live at the specified `sequence` number // into appropriate user keys. @@ -61,79 +321,4 @@ extern Iterator* NewDBIterator( ReadCallback* read_callback, DBImpl* db_impl = nullptr, ColumnFamilyData* cfd = nullptr, bool allow_blob = false); -// A wrapper iterator which wraps DB Iterator and the arena, with which the DB -// iterator is supposed be allocated. This class is used as an entry point of -// a iterator hierarchy whose memory can be allocated inline. In that way, -// accessing the iterator tree can be more cache friendly. It is also faster -// to allocate. -// When using the class's Iterator interface, the behavior is exactly -// the same as the inner DBIter. -class ArenaWrappedDBIter : public Iterator { - public: - virtual ~ArenaWrappedDBIter(); - - // Get the arena to be used to allocate memory for DBIter to be wrapped, - // as well as child iterators in it. - virtual Arena* GetArena() { return &arena_; } - virtual ReadRangeDelAggregator* GetRangeDelAggregator(); - - // Set the internal iterator wrapped inside the DB Iterator. Usually it is - // a merging iterator. - virtual void SetIterUnderDBIter(InternalIterator* iter); - virtual bool Valid() const override; - virtual void SeekToFirst() override; - virtual void SeekToLast() override; - virtual void Seek(const Slice& target) override; - virtual void SeekForPrev(const Slice& target) override; - virtual void Next() override; - virtual void Prev() override; - virtual Slice key() const override; - virtual Slice value() const override; - virtual Status status() const override; - virtual Status Refresh() override; - bool IsBlob() const; - - virtual Status GetProperty(std::string prop_name, std::string* prop) override; - - void Init(Env* env, const ReadOptions& read_options, - const ImmutableCFOptions& cf_options, - const MutableCFOptions& mutable_cf_options, - const SequenceNumber& sequence, - uint64_t max_sequential_skip_in_iterations, uint64_t version_number, - ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd, - bool allow_blob, bool allow_refresh); - - void StoreRefreshInfo(const ReadOptions& read_options, DBImpl* db_impl, - ColumnFamilyData* cfd, ReadCallback* read_callback, - bool allow_blob) { - read_options_ = read_options; - db_impl_ = db_impl; - cfd_ = cfd; - read_callback_ = read_callback; - allow_blob_ = allow_blob; - } - - private: - DBIter* db_iter_; - Arena arena_; - uint64_t sv_number_; - ColumnFamilyData* cfd_ = nullptr; - DBImpl* db_impl_ = nullptr; - ReadOptions read_options_; - ReadCallback* read_callback_; - bool allow_blob_ = false; - bool allow_refresh_ = true; -}; - -// Generate the arena wrapped iterator class. -// `db_impl` and `cfd` are used for reneweal. If left null, renewal will not -// be supported. -extern ArenaWrappedDBIter* NewArenaWrappedDbIterator( - Env* env, const ReadOptions& read_options, - const ImmutableCFOptions& cf_options, - const MutableCFOptions& mutable_cf_options, const SequenceNumber& sequence, - uint64_t max_sequential_skip_in_iterations, uint64_t version_number, - ReadCallback* read_callback, DBImpl* db_impl = nullptr, - ColumnFamilyData* cfd = nullptr, bool allow_blob = false, - bool allow_refresh = true); } // namespace rocksdb diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc index 997b38602..9ea7ea0d9 100644 --- a/db/db_iterator_test.cc +++ b/db/db_iterator_test.cc @@ -9,6 +9,7 @@ #include +#include "db/arena_wrapped_db_iter.h" #include "db/db_iter.h" #include "db/db_test_util.h" #include "port/port.h" diff --git a/src.mk b/src.mk index e37c8c919..7a555253d 100644 --- a/src.mk +++ b/src.mk @@ -3,6 +3,7 @@ LIB_SOURCES = \ cache/clock_cache.cc \ cache/lru_cache.cc \ cache/sharded_cache.cc \ + db/arena_wrapped_db_iter.cc \ db/builder.cc \ db/c.cc \ db/column_family.cc \ diff --git a/utilities/blob_db/blob_db_iterator.h b/utilities/blob_db/blob_db_iterator.h index 1565c670b..f332445de 100644 --- a/utilities/blob_db/blob_db_iterator.h +++ b/utilities/blob_db/blob_db_iterator.h @@ -6,6 +6,7 @@ #pragma once #ifndef ROCKSDB_LITE +#include "db/arena_wrapped_db_iter.h" #include "monitoring/statistics.h" #include "rocksdb/iterator.h" #include "util/stop_watch.h" diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index f58305de2..d06c2b62d 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -13,6 +13,7 @@ #include #include +#include "db/arena_wrapped_db_iter.h" #include "db/db_impl/db_impl.h" #include "rocksdb/db.h" #include "rocksdb/options.h" diff --git a/utilities/transactions/write_unprepared_txn_db.cc b/utilities/transactions/write_unprepared_txn_db.cc index f6809faeb..b883f4496 100644 --- a/utilities/transactions/write_unprepared_txn_db.cc +++ b/utilities/transactions/write_unprepared_txn_db.cc @@ -5,6 +5,7 @@ #ifndef ROCKSDB_LITE +#include "db/arena_wrapped_db_iter.h" #include "utilities/transactions/write_unprepared_txn_db.h" #include "rocksdb/utilities/transaction_db.h" #include "util/cast_util.h"