Provide support for async_io with tailing iterators (#10781)

Summary:
Provide support for async_io if ReadOptions.tailing is set true.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10781

Test Plan:
- Update unit tests
- Ran db_bench: ./db_bench --benchmarks="readrandom" --use_existing_db --use_tailing_iterator=1 --async_io=1

Reviewed By: anand1976

Differential Revision: D40128882

Pulled By: anand1976

fbshipit-source-id: 55e17855536871a5c47e2de92d238ae005c32d01
main
akankshamahajan 2 years ago committed by Facebook GitHub Bot
parent 5182bf3f83
commit ebf8c454fd
  1. 1
      HISTORY.md
  2. 81
      db/db_tailing_iter_test.cc
  3. 128
      db/forward_iterator.cc
  4. 4
      db/forward_iterator.h

@ -2,6 +2,7 @@
## Unreleased
### New Features
* `DeleteRange()` now supports user-defined timestamp.
* Provide support for async_io with tailing iterators when ReadOptions.tailing is enabled during scans.
### Bug Fixes
* Fix a bug in io_uring_prep_cancel in AbortIO API for posix which expects sqe->addr to match with read request submitted and wrong paramter was being passed.

@ -18,15 +18,22 @@
namespace ROCKSDB_NAMESPACE {
class DBTestTailingIterator : public DBTestBase {
class DBTestTailingIterator : public DBTestBase,
public ::testing::WithParamInterface<bool> {
public:
DBTestTailingIterator()
: DBTestBase("db_tailing_iterator_test", /*env_do_fsync=*/true) {}
};
TEST_F(DBTestTailingIterator, TailingIteratorSingle) {
INSTANTIATE_TEST_CASE_P(DBTestTailingIterator, DBTestTailingIterator,
::testing::Bool());
TEST_P(DBTestTailingIterator, TailingIteratorSingle) {
ReadOptions read_options;
read_options.tailing = true;
if (GetParam()) {
read_options.async_io = true;
}
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
iter->SeekToFirst();
@ -43,11 +50,13 @@ TEST_F(DBTestTailingIterator, TailingIteratorSingle) {
ASSERT_TRUE(!iter->Valid());
}
TEST_F(DBTestTailingIterator, TailingIteratorKeepAdding) {
TEST_P(DBTestTailingIterator, TailingIteratorKeepAdding) {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
ReadOptions read_options;
read_options.tailing = true;
if (GetParam()) {
read_options.async_io = true;
}
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
ASSERT_OK(iter->status());
std::string value(1024, 'a');
@ -66,11 +75,13 @@ TEST_F(DBTestTailingIterator, TailingIteratorKeepAdding) {
}
}
TEST_F(DBTestTailingIterator, TailingIteratorSeekToNext) {
TEST_P(DBTestTailingIterator, TailingIteratorSeekToNext) {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
ReadOptions read_options;
read_options.tailing = true;
if (GetParam()) {
read_options.async_io = true;
}
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
ASSERT_OK(iter->status());
std::unique_ptr<Iterator> itern(db_->NewIterator(read_options, handles_[1]));
@ -125,7 +136,7 @@ TEST_F(DBTestTailingIterator, TailingIteratorSeekToNext) {
}
}
TEST_F(DBTestTailingIterator, TailingIteratorTrimSeekToNext) {
TEST_P(DBTestTailingIterator, TailingIteratorTrimSeekToNext) {
const uint64_t k150KB = 150 * 1024;
Options options;
options.write_buffer_size = k150KB;
@ -135,6 +146,9 @@ TEST_F(DBTestTailingIterator, TailingIteratorTrimSeekToNext) {
CreateAndReopenWithCF({"pikachu"}, options);
ReadOptions read_options;
read_options.tailing = true;
if (GetParam()) {
read_options.async_io = true;
}
int num_iters, deleted_iters;
char bufe[32];
@ -265,10 +279,13 @@ TEST_F(DBTestTailingIterator, TailingIteratorTrimSeekToNext) {
}
}
TEST_F(DBTestTailingIterator, TailingIteratorDeletes) {
TEST_P(DBTestTailingIterator, TailingIteratorDeletes) {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
ReadOptions read_options;
read_options.tailing = true;
if (GetParam()) {
read_options.async_io = true;
}
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
ASSERT_OK(iter->status());
@ -300,15 +317,18 @@ TEST_F(DBTestTailingIterator, TailingIteratorDeletes) {
// make sure we can read all new records using the existing iterator
int count = 0;
for (; iter->Valid(); iter->Next(), ++count) ;
for (; iter->Valid(); iter->Next(), ++count)
;
ASSERT_EQ(count, num_records);
}
TEST_F(DBTestTailingIterator, TailingIteratorPrefixSeek) {
TEST_P(DBTestTailingIterator, TailingIteratorPrefixSeek) {
ReadOptions read_options;
read_options.tailing = true;
if (GetParam()) {
read_options.async_io = true;
}
Options options = CurrentOptions();
options.create_if_missing = true;
options.disable_auto_compactions = true;
@ -338,10 +358,13 @@ TEST_F(DBTestTailingIterator, TailingIteratorPrefixSeek) {
ASSERT_TRUE(!iter->Valid());
}
TEST_F(DBTestTailingIterator, TailingIteratorIncomplete) {
TEST_P(DBTestTailingIterator, TailingIteratorIncomplete) {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
ReadOptions read_options;
read_options.tailing = true;
if (GetParam()) {
read_options.async_io = true;
}
read_options.read_tier = kBlockCacheTier;
std::string key("key");
@ -361,7 +384,7 @@ TEST_F(DBTestTailingIterator, TailingIteratorIncomplete) {
ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete());
}
TEST_F(DBTestTailingIterator, TailingIteratorSeekToSame) {
TEST_P(DBTestTailingIterator, TailingIteratorSeekToSame) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.write_buffer_size = 1000;
@ -369,7 +392,9 @@ TEST_F(DBTestTailingIterator, TailingIteratorSeekToSame) {
ReadOptions read_options;
read_options.tailing = true;
if (GetParam()) {
read_options.async_io = true;
}
const int NROWS = 10000;
// Write rows with keys 00000, 00002, 00004 etc.
for (int i = 0; i < NROWS; ++i) {
@ -400,14 +425,16 @@ TEST_F(DBTestTailingIterator, TailingIteratorSeekToSame) {
// Sets iterate_upper_bound and verifies that ForwardIterator doesn't call
// Seek() on immutable iterators when target key is >= prev_key and all
// iterators, including the memtable iterator, are over the upper bound.
TEST_F(DBTestTailingIterator, TailingIteratorUpperBound) {
TEST_P(DBTestTailingIterator, TailingIteratorUpperBound) {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
const Slice upper_bound("20", 3);
ReadOptions read_options;
read_options.tailing = true;
read_options.iterate_upper_bound = &upper_bound;
if (GetParam()) {
read_options.async_io = true;
}
ASSERT_OK(Put(1, "11", "11"));
ASSERT_OK(Put(1, "12", "12"));
ASSERT_OK(Put(1, "22", "22"));
@ -439,10 +466,14 @@ TEST_F(DBTestTailingIterator, TailingIteratorUpperBound) {
ASSERT_FALSE(it->Valid());
ASSERT_OK(it->status());
ASSERT_EQ(0, immutable_seeks);
if (GetParam()) {
ASSERT_EQ(1, immutable_seeks);
} else {
ASSERT_EQ(0, immutable_seeks);
}
}
TEST_F(DBTestTailingIterator, TailingIteratorGap) {
TEST_P(DBTestTailingIterator, TailingIteratorGap) {
// level 1: [20, 25] [35, 40]
// level 2: [10 - 15] [45 - 50]
// level 3: [20, 30, 40]
@ -455,7 +486,9 @@ TEST_F(DBTestTailingIterator, TailingIteratorGap) {
ReadOptions read_options;
read_options.tailing = true;
if (GetParam()) {
read_options.async_io = true;
}
ASSERT_OK(Put(1, "20", "20"));
ASSERT_OK(Put(1, "30", "30"));
ASSERT_OK(Put(1, "40", "40"));
@ -497,9 +530,12 @@ TEST_F(DBTestTailingIterator, TailingIteratorGap) {
ASSERT_OK(it->status());
}
TEST_F(DBTestTailingIterator, SeekWithUpperBoundBug) {
TEST_P(DBTestTailingIterator, SeekWithUpperBoundBug) {
ReadOptions read_options;
read_options.tailing = true;
if (GetParam()) {
read_options.async_io = true;
}
const Slice upper_bound("cc", 3);
read_options.iterate_upper_bound = &upper_bound;
@ -520,9 +556,12 @@ TEST_F(DBTestTailingIterator, SeekWithUpperBoundBug) {
ASSERT_EQ(iter->key().ToString(), "aa");
}
TEST_F(DBTestTailingIterator, SeekToFirstWithUpperBoundBug) {
TEST_P(DBTestTailingIterator, SeekToFirstWithUpperBoundBug) {
ReadOptions read_options;
read_options.tailing = true;
if (GetParam()) {
read_options.async_io = true;
}
const Slice upper_bound("cc", 3);
read_options.iterate_upper_bound = &upper_bound;

@ -351,7 +351,7 @@ void ForwardIterator::SeekToFirst() {
} else if (immutable_status_.IsIncomplete()) {
ResetIncompleteIterators();
}
SeekInternal(Slice(), true);
SeekInternal(Slice(), true, false);
}
bool ForwardIterator::IsOverUpperBound(const Slice& internal_key) const {
@ -369,48 +369,60 @@ void ForwardIterator::Seek(const Slice& internal_key) {
} else if (immutable_status_.IsIncomplete()) {
ResetIncompleteIterators();
}
SeekInternal(internal_key, false);
SeekInternal(internal_key, false, false);
if (read_options_.async_io) {
SeekInternal(internal_key, false, true);
}
}
// In case of async_io, SeekInternal is called twice with seek_after_async_io
// enabled in second call which only does seeking part to retrieve the blocks.
void ForwardIterator::SeekInternal(const Slice& internal_key,
bool seek_to_first) {
bool seek_to_first,
bool seek_after_async_io) {
assert(mutable_iter_);
// mutable
seek_to_first ? mutable_iter_->SeekToFirst() :
mutable_iter_->Seek(internal_key);
if (!seek_after_async_io) {
seek_to_first ? mutable_iter_->SeekToFirst()
: mutable_iter_->Seek(internal_key);
}
// immutable
// TODO(ljin): NeedToSeekImmutable has negative impact on performance
// if it turns to need to seek immutable often. We probably want to have
// an option to turn it off.
if (seek_to_first || NeedToSeekImmutable(internal_key)) {
immutable_status_ = Status::OK();
if (has_iter_trimmed_for_upper_bound_ &&
(
// prev_ is not set yet
is_prev_set_ == false ||
// We are doing SeekToFirst() and internal_key.size() = 0
seek_to_first ||
// prev_key_ > internal_key
cfd_->internal_comparator().InternalKeyComparator::Compare(
prev_key_.GetInternalKey(), internal_key) > 0)) {
// Some iterators are trimmed. Need to rebuild.
RebuildIterators(true);
// Already seeked mutable iter, so seek again
seek_to_first ? mutable_iter_->SeekToFirst()
: mutable_iter_->Seek(internal_key);
}
{
auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator()));
immutable_min_heap_.swap(tmp);
}
for (size_t i = 0; i < imm_iters_.size(); i++) {
auto* m = imm_iters_[i];
seek_to_first ? m->SeekToFirst() : m->Seek(internal_key);
if (!m->status().ok()) {
immutable_status_ = m->status();
} else if (m->Valid()) {
immutable_min_heap_.push(m);
if (seek_to_first || seek_after_async_io ||
NeedToSeekImmutable(internal_key)) {
if (!seek_after_async_io) {
immutable_status_ = Status::OK();
if (has_iter_trimmed_for_upper_bound_ &&
(
// prev_ is not set yet
is_prev_set_ == false ||
// We are doing SeekToFirst() and internal_key.size() = 0
seek_to_first ||
// prev_key_ > internal_key
cfd_->internal_comparator().InternalKeyComparator::Compare(
prev_key_.GetInternalKey(), internal_key) > 0)) {
// Some iterators are trimmed. Need to rebuild.
RebuildIterators(true);
// Already seeked mutable iter, so seek again
seek_to_first ? mutable_iter_->SeekToFirst()
: mutable_iter_->Seek(internal_key);
}
{
auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator()));
immutable_min_heap_.swap(tmp);
}
for (size_t i = 0; i < imm_iters_.size(); i++) {
auto* m = imm_iters_[i];
seek_to_first ? m->SeekToFirst() : m->Seek(internal_key);
if (!m->status().ok()) {
immutable_status_ = m->status();
} else if (m->Valid()) {
immutable_min_heap_.push(m);
}
}
}
@ -424,12 +436,19 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
if (!l0_iters_[i]) {
continue;
}
if (seek_after_async_io) {
if (!l0_iters_[i]->status().IsTryAgain()) {
continue;
}
}
if (seek_to_first) {
l0_iters_[i]->SeekToFirst();
} else {
// If the target key passes over the largest key, we are sure Next()
// won't go over this file.
if (user_comparator_->Compare(target_user_key,
if (seek_after_async_io == false &&
user_comparator_->Compare(target_user_key,
l0[i]->largest.user_key()) > 0) {
if (read_options_.iterate_upper_bound != nullptr) {
has_iter_trimmed_for_upper_bound_ = true;
@ -441,7 +460,10 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
l0_iters_[i]->Seek(internal_key);
}
if (!l0_iters_[i]->status().ok()) {
if (l0_iters_[i]->status().IsTryAgain()) {
assert(!seek_after_async_io);
continue;
} else if (!l0_iters_[i]->status().ok()) {
immutable_status_ = l0_iters_[i]->status();
} else if (l0_iters_[i]->Valid() &&
!IsOverUpperBound(l0_iters_[i]->key())) {
@ -462,19 +484,30 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
if (level_iters_[level - 1] == nullptr) {
continue;
}
if (seek_after_async_io) {
if (!level_iters_[level - 1]->status().IsTryAgain()) {
continue;
}
}
uint32_t f_idx = 0;
if (!seek_to_first) {
if (!seek_to_first && !seek_after_async_io) {
f_idx = FindFileInRange(level_files, internal_key, 0,
static_cast<uint32_t>(level_files.size()));
}
// Seek
if (f_idx < level_files.size()) {
level_iters_[level - 1]->SetFileIndex(f_idx);
seek_to_first ? level_iters_[level - 1]->SeekToFirst() :
level_iters_[level - 1]->Seek(internal_key);
if (seek_after_async_io || f_idx < level_files.size()) {
if (!seek_after_async_io) {
level_iters_[level - 1]->SetFileIndex(f_idx);
}
seek_to_first ? level_iters_[level - 1]->SeekToFirst()
: level_iters_[level - 1]->Seek(internal_key);
if (!level_iters_[level - 1]->status().ok()) {
if (level_iters_[level - 1]->status().IsTryAgain()) {
assert(!seek_after_async_io);
continue;
} else if (!level_iters_[level - 1]->status().ok()) {
immutable_status_ = level_iters_[level - 1]->status();
} else if (level_iters_[level - 1]->Valid() &&
!IsOverUpperBound(level_iters_[level - 1]->key())) {
@ -502,7 +535,11 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
immutable_min_heap_.push(current_);
}
UpdateCurrent();
// For async_io, it should be updated when seek_after_async_io is true (in
// second call).
if (seek_to_first || !read_options_.async_io || seek_after_async_io) {
UpdateCurrent();
}
TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Return", this);
}
@ -520,7 +557,12 @@ void ForwardIterator::Next() {
} else {
RenewIterators();
}
SeekInternal(old_key, false);
SeekInternal(old_key, false, false);
if (read_options_.async_io) {
SeekInternal(old_key, false, true);
}
if (!valid_ || key().compare(old_key) != 0) {
return;
}

@ -101,7 +101,9 @@ class ForwardIterator : public InternalIterator {
void BuildLevelIterators(const VersionStorageInfo* vstorage,
SuperVersion* sv);
void ResetIncompleteIterators();
void SeekInternal(const Slice& internal_key, bool seek_to_first);
void SeekInternal(const Slice& internal_key, bool seek_to_first,
bool seek_after_async_io);
void UpdateCurrent();
bool NeedToSeekImmutable(const Slice& internal_key);
void DeleteCurrentIter();

Loading…
Cancel
Save