diff --git a/db/db_tailing_iter_test.cc b/db/db_tailing_iter_test.cc
index e95841c58..e6731df6a 100644
--- a/db/db_tailing_iter_test.cc
+++ b/db/db_tailing_iter_test.cc
@@ -117,6 +117,83 @@ TEST_F(DBTestTailingIterator, TailingIteratorSeekToNext) {
   }
 }
 
+// Exercises tailing iterators created with iterate_upper_bound set: keys
+// under the "00a0" prefix sort below the bound, keys under "00b0" do not,
+// so every Seek()/Next() checked here must land on a "00a0" key.
+TEST_F(DBTestTailingIterator, TailingIteratorTrimSeekToNext) {
+  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
+  ReadOptions read_options;
+  read_options.tailing = true;
+
+  char bufe[32];
+  snprintf(bufe, sizeof(bufe), "00b0%016d", 0);
+  Slice keyu(bufe, 20);
+  read_options.iterate_upper_bound = &keyu;
+  std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
+  std::unique_ptr<Iterator> itern(db_->NewIterator(read_options, handles_[1]));
+  std::unique_ptr<Iterator> iterh(db_->NewIterator(read_options, handles_[1]));
+  std::string value(1024, 'a');
+
+  const int num_records = 1000;
+  for (int i = 1; i < num_records; ++i) {
+    char buf1[32];
+    char buf2[32];
+    char buf3[32];
+    char buf4[32];
+    snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5);
+    snprintf(buf3, sizeof(buf3), "00b0%016d", i * 5);
+
+    Slice key(buf1, 20);
+    ASSERT_OK(Put(1, key, value));
+    Slice keyn(buf3, 20);
+    ASSERT_OK(Put(1, keyn, value));
+
+    if (i % 100 == 99) {
+      ASSERT_OK(Flush(1));
+      dbfull()->TEST_WaitForCompact();
+      snprintf(buf4, sizeof(buf4), "00a0%016d", i * 5 / 2);
+      Slice target(buf4, 20);
+      iterh->Seek(target);
+      ASSERT_TRUE(iterh->Valid());
+      for (int j = (i + 1) * 5 / 2; j < i * 5; j += 5) {
+        iterh->Next();
+        ASSERT_TRUE(iterh->Valid());
+      }
+    }
+
+    snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2);
+    Slice target(buf2, 20);
+    iter->Seek(target);
+    ASSERT_TRUE(iter->Valid());
+    ASSERT_EQ(iter->key().compare(key), 0);
+    if (i == 1) {
+      itern->SeekToFirst();
+    } else {
+      itern->Next();
+    }
+    ASSERT_TRUE(itern->Valid());
+    ASSERT_EQ(itern->key().compare(key), 0);
+  }
+  for (int i = 2 * num_records; i > 0; --i) {
+    char buf1[32];
+    char buf2[32];
+    snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5);
+
+    Slice key(buf1, 20);
+    ASSERT_OK(Put(1, key, value));
+
+    if (i % 100 == 99) {
+      ASSERT_OK(Flush(1));
+    }
+
+    snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2);
+    Slice target(buf2, 20);
+    iter->Seek(target);
+    ASSERT_TRUE(iter->Valid());
+    ASSERT_EQ(iter->key().compare(key), 0);
+  }
+}
+
 TEST_F(DBTestTailingIterator, TailingIteratorDeletes) {
   CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
   ReadOptions read_options;
diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc
index cd745b7f0..832aa832d 100644
--- a/db/forward_iterator.cc
+++ b/db/forward_iterator.cc
@@ -116,7 +116,8 @@ class LevelIterator : public Iterator {
 };
 
 ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
-    ColumnFamilyData* cfd, SuperVersion* current_sv)
+                                 ColumnFamilyData* cfd,
+                                 SuperVersion* current_sv)
     : db_(db),
       read_options_(read_options),
       cfd_(cfd),
@@ -129,6 +130,7 @@ ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
       status_(Status::OK()),
       immutable_status_(Status::OK()),
       valid_(false),
