From ebf8c454fd3e763d6b36a277d36dc8a6d243629b Mon Sep 17 00:00:00 2001 From: akankshamahajan Date: Mon, 10 Oct 2022 15:48:48 -0700 Subject: [PATCH] Provide support for async_io with tailing iterators (#10781) Summary: Provide support for async_io when ReadOptions.tailing is set to true. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10781 Test Plan: - Update unit tests - Ran db_bench: ./db_bench --benchmarks="readrandom" --use_existing_db --use_tailing_iterator=1 --async_io=1 Reviewed By: anand1976 Differential Revision: D40128882 Pulled By: anand1976 fbshipit-source-id: 55e17855536871a5c47e2de92d238ae005c32d01 --- HISTORY.md | 1 + db/db_tailing_iter_test.cc | 81 +++++++++++++++++------ db/forward_iterator.cc | 128 ++++++++++++++++++++++++------------- db/forward_iterator.h | 4 +- 4 files changed, 149 insertions(+), 65 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index a01f12fa3..d61580c2b 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### New Features * `DeleteRange()` now supports user-defined timestamp. +* Provide support for async_io with tailing iterators when ReadOptions.tailing is enabled during scans. ### Bug Fixes * Fix a bug in io_uring_prep_cancel in AbortIO API for posix which expects sqe->addr to match with read request submitted and wrong paramter was being passed. 
diff --git a/db/db_tailing_iter_test.cc b/db/db_tailing_iter_test.cc index c92628ad1..16aeee9eb 100644 --- a/db/db_tailing_iter_test.cc +++ b/db/db_tailing_iter_test.cc @@ -18,15 +18,22 @@ namespace ROCKSDB_NAMESPACE { -class DBTestTailingIterator : public DBTestBase { +class DBTestTailingIterator : public DBTestBase, + public ::testing::WithParamInterface { public: DBTestTailingIterator() : DBTestBase("db_tailing_iterator_test", /*env_do_fsync=*/true) {} }; -TEST_F(DBTestTailingIterator, TailingIteratorSingle) { +INSTANTIATE_TEST_CASE_P(DBTestTailingIterator, DBTestTailingIterator, + ::testing::Bool()); + +TEST_P(DBTestTailingIterator, TailingIteratorSingle) { ReadOptions read_options; read_options.tailing = true; + if (GetParam()) { + read_options.async_io = true; + } std::unique_ptr iter(db_->NewIterator(read_options)); iter->SeekToFirst(); @@ -43,11 +50,13 @@ TEST_F(DBTestTailingIterator, TailingIteratorSingle) { ASSERT_TRUE(!iter->Valid()); } -TEST_F(DBTestTailingIterator, TailingIteratorKeepAdding) { +TEST_P(DBTestTailingIterator, TailingIteratorKeepAdding) { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); ReadOptions read_options; read_options.tailing = true; - + if (GetParam()) { + read_options.async_io = true; + } std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); ASSERT_OK(iter->status()); std::string value(1024, 'a'); @@ -66,11 +75,13 @@ TEST_F(DBTestTailingIterator, TailingIteratorKeepAdding) { } } -TEST_F(DBTestTailingIterator, TailingIteratorSeekToNext) { +TEST_P(DBTestTailingIterator, TailingIteratorSeekToNext) { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); ReadOptions read_options; read_options.tailing = true; - + if (GetParam()) { + read_options.async_io = true; + } std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); ASSERT_OK(iter->status()); std::unique_ptr itern(db_->NewIterator(read_options, handles_[1])); @@ -125,7 +136,7 @@ TEST_F(DBTestTailingIterator, TailingIteratorSeekToNext) { } } 
-TEST_F(DBTestTailingIterator, TailingIteratorTrimSeekToNext) { +TEST_P(DBTestTailingIterator, TailingIteratorTrimSeekToNext) { const uint64_t k150KB = 150 * 1024; Options options; options.write_buffer_size = k150KB; @@ -135,6 +146,9 @@ TEST_F(DBTestTailingIterator, TailingIteratorTrimSeekToNext) { CreateAndReopenWithCF({"pikachu"}, options); ReadOptions read_options; read_options.tailing = true; + if (GetParam()) { + read_options.async_io = true; + } int num_iters, deleted_iters; char bufe[32]; @@ -265,10 +279,13 @@ TEST_F(DBTestTailingIterator, TailingIteratorTrimSeekToNext) { } } -TEST_F(DBTestTailingIterator, TailingIteratorDeletes) { +TEST_P(DBTestTailingIterator, TailingIteratorDeletes) { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); ReadOptions read_options; read_options.tailing = true; + if (GetParam()) { + read_options.async_io = true; + } std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); ASSERT_OK(iter->status()); @@ -300,15 +317,18 @@ TEST_F(DBTestTailingIterator, TailingIteratorDeletes) { // make sure we can read all new records using the existing iterator int count = 0; - for (; iter->Valid(); iter->Next(), ++count) ; + for (; iter->Valid(); iter->Next(), ++count) + ; ASSERT_EQ(count, num_records); } -TEST_F(DBTestTailingIterator, TailingIteratorPrefixSeek) { +TEST_P(DBTestTailingIterator, TailingIteratorPrefixSeek) { ReadOptions read_options; read_options.tailing = true; - + if (GetParam()) { + read_options.async_io = true; + } Options options = CurrentOptions(); options.create_if_missing = true; options.disable_auto_compactions = true; @@ -338,10 +358,13 @@ TEST_F(DBTestTailingIterator, TailingIteratorPrefixSeek) { ASSERT_TRUE(!iter->Valid()); } -TEST_F(DBTestTailingIterator, TailingIteratorIncomplete) { +TEST_P(DBTestTailingIterator, TailingIteratorIncomplete) { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); ReadOptions read_options; read_options.tailing = true; + if (GetParam()) { + read_options.async_io = true; + } 
read_options.read_tier = kBlockCacheTier; std::string key("key"); @@ -361,7 +384,7 @@ TEST_F(DBTestTailingIterator, TailingIteratorIncomplete) { ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); } -TEST_F(DBTestTailingIterator, TailingIteratorSeekToSame) { +TEST_P(DBTestTailingIterator, TailingIteratorSeekToSame) { Options options = CurrentOptions(); options.compaction_style = kCompactionStyleUniversal; options.write_buffer_size = 1000; @@ -369,7 +392,9 @@ TEST_F(DBTestTailingIterator, TailingIteratorSeekToSame) { ReadOptions read_options; read_options.tailing = true; - + if (GetParam()) { + read_options.async_io = true; + } const int NROWS = 10000; // Write rows with keys 00000, 00002, 00004 etc. for (int i = 0; i < NROWS; ++i) { @@ -400,14 +425,16 @@ TEST_F(DBTestTailingIterator, TailingIteratorSeekToSame) { // Sets iterate_upper_bound and verifies that ForwardIterator doesn't call // Seek() on immutable iterators when target key is >= prev_key and all // iterators, including the memtable iterator, are over the upper bound. 
-TEST_F(DBTestTailingIterator, TailingIteratorUpperBound) { +TEST_P(DBTestTailingIterator, TailingIteratorUpperBound) { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); const Slice upper_bound("20", 3); ReadOptions read_options; read_options.tailing = true; read_options.iterate_upper_bound = &upper_bound; - + if (GetParam()) { + read_options.async_io = true; + } ASSERT_OK(Put(1, "11", "11")); ASSERT_OK(Put(1, "12", "12")); ASSERT_OK(Put(1, "22", "22")); @@ -439,10 +466,14 @@ TEST_F(DBTestTailingIterator, TailingIteratorUpperBound) { ASSERT_FALSE(it->Valid()); ASSERT_OK(it->status()); - ASSERT_EQ(0, immutable_seeks); + if (GetParam()) { + ASSERT_EQ(1, immutable_seeks); + } else { + ASSERT_EQ(0, immutable_seeks); + } } -TEST_F(DBTestTailingIterator, TailingIteratorGap) { +TEST_P(DBTestTailingIterator, TailingIteratorGap) { // level 1: [20, 25] [35, 40] // level 2: [10 - 15] [45 - 50] // level 3: [20, 30, 40] @@ -455,7 +486,9 @@ TEST_F(DBTestTailingIterator, TailingIteratorGap) { ReadOptions read_options; read_options.tailing = true; - + if (GetParam()) { + read_options.async_io = true; + } ASSERT_OK(Put(1, "20", "20")); ASSERT_OK(Put(1, "30", "30")); ASSERT_OK(Put(1, "40", "40")); @@ -497,9 +530,12 @@ TEST_F(DBTestTailingIterator, TailingIteratorGap) { ASSERT_OK(it->status()); } -TEST_F(DBTestTailingIterator, SeekWithUpperBoundBug) { +TEST_P(DBTestTailingIterator, SeekWithUpperBoundBug) { ReadOptions read_options; read_options.tailing = true; + if (GetParam()) { + read_options.async_io = true; + } const Slice upper_bound("cc", 3); read_options.iterate_upper_bound = &upper_bound; @@ -520,9 +556,12 @@ TEST_F(DBTestTailingIterator, SeekWithUpperBoundBug) { ASSERT_EQ(iter->key().ToString(), "aa"); } -TEST_F(DBTestTailingIterator, SeekToFirstWithUpperBoundBug) { +TEST_P(DBTestTailingIterator, SeekToFirstWithUpperBoundBug) { ReadOptions read_options; read_options.tailing = true; + if (GetParam()) { + read_options.async_io = true; + } const Slice upper_bound("cc", 3); 
read_options.iterate_upper_bound = &upper_bound; diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index 29ae1d0ad..13a94cb81 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -351,7 +351,7 @@ void ForwardIterator::SeekToFirst() { } else if (immutable_status_.IsIncomplete()) { ResetIncompleteIterators(); } - SeekInternal(Slice(), true); + SeekInternal(Slice(), true, false); } bool ForwardIterator::IsOverUpperBound(const Slice& internal_key) const { @@ -369,48 +369,60 @@ void ForwardIterator::Seek(const Slice& internal_key) { } else if (immutable_status_.IsIncomplete()) { ResetIncompleteIterators(); } - SeekInternal(internal_key, false); + + SeekInternal(internal_key, false, false); + if (read_options_.async_io) { + SeekInternal(internal_key, false, true); + } } +// In case of async_io, SeekInternal is called twice with seek_after_async_io +// enabled in second call which only does seeking part to retrieve the blocks. void ForwardIterator::SeekInternal(const Slice& internal_key, - bool seek_to_first) { + bool seek_to_first, + bool seek_after_async_io) { assert(mutable_iter_); // mutable - seek_to_first ? mutable_iter_->SeekToFirst() : - mutable_iter_->Seek(internal_key); + if (!seek_after_async_io) { + seek_to_first ? mutable_iter_->SeekToFirst() + : mutable_iter_->Seek(internal_key); + } // immutable // TODO(ljin): NeedToSeekImmutable has negative impact on performance // if it turns to need to seek immutable often. We probably want to have // an option to turn it off. - if (seek_to_first || NeedToSeekImmutable(internal_key)) { - immutable_status_ = Status::OK(); - if (has_iter_trimmed_for_upper_bound_ && - ( - // prev_ is not set yet - is_prev_set_ == false || - // We are doing SeekToFirst() and internal_key.size() = 0 - seek_to_first || - // prev_key_ > internal_key - cfd_->internal_comparator().InternalKeyComparator::Compare( - prev_key_.GetInternalKey(), internal_key) > 0)) { - // Some iterators are trimmed. Need to rebuild. 
- RebuildIterators(true); - // Already seeked mutable iter, so seek again - seek_to_first ? mutable_iter_->SeekToFirst() - : mutable_iter_->Seek(internal_key); - } - { - auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator())); - immutable_min_heap_.swap(tmp); - } - for (size_t i = 0; i < imm_iters_.size(); i++) { - auto* m = imm_iters_[i]; - seek_to_first ? m->SeekToFirst() : m->Seek(internal_key); - if (!m->status().ok()) { - immutable_status_ = m->status(); - } else if (m->Valid()) { - immutable_min_heap_.push(m); + if (seek_to_first || seek_after_async_io || + NeedToSeekImmutable(internal_key)) { + if (!seek_after_async_io) { + immutable_status_ = Status::OK(); + if (has_iter_trimmed_for_upper_bound_ && + ( + // prev_ is not set yet + is_prev_set_ == false || + // We are doing SeekToFirst() and internal_key.size() = 0 + seek_to_first || + // prev_key_ > internal_key + cfd_->internal_comparator().InternalKeyComparator::Compare( + prev_key_.GetInternalKey(), internal_key) > 0)) { + // Some iterators are trimmed. Need to rebuild. + RebuildIterators(true); + // Already seeked mutable iter, so seek again + seek_to_first ? mutable_iter_->SeekToFirst() + : mutable_iter_->Seek(internal_key); + } + { + auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator())); + immutable_min_heap_.swap(tmp); + } + for (size_t i = 0; i < imm_iters_.size(); i++) { + auto* m = imm_iters_[i]; + seek_to_first ? m->SeekToFirst() : m->Seek(internal_key); + if (!m->status().ok()) { + immutable_status_ = m->status(); + } else if (m->Valid()) { + immutable_min_heap_.push(m); + } } } @@ -424,12 +436,19 @@ void ForwardIterator::SeekInternal(const Slice& internal_key, if (!l0_iters_[i]) { continue; } + if (seek_after_async_io) { + if (!l0_iters_[i]->status().IsTryAgain()) { + continue; + } + } + if (seek_to_first) { l0_iters_[i]->SeekToFirst(); } else { // If the target key passes over the largest key, we are sure Next() // won't go over this file. 
- if (user_comparator_->Compare(target_user_key, + if (seek_after_async_io == false && + user_comparator_->Compare(target_user_key, l0[i]->largest.user_key()) > 0) { if (read_options_.iterate_upper_bound != nullptr) { has_iter_trimmed_for_upper_bound_ = true; @@ -441,7 +460,10 @@ void ForwardIterator::SeekInternal(const Slice& internal_key, l0_iters_[i]->Seek(internal_key); } - if (!l0_iters_[i]->status().ok()) { + if (l0_iters_[i]->status().IsTryAgain()) { + assert(!seek_after_async_io); + continue; + } else if (!l0_iters_[i]->status().ok()) { immutable_status_ = l0_iters_[i]->status(); } else if (l0_iters_[i]->Valid() && !IsOverUpperBound(l0_iters_[i]->key())) { @@ -462,19 +484,30 @@ void ForwardIterator::SeekInternal(const Slice& internal_key, if (level_iters_[level - 1] == nullptr) { continue; } + + if (seek_after_async_io) { + if (!level_iters_[level - 1]->status().IsTryAgain()) { + continue; + } + } uint32_t f_idx = 0; - if (!seek_to_first) { + if (!seek_to_first && !seek_after_async_io) { f_idx = FindFileInRange(level_files, internal_key, 0, static_cast(level_files.size())); } // Seek - if (f_idx < level_files.size()) { - level_iters_[level - 1]->SetFileIndex(f_idx); - seek_to_first ? level_iters_[level - 1]->SeekToFirst() : - level_iters_[level - 1]->Seek(internal_key); + if (seek_after_async_io || f_idx < level_files.size()) { + if (!seek_after_async_io) { + level_iters_[level - 1]->SetFileIndex(f_idx); + } + seek_to_first ? 
level_iters_[level - 1]->SeekToFirst() + : level_iters_[level - 1]->Seek(internal_key); - if (!level_iters_[level - 1]->status().ok()) { + if (level_iters_[level - 1]->status().IsTryAgain()) { + assert(!seek_after_async_io); + continue; + } else if (!level_iters_[level - 1]->status().ok()) { immutable_status_ = level_iters_[level - 1]->status(); } else if (level_iters_[level - 1]->Valid() && !IsOverUpperBound(level_iters_[level - 1]->key())) { @@ -502,7 +535,11 @@ void ForwardIterator::SeekInternal(const Slice& internal_key, immutable_min_heap_.push(current_); } - UpdateCurrent(); + // For async_io, it should be updated when seek_after_async_io is true (in + // second call). + if (seek_to_first || !read_options_.async_io || seek_after_async_io) { + UpdateCurrent(); + } TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Return", this); } @@ -520,7 +557,12 @@ void ForwardIterator::Next() { } else { RenewIterators(); } - SeekInternal(old_key, false); + + SeekInternal(old_key, false, false); + if (read_options_.async_io) { + SeekInternal(old_key, false, true); + } + if (!valid_ || key().compare(old_key) != 0) { return; } diff --git a/db/forward_iterator.h b/db/forward_iterator.h index 00823cd45..21cbd7001 100644 --- a/db/forward_iterator.h +++ b/db/forward_iterator.h @@ -101,7 +101,9 @@ class ForwardIterator : public InternalIterator { void BuildLevelIterators(const VersionStorageInfo* vstorage, SuperVersion* sv); void ResetIncompleteIterators(); - void SeekInternal(const Slice& internal_key, bool seek_to_first); + void SeekInternal(const Slice& internal_key, bool seek_to_first, + bool seek_after_async_io); + void UpdateCurrent(); bool NeedToSeekImmutable(const Slice& internal_key); void DeleteCurrentIter();