From 8c392a31d7a4ab4e1f875a69e4205d94018fdde3 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Mon, 9 Oct 2017 17:05:34 -0700 Subject: [PATCH] WritePrepared Txn: Iterator Summary: On iterator create, take a snapshot, create a ReadCallback and pass the ReadCallback to the underlying DBIter to check if key is committed. Closes https://github.com/facebook/rocksdb/pull/2981 Differential Revision: D6001471 Pulled By: yiwu-arbug fbshipit-source-id: 3565c4cdaf25370ba47008b0e0cb65b31dfe79fe --- db/db_blob_index_test.cc | 6 +- db/db_impl.cc | 37 +- db/db_impl.h | 1 + db/db_impl_readonly.cc | 6 +- db/db_iter.cc | 47 ++- db/db_iter.h | 12 +- db/db_iter_test.cc | 377 +++++++++--------- utilities/date_tiered/date_tiered_db_impl.cc | 3 +- .../pessimistic_transaction_db.cc | 62 +++ .../transactions/pessimistic_transaction_db.h | 14 + .../write_prepared_transaction_test.cc | 55 +++ utilities/transactions/write_prepared_txn.cc | 17 + utilities/transactions/write_prepared_txn.h | 5 + 13 files changed, 417 insertions(+), 225 deletions(-) diff --git a/db/db_blob_index_test.cc b/db/db_blob_index_test.cc index 9d5f07411..43aa3a534 100644 --- a/db/db_blob_index_test.cc +++ b/db/db_blob_index_test.cc @@ -89,9 +89,9 @@ class DBBlobIndexTest : public DBTestBase { } ArenaWrappedDBIter* GetBlobIterator() { - return dbfull()->NewIteratorImpl(ReadOptions(), cfd(), - dbfull()->GetLatestSequenceNumber(), - true /*allow_blob*/); + return dbfull()->NewIteratorImpl( + ReadOptions(), cfd(), dbfull()->GetLatestSequenceNumber(), + nullptr /*read_callback*/, true /*allow_blob*/); } Options GetTestOptions() { diff --git a/db/db_impl.cc b/db/db_impl.cc index 0b6a0bcab..cd727b6c3 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1413,6 +1413,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, } auto cfh = reinterpret_cast(column_family); auto cfd = cfh->cfd(); + ReadCallback* read_callback = nullptr; // No read callback provided. if (read_options.managed) { #ifdef ROCKSDB_LITE // not supported in lite version @@ -1437,16 +1438,14 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, return NewDBIterator( env_, read_options, *cfd->ioptions(), cfd->user_comparator(), iter, kMaxSequenceNumber, - sv->mutable_cf_options.max_sequential_skip_in_iterations); + sv->mutable_cf_options.max_sequential_skip_in_iterations, + read_callback); #endif } else { - SequenceNumber latest_snapshot = versions_->LastSequence(); - auto snapshot = - read_options.snapshot != nullptr - ? reinterpret_cast(read_options.snapshot) - ->number_ - : latest_snapshot; - return NewIteratorImpl(read_options, cfd, snapshot); + auto snapshot = read_options.snapshot != nullptr + ? read_options.snapshot->GetSequenceNumber() + : versions_->LastSequence(); + return NewIteratorImpl(read_options, cfd, snapshot, read_callback); } // To stop compiler from complaining return nullptr; @@ -1455,6 +1454,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options, ColumnFamilyData* cfd, SequenceNumber snapshot, + ReadCallback* read_callback, bool allow_blob) { SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); @@ -1503,8 +1503,8 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options, ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( env_, read_options, *cfd->ioptions(), snapshot, sv->mutable_cf_options.max_sequential_skip_in_iterations, - sv->version_number, ((read_options.snapshot != nullptr) ? nullptr : this), - cfd, allow_blob); + sv->version_number, read_callback, + ((read_options.snapshot != nullptr) ? nullptr : this), cfd, allow_blob); InternalIterator* internal_iter = NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), @@ -1522,6 +1522,7 @@ Status DBImpl::NewIterators( return Status::NotSupported( "ReadTier::kPersistedData is not yet supported in iterators."); } + ReadCallback* read_callback = nullptr; // No read callback provided. iterators->clear(); iterators->reserve(column_families.size()); if (read_options.managed) { @@ -1552,21 +1553,19 @@ Status DBImpl::NewIterators( iterators->push_back(NewDBIterator( env_, read_options, *cfd->ioptions(), cfd->user_comparator(), iter, kMaxSequenceNumber, - sv->mutable_cf_options.max_sequential_skip_in_iterations)); + sv->mutable_cf_options.max_sequential_skip_in_iterations, + read_callback)); } #endif } else { - SequenceNumber latest_snapshot = versions_->LastSequence(); - auto snapshot = - read_options.snapshot != nullptr - ? reinterpret_cast(read_options.snapshot) - ->number_ - : latest_snapshot; - + auto snapshot = read_options.snapshot != nullptr + ? read_options.snapshot->GetSequenceNumber() + : versions_->LastSequence(); for (size_t i = 0; i < column_families.size(); ++i) { auto* cfd = reinterpret_cast( column_families[i])->cfd(); - iterators->push_back(NewIteratorImpl(read_options, cfd, snapshot)); + iterators->push_back( + NewIteratorImpl(read_options, cfd, snapshot, read_callback)); } } diff --git a/db/db_impl.h b/db/db_impl.h index 7b47da824..79017e52d 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -137,6 +137,7 @@ class DBImpl : public DB { ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options, ColumnFamilyData* cfd, SequenceNumber snapshot, + ReadCallback* read_callback, bool allow_blob = false); virtual const Snapshot* GetSnapshot() override; diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index 08379c5a9..b7ebaa53f 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -57,6 +57,7 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options, auto cfd = cfh->cfd(); SuperVersion* super_version = cfd->GetSuperVersion()->Ref(); SequenceNumber latest_snapshot = versions_->LastSequence(); + ReadCallback* read_callback = nullptr; // No read callback provided. auto db_iter = NewArenaWrappedDbIterator( env_, read_options, *cfd->ioptions(), (read_options.snapshot != nullptr @@ -64,7 +65,7 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options, ->number_ : latest_snapshot), super_version->mutable_cf_options.max_sequential_skip_in_iterations, - super_version->version_number); + super_version->version_number, read_callback); auto internal_iter = NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(), db_iter->GetRangeDelAggregator()); @@ -76,6 +77,7 @@ Status DBImplReadOnly::NewIterators( const ReadOptions& read_options, const std::vector& column_families, std::vector* iterators) { + ReadCallback* read_callback = nullptr; // No read callback provided. if (iterators == nullptr) { return Status::InvalidArgument("iterators not allowed to be nullptr"); } @@ -93,7 +95,7 @@ Status DBImplReadOnly::NewIterators( ->number_ : latest_snapshot), sv->mutable_cf_options.max_sequential_skip_in_iterations, - sv->version_number); + sv->version_number, read_callback); auto* internal_iter = NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), db_iter->GetRangeDelAggregator()); diff --git a/db/db_iter.cc b/db/db_iter.cc index d9a1eaee7..e9632246a 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -102,7 +102,8 @@ class DBIter final: public Iterator { DBIter(Env* _env, const ReadOptions& read_options, const ImmutableCFOptions& cf_options, const Comparator* cmp, InternalIterator* iter, SequenceNumber s, bool arena_mode, - uint64_t max_sequential_skip_in_iterations, bool allow_blob) + uint64_t max_sequential_skip_in_iterations, + ReadCallback* read_callback, bool allow_blob) : arena_mode_(arena_mode), env_(_env), logger_(cf_options.info_log), @@ -120,6 +121,7 @@ class DBIter final: public Iterator { total_order_seek_(read_options.total_order_seek), range_del_agg_(cf_options.internal_comparator, s, true /* collapse_deletions */), + read_callback_(read_callback), allow_blob_(allow_blob) { RecordTick(statistics_, NO_ITERATORS); prefix_extractor_ = cf_options.prefix_extractor; @@ -226,6 +228,7 @@ class DBIter final: public Iterator { bool ParseKey(ParsedInternalKey* key); void MergeValuesNewToOld(); bool TooManyInternalKeysSkipped(bool increment = true); + bool IsVisible(SequenceNumber sequence); // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData() // is called @@ -293,6 +296,7 @@ class DBIter final: public Iterator { RangeDelAggregator range_del_agg_; LocalStatistics local_stats_; PinnedIteratorsManager pinned_iters_mgr_; + ReadCallback* read_callback_; bool allow_blob_; bool is_blob_; @@ -408,7 +412,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { return; } - if (ikey_.sequence <= sequence_) { + if (IsVisible(ikey_.sequence)) { if (skipping && user_comparator_->Compare(ikey_.user_key, saved_key_.GetUserKey()) <= 0) { num_skipped++; // skip this entry @@ -674,7 +678,7 @@ void DBIter::ReverseToBackward() { user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) > 0) { assert(ikey.sequence != kMaxSequenceNumber); - if (ikey.sequence > sequence_) { + if (!IsVisible(ikey.sequence)) { PERF_COUNTER_ADD(internal_recent_skipped_count, 1); } else { PERF_COUNTER_ADD(internal_key_skipped_count, 1); @@ -762,7 +766,7 @@ bool DBIter::FindValueForCurrentKey() { ReleaseTempPinnedData(); TempPinData(); size_t num_skipped = 0; - while (iter_->Valid() && ikey.sequence <= sequence_ && + while (iter_->Valid() && IsVisible(ikey.sequence) && user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) { if (TooManyInternalKeysSkipped()) { return false; @@ -1001,7 +1005,7 @@ void DBIter::FindPrevUserKey() { while (iter_->Valid() && ((cmp = user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey())) == 0 || - (cmp > 0 && ikey.sequence > sequence_))) { + (cmp > 0 && !IsVisible(ikey.sequence)))) { if (TooManyInternalKeysSkipped()) { return; } @@ -1019,7 +1023,7 @@ void DBIter::FindPrevUserKey() { } } assert(ikey.sequence != kMaxSequenceNumber); - if (ikey.sequence > sequence_) { + if (!IsVisible(ikey.sequence)) { PERF_COUNTER_ADD(internal_recent_skipped_count, 1); } else { PERF_COUNTER_ADD(internal_key_skipped_count, 1); @@ -1041,6 +1045,11 @@ bool DBIter::TooManyInternalKeysSkipped(bool increment) { return false; } +bool DBIter::IsVisible(SequenceNumber sequence) { + return sequence <= sequence_ && + (read_callback_ == nullptr || read_callback_->IsCommitted(sequence)); +} + // Skip all unparseable keys void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) { while (iter_->Valid() && !ParseKey(ikey)) { @@ -1225,10 +1234,11 @@ Iterator* NewDBIterator(Env* env, const ReadOptions& read_options, InternalIterator* internal_iter, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, - bool allow_blob) { - DBIter* db_iter = new DBIter( - env, read_options, cf_options, user_key_comparator, internal_iter, - sequence, false, max_sequential_skip_in_iterations, allow_blob); + ReadCallback* read_callback, bool allow_blob) { + DBIter* db_iter = + new DBIter(env, read_options, cf_options, user_key_comparator, + internal_iter, sequence, false, + max_sequential_skip_in_iterations, read_callback, allow_blob); return db_iter; } @@ -1273,11 +1283,13 @@ void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options, const ImmutableCFOptions& cf_options, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iteration, - uint64_t version_number, bool allow_blob) { + uint64_t version_number, + ReadCallback* read_callback, bool allow_blob) { auto mem = arena_.AllocateAligned(sizeof(DBIter)); db_iter_ = new (mem) DBIter(env, read_options, cf_options, cf_options.user_comparator, nullptr, - sequence, true, max_sequential_skip_in_iteration, allow_blob); + sequence, true, max_sequential_skip_in_iteration, read_callback, + allow_blob); sv_number_ = version_number; } @@ -1297,7 +1309,7 @@ Status ArenaWrappedDBIter::Refresh() { SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex()); Init(env, read_options_, *(cfd_->ioptions()), latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations, - cur_sv_number, allow_blob_); + cur_sv_number, read_callback_, allow_blob_); InternalIterator* internal_iter = db_impl_->NewInternalIterator( read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator()); @@ -1313,12 +1325,15 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator( Env* env, const ReadOptions& read_options, const ImmutableCFOptions& cf_options, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, - DBImpl* db_impl, ColumnFamilyData* cfd, bool allow_blob) { + ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd, + bool allow_blob) { ArenaWrappedDBIter* iter = new ArenaWrappedDBIter(); iter->Init(env, read_options, cf_options, sequence, - max_sequential_skip_in_iterations, version_number, allow_blob); + max_sequential_skip_in_iterations, version_number, read_callback, + allow_blob); if (db_impl != nullptr && cfd != nullptr) { - iter->StoreRefreshInfo(read_options, db_impl, cfd, allow_blob); + iter->StoreRefreshInfo(read_options, db_impl, cfd, read_callback, + allow_blob); } return iter; diff --git a/db/db_iter.h b/db/db_iter.h index 26fcd44cb..7047f5197 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -34,6 +34,7 @@ extern Iterator* NewDBIterator(Env* env, const ReadOptions& read_options, InternalIterator* internal_iter, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, + ReadCallback* read_callback, bool allow_blob = false); // A wrapper iterator which wraps DB Iterator and the arena, with which the DB @@ -72,13 +73,15 @@ class ArenaWrappedDBIter : public Iterator { const ImmutableCFOptions& cf_options, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, - bool allow_blob); + ReadCallback* read_callback, bool allow_blob); void StoreRefreshInfo(const ReadOptions& read_options, DBImpl* db_impl, - ColumnFamilyData* cfd, bool allow_blob) { + ColumnFamilyData* cfd, ReadCallback* read_callback, + bool allow_blob) { read_options_ = read_options; db_impl_ = db_impl; cfd_ = cfd; + read_callback_ = read_callback; allow_blob_ = allow_blob; } @@ -89,6 +92,7 @@ class ArenaWrappedDBIter : public Iterator { ColumnFamilyData* cfd_ = nullptr; DBImpl* db_impl_ = nullptr; ReadOptions read_options_; + ReadCallback* read_callback_; bool allow_blob_ = false; }; @@ -99,7 +103,7 @@ extern ArenaWrappedDBIter* NewArenaWrappedDbIterator( Env* env, const ReadOptions& read_options, const ImmutableCFOptions& cf_options, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, - DBImpl* db_impl = nullptr, ColumnFamilyData* cfd = nullptr, - bool allow_blob = false); + ReadCallback* read_callback, DBImpl* db_impl = nullptr, + ColumnFamilyData* cfd = nullptr, bool allow_blob = false); } // namespace rocksdb diff --git a/db/db_iter_test.cc b/db/db_iter_test.cc index 6db3b4a9b..d16ab42de 100644 --- a/db/db_iter_test.cc +++ b/db/db_iter_test.cc @@ -191,9 +191,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) { internal_iter->Finish(); ReadOptions ro; - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 10, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 10, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); @@ -223,9 +223,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) { internal_iter->Finish(); ReadOptions ro; - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 10, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 10, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); @@ -249,9 +249,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) { ReadOptions ro; ro.iterate_upper_bound = &prefix; - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 10, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 10, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); @@ -281,9 +281,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) { ReadOptions ro; ro.iterate_upper_bound = &prefix; - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 10, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 10, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); @@ -316,9 +316,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) { ReadOptions ro; ro.iterate_upper_bound = &prefix; - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 10, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 10, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(!db_iter->Valid()); @@ -345,9 +345,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) { ReadOptions ro; ro.iterate_upper_bound = &prefix; - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 7, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 7, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); SetPerfLevel(kEnableCount); ASSERT_TRUE(GetPerfLevel() == kEnableCount); @@ -382,9 +382,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) { ReadOptions ro; ro.iterate_upper_bound = &prefix; - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 4, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 4, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); @@ -407,9 +407,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) { ReadOptions ro; ro.iterate_upper_bound = &prefix; - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 10, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 10, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(!db_iter->Valid()); @@ -429,9 +429,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) { ReadOptions ro; ro.iterate_upper_bound = &prefix; - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 10, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 10, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); @@ -464,9 +464,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) { ReadOptions ro; ro.iterate_upper_bound = &prefix; - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 7, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 7, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); SetPerfLevel(kEnableCount); ASSERT_TRUE(GetPerfLevel() == kEnableCount); @@ -493,9 +493,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) { internal_iter->Finish(); ReadOptions ro; - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 10, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 10, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToFirst(); ASSERT_TRUE(db_iter->Valid()); @@ -535,9 +535,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) { internal_iter->Finish(); ReadOptions ro; - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 2, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 2, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "b"); @@ -566,9 +566,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) { internal_iter->Finish(); ReadOptions ro; - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 10, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 10, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "c"); @@ -595,9 +595,9 @@ TEST_F(DBIteratorTest, DBIteratorEmpty) { TestIterator* internal_iter = new TestIterator(BytewiseComparator()); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 0, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 0, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(!db_iter->Valid()); } @@ -606,9 +606,9 @@ TEST_F(DBIteratorTest, DBIteratorEmpty) { TestIterator* internal_iter = new TestIterator(BytewiseComparator()); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 0, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 0, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToFirst(); ASSERT_TRUE(!db_iter->Valid()); } @@ -628,9 +628,10 @@ TEST_F(DBIteratorTest, DBIteratorUseSkipCountSkips) { } internal_iter->Finish(); - std::unique_ptr db_iter(NewDBIterator( - env_, ro, ImmutableCFOptions(options), BytewiseComparator(), - internal_iter, 2, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter( + NewDBIterator(env_, ro, ImmutableCFOptions(options), BytewiseComparator(), + internal_iter, 2, options.max_sequential_skip_in_iterations, + nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "c"); @@ -673,7 +674,8 @@ TEST_F(DBIteratorTest, DBIteratorUseSkip) { options.statistics = rocksdb::CreateDBStatistics(); std::unique_ptr db_iter(NewDBIterator( env_, ro, cf_options, BytewiseComparator(), internal_iter, i + 2, - options.max_sequential_skip_in_iterations)); + options.max_sequential_skip_in_iterations, + nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); @@ -708,7 +710,8 @@ TEST_F(DBIteratorTest, DBIteratorUseSkip) { std::unique_ptr db_iter(NewDBIterator( env_, ro, cf_options, BytewiseComparator(), internal_iter, i + 2, - options.max_sequential_skip_in_iterations)); + options.max_sequential_skip_in_iterations, + nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); @@ -736,7 +739,8 @@ TEST_F(DBIteratorTest, DBIteratorUseSkip) { std::unique_ptr db_iter(NewDBIterator( env_, ro, cf_options, BytewiseComparator(), internal_iter, 202, - options.max_sequential_skip_in_iterations)); + options.max_sequential_skip_in_iterations, + nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); @@ -768,7 +772,8 @@ TEST_F(DBIteratorTest, DBIteratorUseSkip) { internal_iter->Finish(); std::unique_ptr db_iter(NewDBIterator( env_, ro, cf_options, BytewiseComparator(), internal_iter, i, - options.max_sequential_skip_in_iterations)); + options.max_sequential_skip_in_iterations, + nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(!db_iter->Valid()); @@ -782,9 +787,9 @@ TEST_F(DBIteratorTest, DBIteratorUseSkip) { } internal_iter->AddPut("c", "200"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 200, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 200, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "c"); @@ -818,7 +823,8 @@ TEST_F(DBIteratorTest, DBIteratorUseSkip) { std::unique_ptr db_iter(NewDBIterator( env_, ro, cf_options, BytewiseComparator(), internal_iter, i + 2, - options.max_sequential_skip_in_iterations)); + options.max_sequential_skip_in_iterations, + nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); @@ -852,7 +858,8 @@ TEST_F(DBIteratorTest, DBIteratorUseSkip) { std::unique_ptr db_iter(NewDBIterator( env_, ro, cf_options, BytewiseComparator(), internal_iter, i + 2, - options.max_sequential_skip_in_iterations)); + options.max_sequential_skip_in_iterations, + nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); @@ -898,9 +905,9 @@ TEST_F(DBIteratorTest, DBIteratorSkipInternalKeys) { internal_iter->Finish(); ro.max_skippable_internal_keys = 0; - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 10, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 10, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToFirst(); ASSERT_TRUE(db_iter->Valid()); @@ -944,9 +951,9 @@ TEST_F(DBIteratorTest, DBIteratorSkipInternalKeys) { internal_iter->Finish(); ro.max_skippable_internal_keys = 2; - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 10, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 10, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToFirst(); ASSERT_TRUE(db_iter->Valid()); @@ -988,9 +995,9 @@ TEST_F(DBIteratorTest, DBIteratorSkipInternalKeys) { internal_iter->Finish(); ro.max_skippable_internal_keys = 2; - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 10, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 10, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToFirst(); ASSERT_TRUE(db_iter->Valid()); @@ -1026,9 +1033,9 @@ TEST_F(DBIteratorTest, DBIteratorSkipInternalKeys) { internal_iter->Finish(); ro.max_skippable_internal_keys = 2; - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 10, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 10, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToFirst(); ASSERT_TRUE(db_iter->Valid()); @@ -1061,9 +1068,9 @@ TEST_F(DBIteratorTest, DBIteratorSkipInternalKeys) { internal_iter->Finish(); ro.max_skippable_internal_keys = 2; - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 10, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 10, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); @@ -1091,9 +1098,9 @@ TEST_F(DBIteratorTest, DBIteratorSkipInternalKeys) { internal_iter->Finish(); ro.max_skippable_internal_keys = 2; - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 10, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 10, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToFirst(); ASSERT_TRUE(db_iter->Valid()); @@ -1128,9 +1135,9 @@ TEST_F(DBIteratorTest, DBIteratorSkipInternalKeys) { internal_iter->Finish(); ro.max_skippable_internal_keys = 2; - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 10, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 10, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToFirst(); ASSERT_TRUE(db_iter->Valid()); @@ -1167,7 +1174,8 @@ TEST_F(DBIteratorTest, DBIteratorSkipInternalKeys) { ro.max_skippable_internal_keys = i; std::unique_ptr db_iter(NewDBIterator( env_, ro, cf_options, BytewiseComparator(), internal_iter, 2 * i + 1, - options.max_sequential_skip_in_iterations)); + options.max_sequential_skip_in_iterations, + nullptr /*read_callback*/)); db_iter->SeekToFirst(); ASSERT_TRUE(db_iter->Valid()); @@ -1220,7 +1228,8 @@ TEST_F(DBIteratorTest, DBIteratorSkipInternalKeys) { ro.max_skippable_internal_keys = i; std::unique_ptr db_iter(NewDBIterator( env_, ro, cf_options, BytewiseComparator(), internal_iter, 2 * i + 1, - options.max_sequential_skip_in_iterations)); + options.max_sequential_skip_in_iterations, + nullptr /*read_callback*/)); db_iter->SeekToFirst(); ASSERT_TRUE(db_iter->Valid()); @@ -1256,9 +1265,10 @@ TEST_F(DBIteratorTest, DBIterator1) { internal_iter->AddMerge("b", "2"); internal_iter->Finish(); - std::unique_ptr db_iter(NewDBIterator( - env_, ro, ImmutableCFOptions(options), BytewiseComparator(), - internal_iter, 1, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter( + NewDBIterator(env_, ro, ImmutableCFOptions(options), BytewiseComparator(), + internal_iter, 1, options.max_sequential_skip_in_iterations, + nullptr /*read_callback*/)); db_iter->SeekToFirst(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "a"); @@ -1283,9 +1293,10 @@ TEST_F(DBIteratorTest, DBIterator2) { internal_iter->AddMerge("b", "2"); internal_iter->Finish(); - std::unique_ptr db_iter(NewDBIterator( - env_, ro, ImmutableCFOptions(options), BytewiseComparator(), - internal_iter, 0, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter( + NewDBIterator(env_, ro, ImmutableCFOptions(options), BytewiseComparator(), + internal_iter, 0, options.max_sequential_skip_in_iterations, + nullptr /*read_callback*/)); db_iter->SeekToFirst(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "a"); @@ -1307,9 +1318,10 @@ TEST_F(DBIteratorTest, DBIterator3) { internal_iter->AddMerge("b", "2"); internal_iter->Finish(); - std::unique_ptr db_iter(NewDBIterator( - env_, ro, ImmutableCFOptions(options), BytewiseComparator(), - internal_iter, 2, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter( + NewDBIterator(env_, ro, ImmutableCFOptions(options), BytewiseComparator(), + internal_iter, 2, options.max_sequential_skip_in_iterations, + nullptr /*read_callback*/)); db_iter->SeekToFirst(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "a"); @@ -1331,9 +1343,10 @@ TEST_F(DBIteratorTest, DBIterator4) { internal_iter->AddMerge("b", "2"); internal_iter->Finish(); - std::unique_ptr db_iter(NewDBIterator( - env_, ro, ImmutableCFOptions(options), BytewiseComparator(), - internal_iter, 4, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter( + NewDBIterator(env_, ro, ImmutableCFOptions(options), BytewiseComparator(), + internal_iter, 4, options.max_sequential_skip_in_iterations, + nullptr /*read_callback*/)); db_iter->SeekToFirst(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "a"); @@ -1363,9 +1376,9 @@ TEST_F(DBIteratorTest, DBIterator5) { internal_iter->AddMerge("a", "merge_6"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 0, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 0, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "a"); @@ -1385,9 +1398,9 @@ TEST_F(DBIteratorTest, DBIterator5) { internal_iter->AddMerge("a", "merge_6"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 1, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 1, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "a"); @@ -1407,9 +1420,9 @@ TEST_F(DBIteratorTest, DBIterator5) { internal_iter->AddMerge("a", "merge_6"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 2, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 2, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "a"); @@ -1429,9 +1442,9 @@ TEST_F(DBIteratorTest, DBIterator5) { internal_iter->AddMerge("a", "merge_6"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 3, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 3, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "a"); @@ -1451,9 +1464,9 @@ TEST_F(DBIteratorTest, DBIterator5) { internal_iter->AddMerge("a", "merge_6"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 4, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 4, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "a"); @@ -1473,9 +1486,9 @@ TEST_F(DBIteratorTest, DBIterator5) { internal_iter->AddMerge("a", "merge_6"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 5, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 5, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "a"); @@ -1495,9 +1508,9 @@ TEST_F(DBIteratorTest, DBIterator5) { internal_iter->AddMerge("a", "merge_6"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 6, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 6, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "a"); @@ -1515,9 +1528,9 @@ TEST_F(DBIteratorTest, DBIterator5) { internal_iter->AddMerge("a", "merge_2"); internal_iter->AddPut("b", "val_b"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 10, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 10, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->Seek("b"); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "b"); @@ -1544,9 +1557,9 @@ TEST_F(DBIteratorTest, DBIterator6) { internal_iter->AddMerge("a", "merge_6"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 0, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 0, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "a"); @@ -1566,9 +1579,9 @@ TEST_F(DBIteratorTest, DBIterator6) { internal_iter->AddMerge("a", "merge_6"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 1, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 1, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "a"); @@ -1588,9 +1601,9 @@ TEST_F(DBIteratorTest, DBIterator6) { internal_iter->AddMerge("a", "merge_6"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 2, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 2, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "a"); @@ -1610,9 +1623,9 @@ TEST_F(DBIteratorTest, DBIterator6) { internal_iter->AddMerge("a", "merge_6"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 3, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 3, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(!db_iter->Valid()); } @@ -1628,9 +1641,9 @@ TEST_F(DBIteratorTest, DBIterator6) { internal_iter->AddMerge("a", "merge_6"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 4, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 4, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "a"); @@ -1650,9 +1663,9 @@ TEST_F(DBIteratorTest, DBIterator6) { internal_iter->AddMerge("a", "merge_6"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 5, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 5, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "a"); @@ -1672,9 +1685,9 @@ TEST_F(DBIteratorTest, DBIterator6) { internal_iter->AddMerge("a", "merge_6"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 6, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 6, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "a"); @@ -1713,9 +1726,9 @@ TEST_F(DBIteratorTest, DBIterator7) { internal_iter->AddDeletion("c"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 0, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 0, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "a"); @@ -1747,9 +1760,9 @@ TEST_F(DBIteratorTest, DBIterator7) { internal_iter->AddDeletion("c"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 2, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 2, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); @@ -1787,9 +1800,9 @@ TEST_F(DBIteratorTest, DBIterator7) { internal_iter->AddDeletion("c"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 4, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 4, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); @@ -1827,9 +1840,9 @@ TEST_F(DBIteratorTest, DBIterator7) { internal_iter->AddDeletion("c"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 5, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 5, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); @@ -1872,9 +1885,9 @@ TEST_F(DBIteratorTest, DBIterator7) { internal_iter->AddDeletion("c"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 6, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 6, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); @@ -1918,9 +1931,9 @@ TEST_F(DBIteratorTest, DBIterator7) { internal_iter->AddDeletion("c"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 7, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 7, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); @@ -1958,9 +1971,9 @@ TEST_F(DBIteratorTest, DBIterator7) { internal_iter->AddDeletion("c"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 9, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 9, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); @@ -2004,9 +2017,9 @@ TEST_F(DBIteratorTest, DBIterator7) { internal_iter->AddDeletion("c"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 13, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 13, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); @@ -2051,9 +2064,9 @@ TEST_F(DBIteratorTest, DBIterator7) { internal_iter->AddDeletion("c"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, cf_options, BytewiseComparator(), internal_iter, - 14, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, cf_options, BytewiseComparator(), internal_iter, 14, + options.max_sequential_skip_in_iterations, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); @@ -2083,7 +2096,8 @@ TEST_F(DBIteratorTest, DBIterator8) { std::unique_ptr db_iter(NewDBIterator( env_, ro, ImmutableCFOptions(options), BytewiseComparator(), - internal_iter, 10, options.max_sequential_skip_in_iterations)); + internal_iter, 10, options.max_sequential_skip_in_iterations, + nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "b"); @@ -2113,7 +2127,8 @@ TEST_F(DBIteratorTest, DBIterator9) { std::unique_ptr db_iter(NewDBIterator( env_, ro, ImmutableCFOptions(options), BytewiseComparator(), - internal_iter, 10, options.max_sequential_skip_in_iterations)); + internal_iter, 10, options.max_sequential_skip_in_iterations, + nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); @@ -2179,7 +2194,8 @@ TEST_F(DBIteratorTest, DBIterator10) { std::unique_ptr db_iter(NewDBIterator( env_, ro, ImmutableCFOptions(options), BytewiseComparator(), - internal_iter, 10, options.max_sequential_skip_in_iterations)); + internal_iter, 10, options.max_sequential_skip_in_iterations, + nullptr /*read_callback*/)); db_iter->Seek("c"); ASSERT_TRUE(db_iter->Valid()); @@ -2216,9 +2232,9 @@ TEST_F(DBIteratorTest, SeekToLastOccurrenceSeq0) { internal_iter->AddPut("b", "2"); internal_iter->Finish(); - std::unique_ptr db_iter( - NewDBIterator(env_, ro, ImmutableCFOptions(options), BytewiseComparator(), - internal_iter, 10, 0 /* force seek */)); + std::unique_ptr db_iter(NewDBIterator( + env_, ro, ImmutableCFOptions(options), BytewiseComparator(), + internal_iter, 10, 0 /* force seek */, nullptr /*read_callback*/)); db_iter->SeekToFirst(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "a"); @@ -2244,9 +2260,10 @@ TEST_F(DBIteratorTest, DBIterator11) { internal_iter->AddMerge("b", "2"); internal_iter->Finish(); - std::unique_ptr db_iter(NewDBIterator( - env_, ro, ImmutableCFOptions(options), BytewiseComparator(), - internal_iter, 1, options.max_sequential_skip_in_iterations)); + std::unique_ptr db_iter( + NewDBIterator(env_, ro, ImmutableCFOptions(options), BytewiseComparator(), + internal_iter, 1, options.max_sequential_skip_in_iterations, + nullptr /*read_callback*/)); db_iter->SeekToFirst(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "a"); @@ -2272,7 +2289,7 @@ TEST_F(DBIteratorTest, DBIterator12) { std::unique_ptr db_iter( NewDBIterator(env_, ro, ImmutableCFOptions(options), BytewiseComparator(), - internal_iter, 10, 0)); + internal_iter, 10, 0, nullptr /*read_callback*/)); db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "c"); @@ -2309,7 +2326,7 @@ TEST_F(DBIteratorTest, DBIterator13) { std::unique_ptr db_iter( NewDBIterator(env_, ro, ImmutableCFOptions(options), BytewiseComparator(), - internal_iter, 2, 3)); + internal_iter, 2, 3, nullptr /*read_callback*/)); db_iter->Seek("b"); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), key); @@ -2337,7 +2354,7 @@ TEST_F(DBIteratorTest, DBIterator14) { std::unique_ptr db_iter( NewDBIterator(env_, ro, ImmutableCFOptions(options), BytewiseComparator(), - internal_iter, 4, 1)); + internal_iter, 4, 1, nullptr /*read_callback*/)); db_iter->Seek("b"); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "b"); @@ -2373,10 +2390,10 @@ class DBIterWithMergeIterTest : public testing::Test { InternalIterator* merge_iter = NewMergingIterator(&icomp_, &child_iters[0], 2u); - db_iter_.reset(NewDBIterator(env_, ro_, ImmutableCFOptions(options_), - BytewiseComparator(), merge_iter, - 8 /* read data earlier than seqId 8 */, - 3 /* max iterators before reseek */)); + db_iter_.reset(NewDBIterator( + env_, ro_, ImmutableCFOptions(options_), BytewiseComparator(), + merge_iter, 8 /* read data earlier than seqId 8 */, + 3 /* max iterators before reseek */, nullptr /*read_callback*/)); } Env* env_; diff --git a/utilities/date_tiered/date_tiered_db_impl.cc b/utilities/date_tiered/date_tiered_db_impl.cc index 83f5666e5..596e7f027 100644 --- a/utilities/date_tiered/date_tiered_db_impl.cc +++ b/utilities/date_tiered/date_tiered_db_impl.cc @@ -380,7 +380,8 @@ Iterator* DateTieredDBImpl::NewIterator(const ReadOptions& opts) { auto db_iter = NewArenaWrappedDbIterator( db_impl->GetEnv(), opts, ioptions_, kMaxSequenceNumber, - cf_options_.max_sequential_skip_in_iterations, 0); + cf_options_.max_sequential_skip_in_iterations, 0, + nullptr /*read_callback*/); auto arena = db_iter->GetArena(); MergeIteratorBuilder builder(&icomp_, arena); diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index cf336106a..4690cd298 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -588,6 +588,68 @@ Status WritePreparedTxnDB::Get(const ReadOptions& options, &callback); } +// Struct to hold ownership of snapshot and read callback for iterator cleanup. +struct WritePreparedTxnDB::IteratorState { + IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence, + std::shared_ptr s) + : callback(txn_db, sequence), snapshot(s) {} + + WritePreparedTxnReadCallback callback; + std::shared_ptr snapshot; +}; + +namespace { +static void CleanupWritePreparedTxnDBIterator(void* arg1, void* arg2) { + delete reinterpret_cast(arg1); +} +} // anonymous namespace + +Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options, + ColumnFamilyHandle* column_family) { + std::shared_ptr own_snapshot = nullptr; + SequenceNumber snapshot_seq = kMaxSequenceNumber; + if (options.snapshot != nullptr) { + snapshot_seq = options.snapshot->GetSequenceNumber(); + } else { + auto* snapshot = db_impl_->GetSnapshot(); + snapshot_seq = snapshot->GetSequenceNumber(); + own_snapshot = std::make_shared(db_impl_, snapshot); + } + assert(snapshot_seq != kMaxSequenceNumber); + auto* cfd = reinterpret_cast(column_family)->cfd(); + auto* state = new IteratorState(this, snapshot_seq, own_snapshot); + auto* db_iter = + db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback); + db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr); + return db_iter; +} + +Status WritePreparedTxnDB::NewIterators( + const ReadOptions& options, + const std::vector& column_families, + std::vector* iterators) { + std::shared_ptr own_snapshot = nullptr; + SequenceNumber snapshot_seq = kMaxSequenceNumber; + if (options.snapshot != nullptr) { + snapshot_seq = options.snapshot->GetSequenceNumber(); + } else { + auto* snapshot = db_impl_->GetSnapshot(); + snapshot_seq = snapshot->GetSequenceNumber(); + own_snapshot = std::make_shared(db_impl_, snapshot); + } + iterators->clear(); + iterators->reserve(column_families.size()); + for (auto* column_family : column_families) { + auto* cfd = reinterpret_cast(column_family)->cfd(); + auto* state = new IteratorState(this, snapshot_seq, own_snapshot); + auto* db_iter = + db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback); + db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr); + iterators->push_back(db_iter); + } + return Status::OK(); +} + void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) { // Adcance max_evicted_seq_ no more than 100 times before the cache wraps // around. diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index b67d35c03..0031fc3ae 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -13,6 +13,7 @@ #include #include +#include "db/db_iter.h" #include "db/read_callback.h" #include "db/snapshot_checker.h" #include "rocksdb/db.h" @@ -206,6 +207,16 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) override; + using DB::NewIterator; + virtual Iterator* NewIterator(const ReadOptions& options, + ColumnFamilyHandle* column_family) override; + + using DB::NewIterators; + virtual Status NewIterators( + const ReadOptions& options, + const std::vector& column_families, + std::vector* iterators) override; + // Check whether the transaction that wrote the value with seqeunce number seq // is visible to the snapshot with sequence number snapshot_seq bool IsInSnapshot(uint64_t seq, uint64_t snapshot_seq) const; @@ -300,6 +311,9 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { uint64_t rep_; }; + // Struct to hold ownership of snapshot and read callback for cleanup. + struct IteratorState; + private: friend class WritePreparedTransactionTest_IsInSnapshotTest_Test; friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test; diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 3b8242716..78141ce21 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -1636,6 +1636,61 @@ TEST_P(WritePreparedTransactionTest, delete transaction; } +TEST_P(WritePreparedTransactionTest, Iterate) { + auto verify_state = [](Iterator* iter, const std::string& key, + const std::string& value) { + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(key, iter->key().ToString()); + ASSERT_EQ(value, iter->value().ToString()); + }; + + auto verify_iter = [&](const std::string& expected_val) { + // Get iterator from a concurrent transaction and make sure it has the + // same view as an iterator from the DB. + auto* txn = db->BeginTransaction(WriteOptions()); + + for (int i = 0; i < 2; i++) { + Iterator* iter = (i == 0) + ? db->NewIterator(ReadOptions()) + : txn->GetIterator(ReadOptions()); + // Seek + iter->Seek("foo"); + verify_state(iter, "foo", expected_val); + // Next + iter->Seek("a"); + verify_state(iter, "a", "va"); + iter->Next(); + verify_state(iter, "foo", expected_val); + // SeekForPrev + iter->SeekForPrev("y"); + verify_state(iter, "foo", expected_val); + // Prev + iter->SeekForPrev("z"); + verify_state(iter, "z", "vz"); + iter->Prev(); + verify_state(iter, "foo", expected_val); + delete iter; + } + delete txn; + }; + + ASSERT_OK(db->Put(WriteOptions(), "foo", "v1")); + auto* transaction = db->BeginTransaction(WriteOptions()); + ASSERT_OK(transaction->SetName("txn")); + ASSERT_OK(transaction->Put("foo", "v2")); + ASSERT_OK(transaction->Prepare()); + VerifyKeys({{"foo", "v1"}}); + // dummy keys + ASSERT_OK(db->Put(WriteOptions(), "a", "va")); + ASSERT_OK(db->Put(WriteOptions(), "z", "vz")); + verify_iter("v1"); + ASSERT_OK(transaction->Commit()); + VerifyKeys({{"foo", "v2"}}); + verify_iter("v2"); + delete transaction; +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index ef83c972d..e6d6f4597 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -41,6 +41,23 @@ Status WritePreparedTxn::Get(const ReadOptions& read_options, pinnable_val, &callback); } +Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) { + // Make sure to get iterator from WritePrepareTxnDB, not the root db. + Iterator* db_iter = wpt_db_->NewIterator(options); + assert(db_iter); + + return write_batch_.NewIteratorWithBase(db_iter); +} + +Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options, + ColumnFamilyHandle* column_family) { + // Make sure to get iterator from WritePrepareTxnDB, not the root db. + Iterator* db_iter = wpt_db_->NewIterator(options, column_family); + assert(db_iter); + + return write_batch_.NewIteratorWithBase(db_iter); +} + Status WritePreparedTxn::PrepareInternal() { WriteOptions write_options = write_options_; write_options.disableWAL = false; diff --git a/utilities/transactions/write_prepared_txn.h b/utilities/transactions/write_prepared_txn.h index 0ae9887c3..09544b9f1 100644 --- a/utilities/transactions/write_prepared_txn.h +++ b/utilities/transactions/write_prepared_txn.h @@ -50,6 +50,11 @@ class WritePreparedTxn : public PessimisticTransaction { ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) override; + using Transaction::GetIterator; + virtual Iterator* GetIterator(const ReadOptions& options) override; + virtual Iterator* GetIterator(const ReadOptions& options, + ColumnFamilyHandle* column_family) override; + private: friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;