From 236d4c67e94bff85c952044e150a9c0b05e4e64b Mon Sep 17 00:00:00 2001
From: Mike Kolupaev
Date: Mon, 28 Nov 2016 10:12:28 -0800
Subject: [PATCH] Less linear search in DBIter::Seek() when keys are
 overwritten a lot

Summary:
In one deployment we saw high latencies (presumably from slow iterator
operations) and a lot of CPU time reported by perf with this stack:

```
rocksdb::MergingIterator::Next
rocksdb::DBIter::FindNextUserEntryInternal
rocksdb::DBIter::Seek
```

I think what's happening is:
1. we create a snapshot iterator,
2. we do lots of Put()s for the same key x; this creates lots of entries in memtable,
3. we seek the iterator to a key slightly smaller than x,
4. the seek walks over lots of entries in memtable for key x, skipping them because of high sequence numbers.

CC IslamAbdelRahman

Closes https://github.com/facebook/rocksdb/pull/1413

Differential Revision: D4083879

Pulled By: IslamAbdelRahman

fbshipit-source-id: a83ddae
---
 .gitignore                     |   1 +
 db/db_iter.cc                  | 198 +++++++++++++++++++++------
 db/db_iterator_test.cc         |  58 +++++++++-
 db/memtable.cc                 |   2 +
 include/rocksdb/perf_context.h |  15 ++-
 util/perf_context.cc           |   8 ++
 6 files changed, 210 insertions(+), 72 deletions(-)

diff --git a/.gitignore b/.gitignore
index c104521e4..a6fc78568 100644
--- a/.gitignore
+++ b/.gitignore
@@ -37,6 +37,7 @@ util/build_version.cc
 build_tools/VALGRIND_LOGS/
 coverage/COVERAGE_REPORT
 .gdbhistory
+.gdb_history
 package/
 .phutil_module_cache
 unity.a
diff --git a/db/db_iter.cc b/db/db_iter.cc
index a97a5af74..18f8a7033 100644
--- a/db/db_iter.cc
+++ b/db/db_iter.cc
@@ -339,7 +339,7 @@ void DBIter::Next() {
 //       saved_value_ => the merged value
 //
 // NOTE: In between, saved_key_ can point to a user key that has
-//       a delete marker
+//       a delete marker or a sequence number higher than sequence_
 //
 // The prefix_check parameter controls whether we check the iterated
 // keys against the prefix of the seeked key. Set to false when
@@ -356,90 +356,137 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
   assert(iter_->Valid());
   assert(direction_ == kForward);
   current_entry_is_merged_ = false;
+
+  // How many times in a row we have skipped an entry with user key less than
+  // or equal to saved_key_. We could skip these entries either because
+  // sequence numbers were too high or because skipping = true.
+  // What saved_key_ contains throughout this method:
+  //  - if skipping        : saved_key_ contains the key that we need to skip,
+  //                         and we haven't seen any keys greater than that,
+  //  - if num_skipped > 0 : saved_key_ contains the key that we have skipped
+  //                         num_skipped times, and we haven't seen any keys
+  //                         greater than that,
+  //  - none of the above  : saved_key_ can contain anything, it doesn't matter.
   uint64_t num_skipped = 0;
+
   do {
     ParsedInternalKey ikey;
-    if (ParseKey(&ikey)) {
-      if (iterate_upper_bound_ != nullptr &&
-          user_comparator_->Compare(ikey.user_key, *iterate_upper_bound_) >= 0) {
-        break;
-      }
+    if (!ParseKey(&ikey)) {
+      // Skip corrupted keys.
+      iter_->Next();
+      continue;
+    }
-      if (prefix_extractor_ && prefix_check &&
-          prefix_extractor_->Transform(ikey.user_key).compare(prefix_start_key_) != 0) {
-        break;
-      }
+    if (iterate_upper_bound_ != nullptr &&
+        user_comparator_->Compare(ikey.user_key, *iterate_upper_bound_) >= 0) {
+      break;
+    }
-      if (ikey.sequence <= sequence_) {
-        if (skipping &&
-            user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) {
-          num_skipped++;  // skip this entry
-          PERF_COUNTER_ADD(internal_key_skipped_count, 1);
-        } else {
-          switch (ikey.type) {
-            case kTypeDeletion:
-            case kTypeSingleDeletion:
+    if (prefix_extractor_ && prefix_check &&
+        prefix_extractor_->Transform(ikey.user_key)
+                .compare(prefix_start_key_) != 0) {
+      break;
+    }
+
+    if (ikey.sequence <= sequence_) {
+      if (skipping &&
+          user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) {
+        num_skipped++;  // skip this entry
+        PERF_COUNTER_ADD(internal_key_skipped_count, 1);
+      } else {
+        skipping = false;  // ikey > saved_key_, i.e. saved_key_ is skipped
+        num_skipped = 0;
+        switch (ikey.type) {
+          case kTypeDeletion:
+          case kTypeSingleDeletion:
+            // Arrange to skip all upcoming entries for this key since
+            // they are hidden by this deletion.
+            saved_key_.SetKey(
+                ikey.user_key,
+                !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
+            skipping = true;
+            PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
+            break;
+          case kTypeValue:
+            saved_key_.SetKey(
+                ikey.user_key,
+                !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
+            if (range_del_agg_.ShouldDelete(ikey)) {
              // Arrange to skip all upcoming entries for this key since
              // they are hidden by this deletion.
-              saved_key_.SetKey(
-                  ikey.user_key,
-                  !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
              skipping = true;
              num_skipped = 0;
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
-              break;
-            case kTypeValue:
-              saved_key_.SetKey(
-                  ikey.user_key,
-                  !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
-              if (range_del_agg_.ShouldDelete(ikey)) {
-                // Arrange to skip all upcoming entries for this key since
-                // they are hidden by this deletion.
-                skipping = true;
-                num_skipped = 0;
-                PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
-              } else {
-                valid_ = true;
-                return;
-              }
-              break;
-            case kTypeMerge:
-              saved_key_.SetKey(
-                  ikey.user_key,
-                  !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
-              if (range_del_agg_.ShouldDelete(ikey)) {
-                // Arrange to skip all upcoming entries for this key since
-                // they are hidden by this deletion.
-                skipping = true;
-                num_skipped = 0;
-                PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
-              } else {
-                // By now, we are sure the current ikey is going to yield a
-                // value
-                current_entry_is_merged_ = true;
-                valid_ = true;
-                MergeValuesNewToOld();  // Go to a different state machine
-                return;
-              }
-              break;
-            default:
-              assert(false);
-              break;
-          }
        }
+            } else {
+              valid_ = true;
+              return;
+            }
+            break;
+          case kTypeMerge:
+            saved_key_.SetKey(
+                ikey.user_key,
+                !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
+            if (range_del_agg_.ShouldDelete(ikey)) {
+              // Arrange to skip all upcoming entries for this key since
+              // they are hidden by this deletion.
+              skipping = true;
+              num_skipped = 0;
+              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
+            } else {
+              // By now, we are sure the current ikey is going to yield a
+              // value
+              current_entry_is_merged_ = true;
+              valid_ = true;
+              MergeValuesNewToOld();  // Go to a different state machine
+              return;
+            }
+            break;
+          default:
+            assert(false);
+            break;
+        }
       }
+    } else {
+      // This key was inserted after our snapshot was taken.
+      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
+
+      // Here saved_key_ may contain some old key, or the default empty key, or
+      // key assigned by some random other method. We don't care.
+      if (user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) {
+        num_skipped++;
+      } else {
+        saved_key_.SetKey(
+            ikey.user_key,
+            !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
+        skipping = false;
+        num_skipped = 0;
+      }
     }
-    // If we have sequentially iterated via numerous keys and still not
-    // found the next user-key, then it is better to seek so that we can
-    // avoid too many key comparisons. We seek to the last occurrence of
-    // our current key by looking for sequence number 0 and type deletion
-    // (the smallest type).
-    if (skipping && num_skipped > max_skip_) {
+
+    // If we have sequentially iterated via numerous equal keys, then it's
+    // better to seek so that we can avoid too many key comparisons.
+    if (num_skipped > max_skip_) {
       num_skipped = 0;
       std::string last_key;
-      AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), 0,
-                                                     kTypeDeletion));
+      if (skipping) {
+        // We're looking for the next user-key but all we see are the same
+        // user-key with decreasing sequence numbers. Fast forward to
+        // sequence number 0 and type deletion (the smallest type).
+        AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), 0,
+                                                       kTypeDeletion));
+        // Don't set skipping = false because we may still see more user-keys
+        // equal to saved_key_.
+      } else {
+        // We saw multiple entries with this user key and sequence numbers
+        // higher than sequence_. Fast forward to sequence_.
+        // Note that this only covers a case when a higher key was overwritten
+        // many times since our snapshot was taken, not the case when a lot of
+        // different keys were inserted after our snapshot was taken.
+        AppendInternalKey(&last_key,
+                          ParsedInternalKey(saved_key_.GetKey(), sequence_,
+                                            kValueTypeForSeek));
+      }
       iter_->Seek(last_key);
       RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
     } else {
@@ -503,6 +550,7 @@ void DBIter::MergeValuesNewToOld() {
       // when complete, add result to operands and continue.
       merge_context_.PushOperand(iter_->value(),
                                  iter_->IsValuePinned() /* operand_pinned */);
+      PERF_COUNTER_ADD(internal_merge_count, 1);
     } else {
       assert(false);
     }
@@ -564,6 +612,11 @@ void DBIter::ReverseToBackward() {
   FindParseableKey(&ikey, kReverse);
   while (iter_->Valid() &&
          user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) > 0) {
+    if (ikey.sequence > sequence_) {
+      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
+    } else {
+      PERF_COUNTER_ADD(internal_key_skipped_count, 1);
+    }
     iter_->Prev();
     FindParseableKey(&ikey, kReverse);
   }
@@ -676,6 +729,7 @@ bool DBIter::FindValueForCurrentKey() {
         assert(merge_operator_ != nullptr);
         merge_context_.PushOperandBack(
             iter_->value(), iter_->IsValuePinned() /* operand_pinned */);
+        PERF_COUNTER_ADD(internal_merge_count, 1);
       }
       break;
     default:
@@ -760,6 +814,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
          ikey.type == kTypeMerge && !range_del_agg_.ShouldDelete(ikey)) {
     merge_context_.PushOperand(iter_->value(),
                                iter_->IsValuePinned() /* operand_pinned */);
+    PERF_COUNTER_ADD(internal_merge_count, 1);
     iter_->Next();
     FindParseableKey(&ikey, kForward);
   }
@@ -830,6 +885,11 @@ void DBIter::FindPrevUserKey() {
         ++num_skipped;
       }
     }
+    if (ikey.sequence > sequence_) {
+      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
+    } else {
+      PERF_COUNTER_ADD(internal_key_skipped_count, 1);
+    }
     iter_->Prev();
     FindParseableKey(&ikey, kReverse);
   }
diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc
index 20fd2ffb9..5292e9377 100644
--- a/db/db_iterator_test.cc
+++ b/db/db_iterator_test.cc
@@ -8,10 +8,10 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #include "db/db_test_util.h"
+#include "port/port.h"
 #include "port/stack_trace.h"
 #include "rocksdb/iostats_context.h"
 #include "rocksdb/perf_context.h"
-#include "port/port.h"
 
 namespace rocksdb {
 
@@ -1778,6 +1778,62 @@ TEST_F(DBIteratorTest, ReadAhead) {
   delete iter;
 }
 
+// Insert a key, create a snapshot iterator, overwrite key lots of times,
+// seek to a smaller key. Expect DBIter to fall back to a seek instead of
+// going through all the overwrites linearly.
+TEST_F(DBIteratorTest, DBIteratorSkipRecentDuplicatesTest) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  options.max_sequential_skip_in_iterations = 3;
+  options.prefix_extractor = nullptr;
+  options.write_buffer_size = 1 << 27;  // big enough to avoid flush
+  options.statistics = rocksdb::CreateDBStatistics();
+  DestroyAndReopen(options);
+
+  // Insert.
+  ASSERT_OK(Put("b", "0"));
+
+  // Create iterator.
+  ReadOptions ro;
+  std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
+
+  // Insert a lot.
+  for (int i = 0; i < 100; ++i) {
+    ASSERT_OK(Put("b", std::to_string(i + 1).c_str()));
+  }
+
+#ifndef ROCKSDB_LITE
+  // Check that memtable wasn't flushed.
+  std::string val;
+  ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level0", &val));
+  EXPECT_EQ("0", val);
+#endif
+
+  // Seek iterator to a smaller key.
+  perf_context.Reset();
+  iter->Seek("a");
+  ASSERT_TRUE(iter->Valid());
+  EXPECT_EQ("b", iter->key().ToString());
+  EXPECT_EQ("0", iter->value().ToString());
+
+  // Check that the seek didn't do too much work.
+  // Checks are not tight, just make sure that everything is well below 100.
+  EXPECT_LT(perf_context.internal_key_skipped_count, 4);
+  EXPECT_LT(perf_context.internal_recent_skipped_count, 8);
+  EXPECT_LT(perf_context.seek_on_memtable_count, 10);
+  EXPECT_LT(perf_context.next_on_memtable_count, 10);
+  EXPECT_LT(perf_context.prev_on_memtable_count, 10);
+
+  // Check that iterator did something like what we expect.
+  EXPECT_EQ(perf_context.internal_delete_skipped_count, 0);
+  EXPECT_EQ(perf_context.internal_merge_count, 0);
+  EXPECT_GE(perf_context.internal_recent_skipped_count, 2);
+  EXPECT_GE(perf_context.seek_on_memtable_count, 2);
+  EXPECT_EQ(1, options.statistics->getTickerCount(
+                 NUMBER_OF_RESEEKS_IN_ITERATION));
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
diff --git a/db/memtable.cc b/db/memtable.cc
index ef6356d3b..ca3545fba 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -322,11 +322,13 @@ class MemTableIterator : public InternalIterator {
     valid_ = iter_->Valid();
   }
   virtual void Next() override {
+    PERF_COUNTER_ADD(next_on_memtable_count, 1);
     assert(Valid());
     iter_->Next();
     valid_ = iter_->Valid();
   }
   virtual void Prev() override {
+    PERF_COUNTER_ADD(prev_on_memtable_count, 1);
     assert(Valid());
     iter_->Prev();
     valid_ = iter_->Valid();
diff --git a/include/rocksdb/perf_context.h b/include/rocksdb/perf_context.h
index 283cda9dc..2b2d6851a 100644
--- a/include/rocksdb/perf_context.h
+++ b/include/rocksdb/perf_context.h
@@ -45,8 +45,7 @@ struct PerfContext {
   // tombstones are not included in this counter, while previous updates
   // hidden by the tombstones will be included here.
   // 4. symmetric cases for Prev() and SeekToLast()
-  // We sometimes also skip entries of more recent updates than the snapshot
-  // we read from, but they are not included in this counter.
+  // internal_recent_skipped_count is not included in this counter.
   //
   uint64_t internal_key_skipped_count;
   // Total number of deletes and single deletes skipped over during iteration
@@ -57,6 +56,13 @@
   // still older updates invalidated by the tombstones.
   //
   uint64_t internal_delete_skipped_count;
+  // How many times iterators skipped over internal keys that are more recent
+  // than the snapshot that iterator is using.
+  //
+  uint64_t internal_recent_skipped_count;
+  // How many values were fed into merge operator by iterators.
+  //
+  uint64_t internal_merge_count;
 
   uint64_t get_snapshot_time;       // total nanos spent on getting snapshot
   uint64_t get_from_memtable_time;  // total nanos spent on querying memtables
@@ -67,7 +73,12 @@
   // total nanos spent on seeking memtable
   uint64_t seek_on_memtable_time;
   // number of seeks issued on memtable
+  // (including SeekForPrev but not SeekToFirst and SeekToLast)
   uint64_t seek_on_memtable_count;
+  // number of Next()s issued on memtable
+  uint64_t next_on_memtable_count;
+  // number of Prev()s issued on memtable
+  uint64_t prev_on_memtable_count;
   // total nanos spent on seeking child iters
   uint64_t seek_child_seek_time;
   // number of seek issued in child iterators
diff --git a/util/perf_context.cc b/util/perf_context.cc
index 5fdda1081..c18413d17 100644
--- a/util/perf_context.cc
+++ b/util/perf_context.cc
@@ -28,6 +28,8 @@ void PerfContext::Reset() {
   block_decompress_time = 0;
   internal_key_skipped_count = 0;
   internal_delete_skipped_count = 0;
+  internal_recent_skipped_count = 0;
+  internal_merge_count = 0;
   write_wal_time = 0;
 
   get_snapshot_time = 0;
@@ -37,6 +39,8 @@
   get_from_output_files_time = 0;
   seek_on_memtable_time = 0;
   seek_on_memtable_count = 0;
+  next_on_memtable_count = 0;
+  prev_on_memtable_count = 0;
   seek_child_seek_time = 0;
   seek_child_seek_count = 0;
   seek_min_heap_time = 0;
@@ -80,6 +84,8 @@ std::string PerfContext::ToString(bool exclude_zero_counters) const {
   PERF_CONTEXT_OUTPUT(block_decompress_time);
   PERF_CONTEXT_OUTPUT(internal_key_skipped_count);
   PERF_CONTEXT_OUTPUT(internal_delete_skipped_count);
+  PERF_CONTEXT_OUTPUT(internal_recent_skipped_count);
+  PERF_CONTEXT_OUTPUT(internal_merge_count);
   PERF_CONTEXT_OUTPUT(write_wal_time);
   PERF_CONTEXT_OUTPUT(get_snapshot_time);
   PERF_CONTEXT_OUTPUT(get_from_memtable_time);
@@ -88,6 +94,8 @@
   PERF_CONTEXT_OUTPUT(get_from_output_files_time);
   PERF_CONTEXT_OUTPUT(seek_on_memtable_time);
   PERF_CONTEXT_OUTPUT(seek_on_memtable_count);
+  PERF_CONTEXT_OUTPUT(next_on_memtable_count);
+  PERF_CONTEXT_OUTPUT(prev_on_memtable_count);
   PERF_CONTEXT_OUTPUT(seek_child_seek_time);
   PERF_CONTEXT_OUTPUT(seek_child_seek_count);
   PERF_CONTEXT_OUTPUT(seek_min_heap_time);
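For readers who want to reproduce the scenario from the summary outside the RocksDB test harness, below is a minimal standalone sketch; it is not part of the patch. The database path, key name, overwrite count, and skip threshold are arbitrary choices for illustration. It assumes the public API of this era, where per-thread counters are read from the thread-local rocksdb::perf_context object (as the new test does) after enabling counting with SetPerfLevel(). With the patch applied, the Seek() is expected to reseek past the too-recent memtable entries instead of skipping them one by one, which should show up as a small internal_recent_skipped_count and a nonzero NUMBER_OF_RESEEKS_IN_ITERATION ticker.

```
// Sketch only: overwrite one key many times after taking a snapshot iterator,
// then Seek() to a smaller key and inspect the counters added by this patch.
#include <cassert>
#include <iostream>
#include <memory>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/perf_level.h"
#include "rocksdb/statistics.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.statistics = rocksdb::CreateDBStatistics();
  // Threshold after which DBIter gives up on linear skipping and reseeks.
  options.max_sequential_skip_in_iterations = 8;

  rocksdb::DB* db = nullptr;
  // Hypothetical scratch path.
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/reseek_example", &db);
  assert(s.ok());

  // 1. Write key "x" once, then create a snapshot iterator.
  db->Put(rocksdb::WriteOptions(), "x", "0");
  std::unique_ptr<rocksdb::Iterator> iter(
      db->NewIterator(rocksdb::ReadOptions()));

  // 2. Overwrite "x" many times; each Put adds a memtable entry whose
  //    sequence number is newer than the iterator's snapshot.
  for (int i = 1; i <= 1000; ++i) {
    db->Put(rocksdb::WriteOptions(), "x", std::to_string(i));
  }

  // 3. Seek to a smaller key. Without the fix, DBIter walks all ~1000
  //    too-recent entries; with it, it reseeks after max_sequential_skip.
  rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableCount);
  rocksdb::perf_context.Reset();
  iter->Seek("w");
  assert(iter->Valid() && iter->key().ToString() == "x" &&
         iter->value().ToString() == "0");  // the snapshot still sees "0"

  std::cout << "internal_recent_skipped_count = "
            << rocksdb::perf_context.internal_recent_skipped_count << "\n"
            << "reseeks in iteration = "
            << options.statistics->getTickerCount(
                   rocksdb::NUMBER_OF_RESEEKS_IN_ITERATION)
            << "\n";

  delete db;
  return 0;
}
```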