diff --git a/utilities/transactions/write_unprepared_transaction_test.cc b/utilities/transactions/write_unprepared_transaction_test.cc index 1d21ae36e..7152ad9cd 100644 --- a/utilities/transactions/write_unprepared_transaction_test.cc +++ b/utilities/transactions/write_unprepared_transaction_test.cc @@ -636,6 +636,52 @@ TEST_P(WriteUnpreparedTransactionTest, IterateAndWrite) { } } +// Test that using an iterator after transaction clear is not supported +TEST_P(WriteUnpreparedTransactionTest, IterateAfterClear) { + WriteOptions woptions; + TransactionOptions txn_options; + txn_options.write_batch_flush_threshold = 1; + + enum Action { kCommit, kRollback }; + + for (Action a : {kCommit, kRollback}) { + for (int i = 0; i < 100; i++) { + ASSERT_OK(db->Put(woptions, ToString(i), ToString(i))); + } + + Transaction* txn = db->BeginTransaction(woptions, txn_options); + ASSERT_OK(txn->Put("9", "a")); + + ReadOptions roptions; + auto iter1 = txn->GetIterator(roptions); + auto iter2 = txn->GetIterator(roptions); + iter1->SeekToFirst(); + iter2->Seek("9"); + + // Check that iterators are valid before transaction finishes. + ASSERT_TRUE(iter1->Valid()); + ASSERT_TRUE(iter2->Valid()); + ASSERT_OK(iter1->status()); + ASSERT_OK(iter2->status()); + + if (a == kCommit) { + ASSERT_OK(txn->Commit()); + } else { + ASSERT_OK(txn->Rollback()); + } + + // Check that iterators are invalidated after transaction finishes. + ASSERT_FALSE(iter1->Valid()); + ASSERT_FALSE(iter2->Valid()); + ASSERT_TRUE(iter1->status().IsInvalidArgument()); + ASSERT_TRUE(iter2->status().IsInvalidArgument()); + + delete iter1; + delete iter2; + delete txn; + } +} + TEST_P(WriteUnpreparedTransactionTest, SavePoint) { WriteOptions woptions; TransactionOptions txn_options; diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index 29f4bb6a2..c6740374f 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -9,6 +9,7 @@ #include "db/db_impl/db_impl.h" #include "util/cast_util.h" #include "utilities/transactions/write_unprepared_txn_db.h" +#include "utilities/write_batch_with_index/write_batch_with_index_internal.h" namespace ROCKSDB_NAMESPACE { @@ -819,7 +820,11 @@ void WriteUnpreparedTxn::Clear() { unflushed_save_points_.reset(nullptr); recovered_txn_ = false; largest_validated_seq_ = 0; - assert(active_iterators_.empty()); + for (auto& it : active_iterators_) { + auto bdit = static_cast(it); + bdit->Invalidate(Status::InvalidArgument( + "Cannot use iterator after transaction has finished")); + } active_iterators_.clear(); untracked_keys_.clear(); TransactionBaseImpl::Clear(); diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 1e92d5814..ce4ab686c 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -24,321 +24,6 @@ namespace ROCKSDB_NAMESPACE { -// when direction == forward -// * current_at_base_ <=> base_iterator > delta_iterator -// when direction == backwards -// * current_at_base_ <=> base_iterator < delta_iterator -// always: -// * equal_keys_ <=> base_iterator == delta_iterator -class BaseDeltaIterator : public Iterator { - public: - BaseDeltaIterator(Iterator* base_iterator, WBWIIterator* delta_iterator, - const Comparator* comparator, - const ReadOptions* read_options = nullptr) - : forward_(true), - current_at_base_(true), - equal_keys_(false), - status_(Status::OK()), - base_iterator_(base_iterator), - delta_iterator_(delta_iterator), - comparator_(comparator), - iterate_upper_bound_(read_options ? read_options->iterate_upper_bound - : nullptr) {} - - ~BaseDeltaIterator() override {} - - bool Valid() const override { - return current_at_base_ ? BaseValid() : DeltaValid(); - } - - void SeekToFirst() override { - forward_ = true; - base_iterator_->SeekToFirst(); - delta_iterator_->SeekToFirst(); - UpdateCurrent(); - } - - void SeekToLast() override { - forward_ = false; - base_iterator_->SeekToLast(); - delta_iterator_->SeekToLast(); - UpdateCurrent(); - } - - void Seek(const Slice& k) override { - forward_ = true; - base_iterator_->Seek(k); - delta_iterator_->Seek(k); - UpdateCurrent(); - } - - void SeekForPrev(const Slice& k) override { - forward_ = false; - base_iterator_->SeekForPrev(k); - delta_iterator_->SeekForPrev(k); - UpdateCurrent(); - } - - void Next() override { - if (!Valid()) { - status_ = Status::NotSupported("Next() on invalid iterator"); - return; - } - - if (!forward_) { - // Need to change direction - // if our direction was backward and we're not equal, we have two states: - // * both iterators are valid: we're already in a good state (current - // shows to smaller) - // * only one iterator is valid: we need to advance that iterator - forward_ = true; - equal_keys_ = false; - if (!BaseValid()) { - assert(DeltaValid()); - base_iterator_->SeekToFirst(); - } else if (!DeltaValid()) { - delta_iterator_->SeekToFirst(); - } else if (current_at_base_) { - // Change delta from larger than base to smaller - AdvanceDelta(); - } else { - // Change base from larger than delta to smaller - AdvanceBase(); - } - if (DeltaValid() && BaseValid()) { - if (comparator_->Equal(delta_iterator_->Entry().key, - base_iterator_->key())) { - equal_keys_ = true; - } - } - } - Advance(); - } - - void Prev() override { - if (!Valid()) { - status_ = Status::NotSupported("Prev() on invalid iterator"); - return; - } - - if (forward_) { - // Need to change direction - // if our direction was backward and we're not equal, we have two states: - // * both iterators are valid: we're already in a good state (current - // shows to smaller) - // * only one iterator is valid: we need to advance that iterator - forward_ = false; - equal_keys_ = false; - if (!BaseValid()) { - assert(DeltaValid()); - base_iterator_->SeekToLast(); - } else if (!DeltaValid()) { - delta_iterator_->SeekToLast(); - } else if (current_at_base_) { - // Change delta from less advanced than base to more advanced - AdvanceDelta(); - } else { - // Change base from less advanced than delta to more advanced - AdvanceBase(); - } - if (DeltaValid() && BaseValid()) { - if (comparator_->Equal(delta_iterator_->Entry().key, - base_iterator_->key())) { - equal_keys_ = true; - } - } - } - - Advance(); - } - - Slice key() const override { - return current_at_base_ ? base_iterator_->key() - : delta_iterator_->Entry().key; - } - - Slice value() const override { - return current_at_base_ ? base_iterator_->value() - : delta_iterator_->Entry().value; - } - - Status status() const override { - if (!status_.ok()) { - return status_; - } - if (!base_iterator_->status().ok()) { - return base_iterator_->status(); - } - return delta_iterator_->status(); - } - - private: - void AssertInvariants() { -#ifndef NDEBUG - bool not_ok = false; - if (!base_iterator_->status().ok()) { - assert(!base_iterator_->Valid()); - not_ok = true; - } - if (!delta_iterator_->status().ok()) { - assert(!delta_iterator_->Valid()); - not_ok = true; - } - if (not_ok) { - assert(!Valid()); - assert(!status().ok()); - return; - } - - if (!Valid()) { - return; - } - if (!BaseValid()) { - assert(!current_at_base_ && delta_iterator_->Valid()); - return; - } - if (!DeltaValid()) { - assert(current_at_base_ && base_iterator_->Valid()); - return; - } - // we don't support those yet - assert(delta_iterator_->Entry().type != kMergeRecord && - delta_iterator_->Entry().type != kLogDataRecord); - int compare = comparator_->Compare(delta_iterator_->Entry().key, - base_iterator_->key()); - if (forward_) { - // current_at_base -> compare < 0 - assert(!current_at_base_ || compare < 0); - // !current_at_base -> compare <= 0 - assert(current_at_base_ && compare >= 0); - } else { - // current_at_base -> compare > 0 - assert(!current_at_base_ || compare > 0); - // !current_at_base -> compare <= 0 - assert(current_at_base_ && compare <= 0); - } - // equal_keys_ <=> compare == 0 - assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0)); -#endif - } - - void Advance() { - if (equal_keys_) { - assert(BaseValid() && DeltaValid()); - AdvanceBase(); - AdvanceDelta(); - } else { - if (current_at_base_) { - assert(BaseValid()); - AdvanceBase(); - } else { - assert(DeltaValid()); - AdvanceDelta(); - } - } - UpdateCurrent(); - } - - void AdvanceDelta() { - if (forward_) { - delta_iterator_->Next(); - } else { - delta_iterator_->Prev(); - } - } - void AdvanceBase() { - if (forward_) { - base_iterator_->Next(); - } else { - base_iterator_->Prev(); - } - } - bool BaseValid() const { return base_iterator_->Valid(); } - bool DeltaValid() const { return delta_iterator_->Valid(); } - void UpdateCurrent() { -// Suppress false positive clang analyzer warnings. -#ifndef __clang_analyzer__ - status_ = Status::OK(); - while (true) { - WriteEntry delta_entry; - if (DeltaValid()) { - assert(delta_iterator_->status().ok()); - delta_entry = delta_iterator_->Entry(); - } else if (!delta_iterator_->status().ok()) { - // Expose the error status and stop. - current_at_base_ = false; - return; - } - equal_keys_ = false; - if (!BaseValid()) { - if (!base_iterator_->status().ok()) { - // Expose the error status and stop. - current_at_base_ = true; - return; - } - - // Base has finished. - if (!DeltaValid()) { - // Finished - return; - } - if (iterate_upper_bound_) { - if (comparator_->Compare(delta_entry.key, *iterate_upper_bound_) >= - 0) { - // out of upper bound -> finished. - return; - } - } - if (delta_entry.type == kDeleteRecord || - delta_entry.type == kSingleDeleteRecord) { - AdvanceDelta(); - } else { - current_at_base_ = false; - return; - } - } else if (!DeltaValid()) { - // Delta has finished. - current_at_base_ = true; - return; - } else { - int compare = - (forward_ ? 1 : -1) * - comparator_->Compare(delta_entry.key, base_iterator_->key()); - if (compare <= 0) { // delta bigger or equal - if (compare == 0) { - equal_keys_ = true; - } - if (delta_entry.type != kDeleteRecord && - delta_entry.type != kSingleDeleteRecord) { - current_at_base_ = false; - return; - } - // Delta is less advanced and is delete. - AdvanceDelta(); - if (equal_keys_) { - AdvanceBase(); - } - } else { - current_at_base_ = true; - return; - } - } - } - - AssertInvariants(); -#endif // __clang_analyzer__ - } - - bool forward_; - bool current_at_base_; - bool equal_keys_; - Status status_; - std::unique_ptr base_iterator_; - std::unique_ptr delta_iterator_; - const Comparator* comparator_; // not owned - const Slice* iterate_upper_bound_; -}; - typedef SkipList WriteBatchEntrySkipList; diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.h b/utilities/write_batch_with_index/write_batch_with_index_internal.h index 6a859e072..10f89093a 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.h +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.h @@ -23,6 +23,324 @@ namespace ROCKSDB_NAMESPACE { class MergeContext; struct Options; +// when direction == forward +// * current_at_base_ <=> base_iterator > delta_iterator +// when direction == backwards +// * current_at_base_ <=> base_iterator < delta_iterator +// always: +// * equal_keys_ <=> base_iterator == delta_iterator +class BaseDeltaIterator : public Iterator { + public: + BaseDeltaIterator(Iterator* base_iterator, WBWIIterator* delta_iterator, + const Comparator* comparator, + const ReadOptions* read_options = nullptr) + : forward_(true), + current_at_base_(true), + equal_keys_(false), + status_(Status::OK()), + base_iterator_(base_iterator), + delta_iterator_(delta_iterator), + comparator_(comparator), + iterate_upper_bound_(read_options ? read_options->iterate_upper_bound + : nullptr) {} + + ~BaseDeltaIterator() override {} + + bool Valid() const override { + return status_.ok() ? (current_at_base_ ? BaseValid() : DeltaValid()) + : false; + } + + void SeekToFirst() override { + forward_ = true; + base_iterator_->SeekToFirst(); + delta_iterator_->SeekToFirst(); + UpdateCurrent(); + } + + void SeekToLast() override { + forward_ = false; + base_iterator_->SeekToLast(); + delta_iterator_->SeekToLast(); + UpdateCurrent(); + } + + void Seek(const Slice& k) override { + forward_ = true; + base_iterator_->Seek(k); + delta_iterator_->Seek(k); + UpdateCurrent(); + } + + void SeekForPrev(const Slice& k) override { + forward_ = false; + base_iterator_->SeekForPrev(k); + delta_iterator_->SeekForPrev(k); + UpdateCurrent(); + } + + void Next() override { + if (!Valid()) { + status_ = Status::NotSupported("Next() on invalid iterator"); + return; + } + + if (!forward_) { + // Need to change direction + // if our direction was backward and we're not equal, we have two states: + // * both iterators are valid: we're already in a good state (current + // shows to smaller) + // * only one iterator is valid: we need to advance that iterator + forward_ = true; + equal_keys_ = false; + if (!BaseValid()) { + assert(DeltaValid()); + base_iterator_->SeekToFirst(); + } else if (!DeltaValid()) { + delta_iterator_->SeekToFirst(); + } else if (current_at_base_) { + // Change delta from larger than base to smaller + AdvanceDelta(); + } else { + // Change base from larger than delta to smaller + AdvanceBase(); + } + if (DeltaValid() && BaseValid()) { + if (comparator_->Equal(delta_iterator_->Entry().key, + base_iterator_->key())) { + equal_keys_ = true; + } + } + } + Advance(); + } + + void Prev() override { + if (!Valid()) { + status_ = Status::NotSupported("Prev() on invalid iterator"); + return; + } + + if (forward_) { + // Need to change direction + // if our direction was backward and we're not equal, we have two states: + // * both iterators are valid: we're already in a good state (current + // shows to smaller) + // * only one iterator is valid: we need to advance that iterator + forward_ = false; + equal_keys_ = false; + if (!BaseValid()) { + assert(DeltaValid()); + base_iterator_->SeekToLast(); + } else if (!DeltaValid()) { + delta_iterator_->SeekToLast(); + } else if (current_at_base_) { + // Change delta from less advanced than base to more advanced + AdvanceDelta(); + } else { + // Change base from less advanced than delta to more advanced + AdvanceBase(); + } + if (DeltaValid() && BaseValid()) { + if (comparator_->Equal(delta_iterator_->Entry().key, + base_iterator_->key())) { + equal_keys_ = true; + } + } + } + + Advance(); + } + + Slice key() const override { + return current_at_base_ ? base_iterator_->key() + : delta_iterator_->Entry().key; + } + + Slice value() const override { + return current_at_base_ ? base_iterator_->value() + : delta_iterator_->Entry().value; + } + + Status status() const override { + if (!status_.ok()) { + return status_; + } + if (!base_iterator_->status().ok()) { + return base_iterator_->status(); + } + return delta_iterator_->status(); + } + + void Invalidate(Status s) { status_ = s; } + + private: + void AssertInvariants() { +#ifndef NDEBUG + bool not_ok = false; + if (!base_iterator_->status().ok()) { + assert(!base_iterator_->Valid()); + not_ok = true; + } + if (!delta_iterator_->status().ok()) { + assert(!delta_iterator_->Valid()); + not_ok = true; + } + if (not_ok) { + assert(!Valid()); + assert(!status().ok()); + return; + } + + if (!Valid()) { + return; + } + if (!BaseValid()) { + assert(!current_at_base_ && delta_iterator_->Valid()); + return; + } + if (!DeltaValid()) { + assert(current_at_base_ && base_iterator_->Valid()); + return; + } + // we don't support those yet + assert(delta_iterator_->Entry().type != kMergeRecord && + delta_iterator_->Entry().type != kLogDataRecord); + int compare = comparator_->Compare(delta_iterator_->Entry().key, + base_iterator_->key()); + if (forward_) { + // current_at_base -> compare < 0 + assert(!current_at_base_ || compare < 0); + // !current_at_base -> compare <= 0 + assert(current_at_base_ && compare >= 0); + } else { + // current_at_base -> compare > 0 + assert(!current_at_base_ || compare > 0); + // !current_at_base -> compare <= 0 + assert(current_at_base_ && compare <= 0); + } + // equal_keys_ <=> compare == 0 + assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0)); +#endif + } + + void Advance() { + if (equal_keys_) { + assert(BaseValid() && DeltaValid()); + AdvanceBase(); + AdvanceDelta(); + } else { + if (current_at_base_) { + assert(BaseValid()); + AdvanceBase(); + } else { + assert(DeltaValid()); + AdvanceDelta(); + } + } + UpdateCurrent(); + } + + void AdvanceDelta() { + if (forward_) { + delta_iterator_->Next(); + } else { + delta_iterator_->Prev(); + } + } + void AdvanceBase() { + if (forward_) { + base_iterator_->Next(); + } else { + base_iterator_->Prev(); + } + } + bool BaseValid() const { return base_iterator_->Valid(); } + bool DeltaValid() const { return delta_iterator_->Valid(); } + void UpdateCurrent() { +// Suppress false positive clang analyzer warnings. +#ifndef __clang_analyzer__ + status_ = Status::OK(); + while (true) { + WriteEntry delta_entry; + if (DeltaValid()) { + assert(delta_iterator_->status().ok()); + delta_entry = delta_iterator_->Entry(); + } else if (!delta_iterator_->status().ok()) { + // Expose the error status and stop. + current_at_base_ = false; + return; + } + equal_keys_ = false; + if (!BaseValid()) { + if (!base_iterator_->status().ok()) { + // Expose the error status and stop. + current_at_base_ = true; + return; + } + + // Base has finished. + if (!DeltaValid()) { + // Finished + return; + } + if (iterate_upper_bound_) { + if (comparator_->Compare(delta_entry.key, *iterate_upper_bound_) >= + 0) { + // out of upper bound -> finished. + return; + } + } + if (delta_entry.type == kDeleteRecord || + delta_entry.type == kSingleDeleteRecord) { + AdvanceDelta(); + } else { + current_at_base_ = false; + return; + } + } else if (!DeltaValid()) { + // Delta has finished. + current_at_base_ = true; + return; + } else { + int compare = + (forward_ ? 1 : -1) * + comparator_->Compare(delta_entry.key, base_iterator_->key()); + if (compare <= 0) { // delta bigger or equal + if (compare == 0) { + equal_keys_ = true; + } + if (delta_entry.type != kDeleteRecord && + delta_entry.type != kSingleDeleteRecord) { + current_at_base_ = false; + return; + } + // Delta is less advanced and is delete. + AdvanceDelta(); + if (equal_keys_) { + AdvanceBase(); + } + } else { + current_at_base_ = true; + return; + } + } + } + + AssertInvariants(); +#endif // __clang_analyzer__ + } + + bool forward_; + bool current_at_base_; + bool equal_keys_; + Status status_; + std::unique_ptr base_iterator_; + std::unique_ptr delta_iterator_; + const Comparator* comparator_; // not owned + const Slice* iterate_upper_bound_; +}; + // Key used by skip list, as the binary searchable index of WriteBatchWithIndex. struct WriteBatchIndexEntry { WriteBatchIndexEntry(size_t o, uint32_t c, size_t ko, size_t ksz)