+      has_iter_trimmed_for_upper_bound_(false),
       is_prev_set_(false),
       is_prev_inclusive_(false) {
   if (sv_) {
@@ -189,7 +191,19 @@ void ForwardIterator::SeekToFirst() {
   SeekInternal(Slice(), true);
 }
 
+// Returns true iff an upper bound is configured and the user key of
+// `internal_key` compares greater than or equal to it.
+bool ForwardIterator::IsOverUpperBound(const Slice& internal_key) const {
+  return !(read_options_.iterate_upper_bound == nullptr ||
+           cfd_->internal_comparator().user_comparator()->Compare(
+               ExtractUserKey(internal_key),
+               *read_options_.iterate_upper_bound) < 0);
+}
+
 void ForwardIterator::Seek(const Slice& internal_key) {
+  if (IsOverUpperBound(internal_key)) {
+    valid_ = false;
+  }
   if (sv_ == nullptr ||
       sv_ ->version_number != cfd_->GetSuperVersionNumber()) {
     RebuildIterators(true);
@@ -212,16 +226,33 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
   // an option to turn it off.
   if (seek_to_first || NeedToSeekImmutable(internal_key)) {
     immutable_status_ = Status::OK();
+    if (NeedToRebuildTrimmed(internal_key)) {
+      // Some iterators are trimmed. Need to rebuild.
+      RebuildIterators(true);
+      // Already seeked mutable iter, so seek again
+      seek_to_first ? mutable_iter_->SeekToFirst()
+                    : mutable_iter_->Seek(internal_key);
+    }
     {
       auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator()));
       immutable_min_heap_.swap(tmp);
     }
-    for (auto* m : imm_iters_) {
+    for (size_t i = 0; i < imm_iters_.size(); i++) {
+      auto* m = imm_iters_[i];
+      if (!m) {
+        continue;
+      }
       seek_to_first ? m->SeekToFirst() : m->Seek(internal_key);
       if (!m->status().ok()) {
         immutable_status_ = m->status();
       } else if (m->Valid()) {
-        immutable_min_heap_.push(m);
+        if (!IsOverUpperBound(m->key())) {
+          immutable_min_heap_.push(m);
+        } else {
+          has_iter_trimmed_for_upper_bound_ = true;
+          delete m;
+          imm_iters_[i] = nullptr;
+        }
       }
     }
 
@@ -232,6 +263,9 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
     const VersionStorageInfo* vstorage = sv_->current->storage_info();
     const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
     for (uint32_t i = 0; i < l0.size(); ++i) {
+      if (!l0_iters_[i]) {
+        continue;
+      }
       if (seek_to_first) {
         l0_iters_[i]->SeekToFirst();
       } else {
@@ -239,6 +273,11 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
         // won't go over this file.
         if (user_comparator_->Compare(user_key,
               l0[i]->largest.user_key()) > 0) {
+          if (read_options_.iterate_upper_bound != nullptr) {
+            has_iter_trimmed_for_upper_bound_ = true;
+            delete l0_iters_[i];
+            l0_iters_[i] = nullptr;
+          }
           continue;
         }
         l0_iters_[i]->Seek(internal_key);
@@ -247,7 +286,13 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
       if (!l0_iters_[i]->status().ok()) {
         immutable_status_ = l0_iters_[i]->status();
       } else if (l0_iters_[i]->Valid()) {
-        immutable_min_heap_.push(l0_iters_[i]);
+        if (!IsOverUpperBound(l0_iters_[i]->key())) {
+          immutable_min_heap_.push(l0_iters_[i]);
+        } else {
+          has_iter_trimmed_for_upper_bound_ = true;
+          delete l0_iters_[i];
+          l0_iters_[i] = nullptr;
+        }
       }
     }
 
@@ -261,7 +306,9 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
         search_right_bound = FileIndexer::kLevelMaxIndex;
         continue;
       }
-      assert(level_iters_[level - 1] != nullptr);
+      if (level_iters_[level - 1] == nullptr) {
+        continue;
+      }
       uint32_t f_idx = 0;
       const auto& indexer = vstorage->file_indexer();
       if (!seek_to_first) {
@@ -319,7 +366,14 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
       if (!level_iters_[level - 1]->status().ok()) {
         immutable_status_ = level_iters_[level - 1]->status();
       } else if (level_iters_[level - 1]->Valid()) {
-        immutable_min_heap_.push(level_iters_[level - 1]);
+        if (!IsOverUpperBound(level_iters_[level - 1]->key())) {
+          immutable_min_heap_.push(level_iters_[level - 1]);
+        } else {
+          // Nothing in this level is interesting. Remove.
+          has_iter_trimmed_for_upper_bound_ = true;
+          delete level_iters_[level - 1];
+          level_iters_[level - 1] = nullptr;
+        }
       }
     }
   }
@@ -377,13 +431,19 @@ void ForwardIterator::Next() {
   if (current_ != mutable_iter_) {
     if (!current_->status().ok()) {
       immutable_status_ = current_->status();
-    } else if (current_->Valid()) {
+    } else if ((current_->Valid()) && (!IsOverUpperBound(current_->key()))) {
       immutable_min_heap_.push(current_);
-    } else if ((!mutable_iter_->Valid()) && update_prev_key) {
+    } else {
+      if ((current_->Valid()) && (IsOverUpperBound(current_->key()))) {
+        // remove the current iterator
+        DeleteCurrentIter();
+        current_ = nullptr;
+      }
+      if ((!mutable_iter_->Valid()) && update_prev_key) {
       mutable_iter_->Seek(prev_key_.GetKey());
+      }
     }
   }
-
   UpdateCurrent();
 }
 
@@ -421,6 +481,13 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
   const auto& l0_files = vstorage->LevelFiles(0);
   l0_iters_.reserve(l0_files.size());
   for (const auto* l0 : l0_files) {
+    if ((read_options_.iterate_upper_bound != nullptr) &&
+        cfd_->internal_comparator().user_comparator()->Compare(
+            l0->smallest.user_key(), *read_options_.iterate_upper_bound) > 0) {
+      has_iter_trimmed_for_upper_bound_ = true;
+      l0_iters_.push_back(nullptr);
+      continue;
+    }
     l0_iters_.push_back(cfd_->table_cache()->NewIterator(
         read_options_, *cfd_->soptions(), cfd_->internal_comparator(), l0->fd));
   }
@@ -428,7 +495,11 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
   for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
     const auto& level_files = vstorage->LevelFiles(level);
 
-    if (level_files.empty()) {
+    if ((level_files.empty()) ||
+        ((read_options_.iterate_upper_bound != nullptr) &&
+         (user_comparator_->Compare(*read_options_.iterate_upper_bound,
+                                    level_files[0]->smallest.user_key()) <
+          0))) {
       level_iters_.push_back(nullptr);
     } else {
       level_iters_.push_back(
@@ -438,6 +509,7 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
 
   current_ = nullptr;
   is_prev_set_ = false;
+  has_iter_trimmed_for_upper_bound_ = false;
 }
 
 void ForwardIterator::ResetIncompleteIterators() {
@@ -488,6 +560,10 @@ void ForwardIterator::UpdateCurrent() {
   if (!status_.ok()) {
     status_ = Status::OK();
   }
+  if (valid_ && IsOverUpperBound(current_->key())) {
+    valid_ = false;
+    current_ = nullptr;
+  }
 }
 
 bool ForwardIterator::NeedToSeekImmutable(const Slice& target) {
@@ -523,6 +599,73 @@ bool ForwardIterator::NeedToSeekImmutable(const Slice& target) {
   return false;
 }
 
+// Decides whether a seek to `target` must rebuild the child iterators.
+// Returns false when nothing was trimmed; otherwise returns true unless the
+// iterator is positioned and `target` does not precede prev_key_.
+bool ForwardIterator::NeedToRebuildTrimmed(const Slice& target) {
+  if (!has_iter_trimmed_for_upper_bound_) {
+    return false;
+  }
+  if (!valid_ || !current_ || !is_prev_set_ || !immutable_status_.ok()) {
+    return true;
+  }
+  Slice prev_key = prev_key_.GetKey();
+  if (prefix_extractor_ &&
+      prefix_extractor_->Transform(target)
+              .compare(prefix_extractor_->Transform(prev_key)) != 0) {
+    return true;
+  }
+  if (cfd_->internal_comparator().InternalKeyComparator::Compare(
+          prev_key, target) >= (is_prev_inclusive_ ? 1 : 0)) {
+    return true;
+  }
+  return false;
+}
+
+// Deletes whichever entry of imm_iters_, l0_iters_ or level_iters_ equals
+// current_, nulls its slot and records that an iterator was trimmed.
+// current_ itself is left untouched; Next() clears it after calling this.
+void ForwardIterator::DeleteCurrentIter() {
+  for (size_t i = 0; i < imm_iters_.size(); i++) {
+    auto& m = imm_iters_[i];
+    if (!m) {
+      continue;
+    }
+    if (m == current_) {
+      has_iter_trimmed_for_upper_bound_ = true;
+      delete m;
+      m = nullptr;
+      return;
+    }
+  }
+
+  const VersionStorageInfo* vstorage = sv_->current->storage_info();
+  const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
+  for (uint32_t i = 0; i < l0.size(); ++i) {
+    if (!l0_iters_[i]) {
+      continue;
+    }
+    if (l0_iters_[i] == current_) {
+      has_iter_trimmed_for_upper_bound_ = true;
+      delete l0_iters_[i];
+      l0_iters_[i] = nullptr;
+      return;
+    }
+  }
+
+  for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
+    if (level_iters_[level - 1] == nullptr) {
+      continue;
+    }
+    if (level_iters_[level - 1] == current_) {
+      has_iter_trimmed_for_upper_bound_ = true;
+      delete level_iters_[level - 1];
+      level_iters_[level - 1] = nullptr;
+      return;
+    }
+  }
+}
+
 uint32_t ForwardIterator::FindFileInRange(
     const std::vector<FileMetaData*>& files, const Slice& internal_key,
     uint32_t left, uint32_t right) {
diff --git a/db/forward_iterator.h b/db/forward_iterator.h
index f72c9cb23..c3d8dae9f 100644
--- a/db/forward_iterator.h
+++ b/db/forward_iterator.h
@@ -78,10 +78,14 @@ class ForwardIterator : public Iterator {
   void SeekInternal(const Slice& internal_key, bool seek_to_first);
   void UpdateCurrent();
   bool NeedToSeekImmutable(const Slice& internal_key);
+  bool NeedToRebuildTrimmed(const Slice& internal_key);
+  void DeleteCurrentIter();
   uint32_t FindFileInRange(
     const std::vector<FileMetaData*>& files, const Slice& internal_key,
     uint32_t left, uint32_t right);
 
+  bool IsOverUpperBound(const Slice& internal_key) const;
+
   DBImpl* const db_;
   const ReadOptions read_options_;
   ColumnFamilyData* const cfd_;
@@ -99,6 +103,7 @@ class ForwardIterator : public Iterator {
   Status status_;
   Status immutable_status_;
   bool valid_;
+  bool has_iter_trimmed_for_upper_bound_;
 
   IterKey prev_key_;
   bool is_prev_set_;