From 72c8533f2c137170478270cb74b218095ced8904 Mon Sep 17 00:00:00 2001 From: Siying Dong Date: Tue, 23 Apr 2019 10:51:50 -0700 Subject: [PATCH] DBIter to use IteratorWrapper for inner iterator (#5214) Summary: It's hard to get DBIter to directly use InternalIterator::NextAndGetResult() because the code change would be complicated. Instead, use IteratorWrapper, where Next() is already using NextAndGetResult(). Performance number is hard to measure because it is small and ther is variation. I run readseq many times, and there seems to be 1% gain. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5214 Differential Revision: D15003635 Pulled By: siying fbshipit-source-id: 17af1965c409c2fe90cd85037fbd2c5a1364f82a --- db/db_iter.cc | 196 +++++++++++++++++++++++++------------------------- 1 file changed, 98 insertions(+), 98 deletions(-) diff --git a/db/db_iter.cc b/db/db_iter.cc index f955e7cf8..43a56af78 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -22,6 +22,7 @@ #include "rocksdb/merge_operator.h" #include "rocksdb/options.h" #include "table/internal_iterator.h" +#include "table/iterator_wrapper.h" #include "util/arena.h" #include "util/filename.h" #include "util/logging.h" @@ -149,8 +150,8 @@ class DBIter final: public Iterator { if (pin_thru_lifetime_) { pinned_iters_mgr_.StartPinning(); } - if (iter_) { - iter_->SetPinnedItersMgr(&pinned_iters_mgr_); + if (iter_.iter()) { + iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_); } } ~DBIter() override { @@ -161,16 +162,12 @@ class DBIter final: public Iterator { RecordTick(statistics_, NO_ITERATOR_DELETED); ResetInternalKeysSkippedCounter(); local_stats_.BumpGlobalStatistics(statistics_); - if (!arena_mode_) { - delete iter_; - } else { - iter_->~InternalIterator(); - } + iter_.DeleteIter(arena_mode_); } virtual void SetIter(InternalIterator* iter) { - assert(iter_ == nullptr); - iter_ = iter; - iter_->SetPinnedItersMgr(&pinned_iters_mgr_); + assert(iter_.iter() == nullptr); + iter_.Set(iter); + iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_); } virtual ReadRangeDelAggregator* GetRangeDelAggregator() { return &range_del_agg_; @@ -194,12 +191,12 @@ class DBIter final: public Iterator { } else if (direction_ == kReverse) { return pinned_value_; } else { - return iter_->value(); + return iter_.value(); } } Status status() const override { if (status_.ok()) { - return iter_->status(); + return iter_.status(); } else { assert(!valid_); return status_; @@ -216,7 +213,7 @@ class DBIter final: public Iterator { } if (prop_name == "rocksdb.iterator.super-version-number") { // First try to pass the value returned from inner iterator. - return iter_->GetProperty(prop_name, prop); + return iter_.iter()->GetProperty(prop_name, prop); } else if (prop_name == "rocksdb.iterator.is-key-pinned") { if (valid_) { *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0"; @@ -308,7 +305,7 @@ class DBIter final: public Iterator { Logger* logger_; UserComparatorWrapper user_comparator_; const MergeOperator* const merge_operator_; - InternalIterator* iter_; + IteratorWrapper iter_; ReadCallback* read_callback_; // Max visible sequence number. It is normally the snapshot seq unless we have // uncommitted data in db as in WriteUnCommitted. @@ -361,11 +358,11 @@ class DBIter final: public Iterator { }; inline bool DBIter::ParseKey(ParsedInternalKey* ikey) { - if (!ParseInternalKey(iter_->key(), ikey)) { + if (!ParseInternalKey(iter_.key(), ikey)) { status_ = Status::Corruption("corrupted internal key in DBIter"); valid_ = false; ROCKS_LOG_ERROR(logger_, "corrupted internal key in DBIter: %s", - iter_->key().ToString(true).c_str()); + iter_.key().ToString(true).c_str()); return false; } else { return true; @@ -393,13 +390,13 @@ void DBIter::Next() { // Next() without checking the current key. // If the current key is a merge, very likely iter already points // to the next internal position. - assert(iter_->Valid()); - iter_->Next(); + assert(iter_.Valid()); + iter_.Next(); PERF_COUNTER_ADD(internal_key_skipped_count, 1); } local_stats_.next_count_++; - if (ok && iter_->Valid()) { + if (ok && iter_.Valid()) { FindNextUserEntry(true /* skipping the current user key */, prefix_same_as_start_); } else { @@ -433,7 +430,7 @@ inline bool DBIter::FindNextUserEntry(bool skipping, bool prefix_check) { // Actual implementation of DBIter::FindNextUserEntry() inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { // Loop until we hit an acceptable entry to yield - assert(iter_->Valid()); + assert(iter_.Valid()); assert(status_.ok()); assert(direction_ == kForward); current_entry_is_merged_ = false; @@ -495,8 +492,8 @@ inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) return true; } else { saved_key_.SetUserKey( - ikey_.user_key, - !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */); + ikey_.user_key, !pin_thru_lifetime_ || + !iter_.iter()->IsKeyPinned() /* copy */); skipping = true; PERF_COUNTER_ADD(internal_delete_skipped_count, 1); } @@ -516,14 +513,16 @@ inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) } else { // this key and all previous versions shouldn't be included, // skipping - saved_key_.SetUserKey(ikey_.user_key, - !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */); + saved_key_.SetUserKey( + ikey_.user_key, + !pin_thru_lifetime_ || + !iter_.iter()->IsKeyPinned() /* copy */); skipping = true; } } else { saved_key_.SetUserKey( - ikey_.user_key, - !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */); + ikey_.user_key, !pin_thru_lifetime_ || + !iter_.iter()->IsKeyPinned() /* copy */); if (range_del_agg_.ShouldDelete( ikey_, RangeDelPositioningMode::kForwardTraversal)) { // Arrange to skip all upcoming entries for this key since @@ -553,7 +552,7 @@ inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) case kTypeMerge: saved_key_.SetUserKey( ikey_.user_key, - !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */); + !pin_thru_lifetime_ || !iter_.iter()->IsKeyPinned() /* copy */); if (range_del_agg_.ShouldDelete( ikey_, RangeDelPositioningMode::kForwardTraversal)) { // Arrange to skip all upcoming entries for this key since @@ -587,7 +586,7 @@ inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) } else { saved_key_.SetUserKey( ikey_.user_key, - !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */); + !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */); skipping = false; num_skipped = 0; } @@ -616,20 +615,20 @@ inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) ParsedInternalKey(saved_key_.GetUserKey(), sequence_, kValueTypeForSeek)); } - iter_->Seek(last_key); + iter_.Seek(last_key); RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); } else { - iter_->Next(); + iter_.Next(); } - } while (iter_->Valid()); + } while (iter_.Valid()); valid_ = false; - return iter_->status().ok(); + return iter_.status().ok(); } // Merge values of the same user key starting from the current iter_ position // Scan from the newer entries to older entries. -// PRE: iter_->key() points to the first merge type entry +// PRE: iter_.key() points to the first merge type entry // saved_key_ stores the user key // POST: saved_value_ has the merged value for the user key // iter_ points to the next entry (or invalid) @@ -645,13 +644,13 @@ bool DBIter::MergeValuesNewToOld() { TempPinData(); merge_context_.Clear(); // Start the merge process by pushing the first operand - merge_context_.PushOperand(iter_->value(), - iter_->IsValuePinned() /* operand_pinned */); + merge_context_.PushOperand( + iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */); TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:PushedFirstOperand"); ParsedInternalKey ikey; Status s; - for (iter_->Next(); iter_->Valid(); iter_->Next()) { + for (iter_.Next(); iter_.Valid(); iter_.Next()) { TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand"); if (!ParseKey(&ikey)) { return false; @@ -665,12 +664,12 @@ bool DBIter::MergeValuesNewToOld() { ikey, RangeDelPositioningMode::kForwardTraversal)) { // hit a delete with the same user key, stop right here // iter_ is positioned after delete - iter_->Next(); + iter_.Next(); break; } else if (kTypeValue == ikey.type) { // hit a put, merge the put value with operands and store the // final result in saved_value_. We are done! - const Slice val = iter_->value(); + const Slice val = iter_.value(); s = MergeHelper::TimedFullMerge( merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(), &saved_value_, logger_, statistics_, env_, &pinned_value_, true); @@ -680,8 +679,8 @@ bool DBIter::MergeValuesNewToOld() { return false; } // iter_ is positioned after put - iter_->Next(); - if (!iter_->status().ok()) { + iter_.Next(); + if (!iter_.status().ok()) { valid_ = false; return false; } @@ -689,8 +688,8 @@ bool DBIter::MergeValuesNewToOld() { } else if (kTypeMerge == ikey.type) { // hit a merge, add the value as an operand and run associative merge. // when complete, add result to operands and continue. - merge_context_.PushOperand(iter_->value(), - iter_->IsValuePinned() /* operand_pinned */); + merge_context_.PushOperand( + iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */); PERF_COUNTER_ADD(internal_merge_count, 1); } else if (kTypeBlobIndex == ikey.type) { if (!allow_blob_) { @@ -709,7 +708,7 @@ bool DBIter::MergeValuesNewToOld() { } } - if (!iter_->status().ok()) { + if (!iter_.status().ok()) { valid_ = false; return false; } @@ -758,21 +757,21 @@ void DBIter::Prev() { } bool DBIter::ReverseToForward() { - assert(iter_->status().ok()); + assert(iter_.status().ok()); // When moving backwards, iter_ is positioned on _previous_ key, which may // not exist or may have different prefix than the current key(). // If that's the case, seek iter_ to current key. - if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_->Valid()) { + if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_.Valid()) { IterKey last_key; last_key.SetInternalKey(ParsedInternalKey( saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek)); - iter_->Seek(last_key.GetInternalKey()); + iter_.Seek(last_key.GetInternalKey()); } direction_ = kForward; // Skip keys less than the current key() (a.k.a. saved_key_). - while (iter_->Valid()) { + while (iter_.Valid()) { ParsedInternalKey ikey; if (!ParseKey(&ikey)) { return false; @@ -780,10 +779,10 @@ bool DBIter::ReverseToForward() { if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) >= 0) { return true; } - iter_->Next(); + iter_.Next(); } - if (!iter_->status().ok()) { + if (!iter_.status().ok()) { valid_ = false; return false; } @@ -793,14 +792,14 @@ bool DBIter::ReverseToForward() { // Move iter_ to the key before saved_key_. bool DBIter::ReverseToBackward() { - assert(iter_->status().ok()); + assert(iter_.status().ok()); // When current_entry_is_merged_ is true, iter_ may be positioned on the next // key, which may not exist or may have prefix different from current. // If that's the case, seek to saved_key_. if (current_entry_is_merged_ && ((prefix_extractor_ != nullptr && !total_order_seek_) || - !iter_->Valid())) { + !iter_.Valid())) { IterKey last_key; // Using kMaxSequenceNumber and kValueTypeForSeek // (not kValueTypeForSeekForPrev) to seek to a key strictly smaller @@ -808,15 +807,15 @@ bool DBIter::ReverseToBackward() { last_key.SetInternalKey(ParsedInternalKey( saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek)); if (prefix_extractor_ != nullptr && !total_order_seek_) { - iter_->SeekForPrev(last_key.GetInternalKey()); + iter_.SeekForPrev(last_key.GetInternalKey()); } else { // Some iterators may not support SeekForPrev(), so we avoid using it // when prefix seek mode is disabled. This is somewhat expensive // (an extra Prev(), as well as an extra change of direction of iter_), // so we may need to reconsider it later. - iter_->Seek(last_key.GetInternalKey()); - if (!iter_->Valid() && iter_->status().ok()) { - iter_->SeekToLast(); + iter_.Seek(last_key.GetInternalKey()); + if (!iter_.Valid() && iter_.status().ok()) { + iter_.SeekToLast(); } } } @@ -826,10 +825,10 @@ bool DBIter::ReverseToBackward() { } void DBIter::PrevInternal() { - while (iter_->Valid()) { + while (iter_.Valid()) { saved_key_.SetUserKey( - ExtractUserKey(iter_->key()), - !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */); + ExtractUserKey(iter_.key()), + !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */); if (prefix_extractor_ && prefix_same_as_start_ && prefix_extractor_->Transform(saved_key_.GetUserKey()) @@ -883,7 +882,7 @@ void DBIter::PrevInternal() { // POST: iter_ is positioned on one of the entries equal to saved_key_, or on // the entry just before them, or on the entry just after them. bool DBIter::FindValueForCurrentKey() { - assert(iter_->Valid()); + assert(iter_.Valid()); merge_context_.Clear(); current_entry_is_merged_ = false; // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or @@ -895,7 +894,7 @@ bool DBIter::FindValueForCurrentKey() { ReleaseTempPinnedData(); TempPinData(); size_t num_skipped = 0; - while (iter_->Valid()) { + while (iter_.Valid()) { ParsedInternalKey ikey; if (!ParseKey(&ikey)) { return false; @@ -925,8 +924,8 @@ bool DBIter::FindValueForCurrentKey() { last_key_entry_type = kTypeRangeDeletion; PERF_COUNTER_ADD(internal_delete_skipped_count, 1); } else { - assert(iter_->IsValuePinned()); - pinned_value_ = iter_->value(); + assert(iter_.iter()->IsValuePinned()); + pinned_value_ = iter_.value(); } merge_context_.Clear(); last_not_merge_type = last_key_entry_type; @@ -947,7 +946,8 @@ bool DBIter::FindValueForCurrentKey() { } else { assert(merge_operator_ != nullptr); merge_context_.PushOperandBack( - iter_->value(), iter_->IsValuePinned() /* operand_pinned */); + iter_.value(), + iter_.iter()->IsValuePinned() /* operand_pinned */); PERF_COUNTER_ADD(internal_merge_count, 1); } break; @@ -956,11 +956,11 @@ bool DBIter::FindValueForCurrentKey() { } PERF_COUNTER_ADD(internal_key_skipped_count, 1); - iter_->Prev(); + iter_.Prev(); ++num_skipped; } - if (!iter_->status().ok()) { + if (!iter_.status().ok()) { valid_ = false; return false; } @@ -1040,16 +1040,16 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { std::string last_key; AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(), sequence_, kValueTypeForSeek)); - iter_->Seek(last_key); + iter_.Seek(last_key); RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); // In case read_callback presents, the value we seek to may not be visible. // Find the next value that's visible. ParsedInternalKey ikey; while (true) { - if (!iter_->Valid()) { + if (!iter_.Valid()) { valid_ = false; - return iter_->status().ok(); + return iter_.status().ok(); } if (!ParseKey(&ikey)) { @@ -1067,7 +1067,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { break; } - iter_->Next(); + iter_.Next(); } if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion || @@ -1085,8 +1085,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { return false; } if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) { - assert(iter_->IsValuePinned()); - pinned_value_ = iter_->value(); + assert(iter_.iter()->IsValuePinned()); + pinned_value_ = iter_.value(); valid_ = true; return true; } @@ -1096,13 +1096,13 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { assert(ikey.type == kTypeMerge); current_entry_is_merged_ = true; merge_context_.Clear(); - merge_context_.PushOperand(iter_->value(), - iter_->IsValuePinned() /* operand_pinned */); + merge_context_.PushOperand( + iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */); while (true) { - iter_->Next(); + iter_.Next(); - if (!iter_->Valid()) { - if (!iter_->status().ok()) { + if (!iter_.Valid()) { + if (!iter_.status().ok()) { valid_ = false; return false; } @@ -1120,7 +1120,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { ikey, RangeDelPositioningMode::kForwardTraversal)) { break; } else if (ikey.type == kTypeValue) { - const Slice val = iter_->value(); + const Slice val = iter_.value(); Status s = MergeHelper::TimedFullMerge( merge_operator_, saved_key_.GetUserKey(), &val, merge_context_.GetOperands(), &saved_value_, logger_, statistics_, @@ -1133,8 +1133,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { valid_ = true; return true; } else if (ikey.type == kTypeMerge) { - merge_context_.PushOperand(iter_->value(), - iter_->IsValuePinned() /* operand_pinned */); + merge_context_.PushOperand( + iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */); PERF_COUNTER_ADD(internal_merge_count, 1); } else if (ikey.type == kTypeBlobIndex) { if (!allow_blob_) { @@ -1166,13 +1166,13 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { // Make sure we leave iter_ in a good state. If it's valid and we don't care // about prefixes, that's already good enough. Otherwise it needs to be // seeked to the current key. - if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_->Valid()) { + if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_.Valid()) { if (prefix_extractor_ != nullptr && !total_order_seek_) { - iter_->SeekForPrev(last_key); + iter_.SeekForPrev(last_key); } else { - iter_->Seek(last_key); - if (!iter_->Valid() && iter_->status().ok()) { - iter_->SeekToLast(); + iter_.Seek(last_key); + if (!iter_.Valid() && iter_.status().ok()) { + iter_.SeekToLast(); } } RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); @@ -1187,7 +1187,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { bool DBIter::FindUserKeyBeforeSavedKey() { assert(status_.ok()); size_t num_skipped = 0; - while (iter_->Valid()) { + while (iter_.Valid()) { ParsedInternalKey ikey; if (!ParseKey(&ikey)) { return false; @@ -1215,19 +1215,19 @@ bool DBIter::FindUserKeyBeforeSavedKey() { saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek)); // It would be more efficient to use SeekForPrev() here, but some // iterators may not support it. - iter_->Seek(last_key.GetInternalKey()); + iter_.Seek(last_key.GetInternalKey()); RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); - if (!iter_->Valid()) { + if (!iter_.Valid()) { break; } } else { ++num_skipped; } - iter_->Prev(); + iter_.Prev(); } - if (!iter_->status().ok()) { + if (!iter_.status().ok()) { valid_ = false; return false; } @@ -1285,11 +1285,11 @@ void DBIter::Seek(const Slice& target) { { PERF_TIMER_GUARD(seek_internal_seek_time); - iter_->Seek(saved_key_.GetInternalKey()); + iter_.Seek(saved_key_.GetInternalKey()); range_del_agg_.InvalidateRangeDelMapPositions(); } RecordTick(statistics_, NUMBER_DB_SEEK); - if (iter_->Valid()) { + if (iter_.Valid()) { if (prefix_extractor_ && prefix_same_as_start_) { prefix_start_key_ = prefix_extractor_->Transform(target); } @@ -1337,7 +1337,7 @@ void DBIter::SeekForPrev(const Slice& target) { { PERF_TIMER_GUARD(seek_internal_seek_time); - iter_->SeekForPrev(saved_key_.GetInternalKey()); + iter_.SeekForPrev(saved_key_.GetInternalKey()); range_del_agg_.InvalidateRangeDelMapPositions(); } @@ -1348,7 +1348,7 @@ void DBIter::SeekForPrev(const Slice& target) { #endif // ROCKSDB_LITE RecordTick(statistics_, NUMBER_DB_SEEK); - if (iter_->Valid()) { + if (iter_.Valid()) { if (prefix_extractor_ && prefix_same_as_start_) { prefix_start_key_ = prefix_extractor_->Transform(target); } @@ -1393,15 +1393,15 @@ void DBIter::SeekToFirst() { { PERF_TIMER_GUARD(seek_internal_seek_time); - iter_->SeekToFirst(); + iter_.SeekToFirst(); range_del_agg_.InvalidateRangeDelMapPositions(); } RecordTick(statistics_, NUMBER_DB_SEEK); - if (iter_->Valid()) { + if (iter_.Valid()) { saved_key_.SetUserKey( - ExtractUserKey(iter_->key()), - !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */); + ExtractUserKey(iter_.key()), + !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */); FindNextUserEntry(false /* not skipping */, false /* no prefix check */); if (statistics_ != nullptr) { if (valid_) { @@ -1445,7 +1445,7 @@ void DBIter::SeekToLast() { { PERF_TIMER_GUARD(seek_internal_seek_time); - iter_->SeekToLast(); + iter_.SeekToLast(); range_del_agg_.InvalidateRangeDelMapPositions(); } PrevInternal();