Invalidate iterator on transaction clear (#7733)

Summary:
Some clients do not close their iterators until after the transaction finishes. To handle this case, we will invalidate any iterators on transaction clear.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7733

Reviewed By: cheng-chang

Differential Revision: D25261158

Pulled By: lth

fbshipit-source-id: b91320f00c54cbe0e6882b794b34f3bb5640dbc0
main
Manuel Ung 4 years ago committed by Facebook GitHub Bot
parent 80159f6e0b
commit 71239908cf
  1. 46
      utilities/transactions/write_unprepared_transaction_test.cc
  2. 7
      utilities/transactions/write_unprepared_txn.cc
  3. 315
      utilities/write_batch_with_index/write_batch_with_index.cc
  4. 318
      utilities/write_batch_with_index/write_batch_with_index_internal.h

@ -636,6 +636,52 @@ TEST_P(WriteUnpreparedTransactionTest, IterateAndWrite) {
}
}
// Test that using an iterator after transaction clear is not supported
TEST_P(WriteUnpreparedTransactionTest, IterateAfterClear) {
WriteOptions woptions;
TransactionOptions txn_options;
txn_options.write_batch_flush_threshold = 1;
enum Action { kCommit, kRollback };
for (Action a : {kCommit, kRollback}) {
for (int i = 0; i < 100; i++) {
ASSERT_OK(db->Put(woptions, ToString(i), ToString(i)));
}
Transaction* txn = db->BeginTransaction(woptions, txn_options);
ASSERT_OK(txn->Put("9", "a"));
ReadOptions roptions;
auto iter1 = txn->GetIterator(roptions);
auto iter2 = txn->GetIterator(roptions);
iter1->SeekToFirst();
iter2->Seek("9");
// Check that iterators are valid before transaction finishes.
ASSERT_TRUE(iter1->Valid());
ASSERT_TRUE(iter2->Valid());
ASSERT_OK(iter1->status());
ASSERT_OK(iter2->status());
if (a == kCommit) {
ASSERT_OK(txn->Commit());
} else {
ASSERT_OK(txn->Rollback());
}
// Check that iterators are invalidated after transaction finishes.
ASSERT_FALSE(iter1->Valid());
ASSERT_FALSE(iter2->Valid());
ASSERT_TRUE(iter1->status().IsInvalidArgument());
ASSERT_TRUE(iter2->status().IsInvalidArgument());
delete iter1;
delete iter2;
delete txn;
}
}
TEST_P(WriteUnpreparedTransactionTest, SavePoint) {
WriteOptions woptions;
TransactionOptions txn_options;

@ -9,6 +9,7 @@
#include "db/db_impl/db_impl.h"
#include "util/cast_util.h"
#include "utilities/transactions/write_unprepared_txn_db.h"
#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
namespace ROCKSDB_NAMESPACE {
@ -819,7 +820,11 @@ void WriteUnpreparedTxn::Clear() {
unflushed_save_points_.reset(nullptr);
recovered_txn_ = false;
largest_validated_seq_ = 0;
assert(active_iterators_.empty());
for (auto& it : active_iterators_) {
auto bdit = static_cast<BaseDeltaIterator*>(it);
bdit->Invalidate(Status::InvalidArgument(
"Cannot use iterator after transaction has finished"));
}
active_iterators_.clear();
untracked_keys_.clear();
TransactionBaseImpl::Clear();

@ -24,321 +24,6 @@
namespace ROCKSDB_NAMESPACE {
// when direction == forward
// * current_at_base_ <=> base_iterator > delta_iterator
// when direction == backwards
// * current_at_base_ <=> base_iterator < delta_iterator
// always:
// * equal_keys_ <=> base_iterator == delta_iterator
class BaseDeltaIterator : public Iterator {
public:
BaseDeltaIterator(Iterator* base_iterator, WBWIIterator* delta_iterator,
const Comparator* comparator,
const ReadOptions* read_options = nullptr)
: forward_(true),
current_at_base_(true),
equal_keys_(false),
status_(Status::OK()),
base_iterator_(base_iterator),
delta_iterator_(delta_iterator),
comparator_(comparator),
iterate_upper_bound_(read_options ? read_options->iterate_upper_bound
: nullptr) {}
~BaseDeltaIterator() override {}
bool Valid() const override {
return current_at_base_ ? BaseValid() : DeltaValid();
}
void SeekToFirst() override {
forward_ = true;
base_iterator_->SeekToFirst();
delta_iterator_->SeekToFirst();
UpdateCurrent();
}
void SeekToLast() override {
forward_ = false;
base_iterator_->SeekToLast();
delta_iterator_->SeekToLast();
UpdateCurrent();
}
void Seek(const Slice& k) override {
forward_ = true;
base_iterator_->Seek(k);
delta_iterator_->Seek(k);
UpdateCurrent();
}
void SeekForPrev(const Slice& k) override {
forward_ = false;
base_iterator_->SeekForPrev(k);
delta_iterator_->SeekForPrev(k);
UpdateCurrent();
}
void Next() override {
if (!Valid()) {
status_ = Status::NotSupported("Next() on invalid iterator");
return;
}
if (!forward_) {
// Need to change direction
// if our direction was backward and we're not equal, we have two states:
// * both iterators are valid: we're already in a good state (current
// shows to smaller)
// * only one iterator is valid: we need to advance that iterator
forward_ = true;
equal_keys_ = false;
if (!BaseValid()) {
assert(DeltaValid());
base_iterator_->SeekToFirst();
} else if (!DeltaValid()) {
delta_iterator_->SeekToFirst();
} else if (current_at_base_) {
// Change delta from larger than base to smaller
AdvanceDelta();
} else {
// Change base from larger than delta to smaller
AdvanceBase();
}
if (DeltaValid() && BaseValid()) {
if (comparator_->Equal(delta_iterator_->Entry().key,
base_iterator_->key())) {
equal_keys_ = true;
}
}
}
Advance();
}
void Prev() override {
if (!Valid()) {
status_ = Status::NotSupported("Prev() on invalid iterator");
return;
}
if (forward_) {
// Need to change direction
// if our direction was backward and we're not equal, we have two states:
// * both iterators are valid: we're already in a good state (current
// shows to smaller)
// * only one iterator is valid: we need to advance that iterator
forward_ = false;
equal_keys_ = false;
if (!BaseValid()) {
assert(DeltaValid());
base_iterator_->SeekToLast();
} else if (!DeltaValid()) {
delta_iterator_->SeekToLast();
} else if (current_at_base_) {
// Change delta from less advanced than base to more advanced
AdvanceDelta();
} else {
// Change base from less advanced than delta to more advanced
AdvanceBase();
}
if (DeltaValid() && BaseValid()) {
if (comparator_->Equal(delta_iterator_->Entry().key,
base_iterator_->key())) {
equal_keys_ = true;
}
}
}
Advance();
}
Slice key() const override {
return current_at_base_ ? base_iterator_->key()
: delta_iterator_->Entry().key;
}
Slice value() const override {
return current_at_base_ ? base_iterator_->value()
: delta_iterator_->Entry().value;
}
Status status() const override {
if (!status_.ok()) {
return status_;
}
if (!base_iterator_->status().ok()) {
return base_iterator_->status();
}
return delta_iterator_->status();
}
private:
void AssertInvariants() {
#ifndef NDEBUG
bool not_ok = false;
if (!base_iterator_->status().ok()) {
assert(!base_iterator_->Valid());
not_ok = true;
}
if (!delta_iterator_->status().ok()) {
assert(!delta_iterator_->Valid());
not_ok = true;
}
if (not_ok) {
assert(!Valid());
assert(!status().ok());
return;
}
if (!Valid()) {
return;
}
if (!BaseValid()) {
assert(!current_at_base_ && delta_iterator_->Valid());
return;
}
if (!DeltaValid()) {
assert(current_at_base_ && base_iterator_->Valid());
return;
}
// we don't support those yet
assert(delta_iterator_->Entry().type != kMergeRecord &&
delta_iterator_->Entry().type != kLogDataRecord);
int compare = comparator_->Compare(delta_iterator_->Entry().key,
base_iterator_->key());
if (forward_) {
// current_at_base -> compare < 0
assert(!current_at_base_ || compare < 0);
// !current_at_base -> compare <= 0
assert(current_at_base_ && compare >= 0);
} else {
// current_at_base -> compare > 0
assert(!current_at_base_ || compare > 0);
// !current_at_base -> compare <= 0
assert(current_at_base_ && compare <= 0);
}
// equal_keys_ <=> compare == 0
assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0));
#endif
}
void Advance() {
if (equal_keys_) {
assert(BaseValid() && DeltaValid());
AdvanceBase();
AdvanceDelta();
} else {
if (current_at_base_) {
assert(BaseValid());
AdvanceBase();
} else {
assert(DeltaValid());
AdvanceDelta();
}
}
UpdateCurrent();
}
void AdvanceDelta() {
if (forward_) {
delta_iterator_->Next();
} else {
delta_iterator_->Prev();
}
}
void AdvanceBase() {
if (forward_) {
base_iterator_->Next();
} else {
base_iterator_->Prev();
}
}
bool BaseValid() const { return base_iterator_->Valid(); }
bool DeltaValid() const { return delta_iterator_->Valid(); }
void UpdateCurrent() {
// Suppress false positive clang analyzer warnings.
#ifndef __clang_analyzer__
status_ = Status::OK();
while (true) {
WriteEntry delta_entry;
if (DeltaValid()) {
assert(delta_iterator_->status().ok());
delta_entry = delta_iterator_->Entry();
} else if (!delta_iterator_->status().ok()) {
// Expose the error status and stop.
current_at_base_ = false;
return;
}
equal_keys_ = false;
if (!BaseValid()) {
if (!base_iterator_->status().ok()) {
// Expose the error status and stop.
current_at_base_ = true;
return;
}
// Base has finished.
if (!DeltaValid()) {
// Finished
return;
}
if (iterate_upper_bound_) {
if (comparator_->Compare(delta_entry.key, *iterate_upper_bound_) >=
0) {
// out of upper bound -> finished.
return;
}
}
if (delta_entry.type == kDeleteRecord ||
delta_entry.type == kSingleDeleteRecord) {
AdvanceDelta();
} else {
current_at_base_ = false;
return;
}
} else if (!DeltaValid()) {
// Delta has finished.
current_at_base_ = true;
return;
} else {
int compare =
(forward_ ? 1 : -1) *
comparator_->Compare(delta_entry.key, base_iterator_->key());
if (compare <= 0) { // delta bigger or equal
if (compare == 0) {
equal_keys_ = true;
}
if (delta_entry.type != kDeleteRecord &&
delta_entry.type != kSingleDeleteRecord) {
current_at_base_ = false;
return;
}
// Delta is less advanced and is delete.
AdvanceDelta();
if (equal_keys_) {
AdvanceBase();
}
} else {
current_at_base_ = true;
return;
}
}
}
AssertInvariants();
#endif // __clang_analyzer__
}
bool forward_;
bool current_at_base_;
bool equal_keys_;
Status status_;
std::unique_ptr<Iterator> base_iterator_;
std::unique_ptr<WBWIIterator> delta_iterator_;
const Comparator* comparator_; // not owned
const Slice* iterate_upper_bound_;
};
typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&>
WriteBatchEntrySkipList;

@ -23,6 +23,324 @@ namespace ROCKSDB_NAMESPACE {
class MergeContext;
struct Options;
// when direction == forward
// * current_at_base_ <=> base_iterator > delta_iterator
// when direction == backwards
// * current_at_base_ <=> base_iterator < delta_iterator
// always:
// * equal_keys_ <=> base_iterator == delta_iterator
class BaseDeltaIterator : public Iterator {
public:
BaseDeltaIterator(Iterator* base_iterator, WBWIIterator* delta_iterator,
const Comparator* comparator,
const ReadOptions* read_options = nullptr)
: forward_(true),
current_at_base_(true),
equal_keys_(false),
status_(Status::OK()),
base_iterator_(base_iterator),
delta_iterator_(delta_iterator),
comparator_(comparator),
iterate_upper_bound_(read_options ? read_options->iterate_upper_bound
: nullptr) {}
~BaseDeltaIterator() override {}
bool Valid() const override {
return status_.ok() ? (current_at_base_ ? BaseValid() : DeltaValid())
: false;
}
void SeekToFirst() override {
forward_ = true;
base_iterator_->SeekToFirst();
delta_iterator_->SeekToFirst();
UpdateCurrent();
}
void SeekToLast() override {
forward_ = false;
base_iterator_->SeekToLast();
delta_iterator_->SeekToLast();
UpdateCurrent();
}
void Seek(const Slice& k) override {
forward_ = true;
base_iterator_->Seek(k);
delta_iterator_->Seek(k);
UpdateCurrent();
}
void SeekForPrev(const Slice& k) override {
forward_ = false;
base_iterator_->SeekForPrev(k);
delta_iterator_->SeekForPrev(k);
UpdateCurrent();
}
void Next() override {
if (!Valid()) {
status_ = Status::NotSupported("Next() on invalid iterator");
return;
}
if (!forward_) {
// Need to change direction
// if our direction was backward and we're not equal, we have two states:
// * both iterators are valid: we're already in a good state (current
// shows to smaller)
// * only one iterator is valid: we need to advance that iterator
forward_ = true;
equal_keys_ = false;
if (!BaseValid()) {
assert(DeltaValid());
base_iterator_->SeekToFirst();
} else if (!DeltaValid()) {
delta_iterator_->SeekToFirst();
} else if (current_at_base_) {
// Change delta from larger than base to smaller
AdvanceDelta();
} else {
// Change base from larger than delta to smaller
AdvanceBase();
}
if (DeltaValid() && BaseValid()) {
if (comparator_->Equal(delta_iterator_->Entry().key,
base_iterator_->key())) {
equal_keys_ = true;
}
}
}
Advance();
}
void Prev() override {
if (!Valid()) {
status_ = Status::NotSupported("Prev() on invalid iterator");
return;
}
if (forward_) {
// Need to change direction
// if our direction was backward and we're not equal, we have two states:
// * both iterators are valid: we're already in a good state (current
// shows to smaller)
// * only one iterator is valid: we need to advance that iterator
forward_ = false;
equal_keys_ = false;
if (!BaseValid()) {
assert(DeltaValid());
base_iterator_->SeekToLast();
} else if (!DeltaValid()) {
delta_iterator_->SeekToLast();
} else if (current_at_base_) {
// Change delta from less advanced than base to more advanced
AdvanceDelta();
} else {
// Change base from less advanced than delta to more advanced
AdvanceBase();
}
if (DeltaValid() && BaseValid()) {
if (comparator_->Equal(delta_iterator_->Entry().key,
base_iterator_->key())) {
equal_keys_ = true;
}
}
}
Advance();
}
Slice key() const override {
return current_at_base_ ? base_iterator_->key()
: delta_iterator_->Entry().key;
}
Slice value() const override {
return current_at_base_ ? base_iterator_->value()
: delta_iterator_->Entry().value;
}
Status status() const override {
if (!status_.ok()) {
return status_;
}
if (!base_iterator_->status().ok()) {
return base_iterator_->status();
}
return delta_iterator_->status();
}
void Invalidate(Status s) { status_ = s; }
private:
void AssertInvariants() {
#ifndef NDEBUG
bool not_ok = false;
if (!base_iterator_->status().ok()) {
assert(!base_iterator_->Valid());
not_ok = true;
}
if (!delta_iterator_->status().ok()) {
assert(!delta_iterator_->Valid());
not_ok = true;
}
if (not_ok) {
assert(!Valid());
assert(!status().ok());
return;
}
if (!Valid()) {
return;
}
if (!BaseValid()) {
assert(!current_at_base_ && delta_iterator_->Valid());
return;
}
if (!DeltaValid()) {
assert(current_at_base_ && base_iterator_->Valid());
return;
}
// we don't support those yet
assert(delta_iterator_->Entry().type != kMergeRecord &&
delta_iterator_->Entry().type != kLogDataRecord);
int compare = comparator_->Compare(delta_iterator_->Entry().key,
base_iterator_->key());
if (forward_) {
// current_at_base -> compare < 0
assert(!current_at_base_ || compare < 0);
// !current_at_base -> compare <= 0
assert(current_at_base_ && compare >= 0);
} else {
// current_at_base -> compare > 0
assert(!current_at_base_ || compare > 0);
// !current_at_base -> compare <= 0
assert(current_at_base_ && compare <= 0);
}
// equal_keys_ <=> compare == 0
assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0));
#endif
}
void Advance() {
if (equal_keys_) {
assert(BaseValid() && DeltaValid());
AdvanceBase();
AdvanceDelta();
} else {
if (current_at_base_) {
assert(BaseValid());
AdvanceBase();
} else {
assert(DeltaValid());
AdvanceDelta();
}
}
UpdateCurrent();
}
void AdvanceDelta() {
if (forward_) {
delta_iterator_->Next();
} else {
delta_iterator_->Prev();
}
}
void AdvanceBase() {
if (forward_) {
base_iterator_->Next();
} else {
base_iterator_->Prev();
}
}
bool BaseValid() const { return base_iterator_->Valid(); }
bool DeltaValid() const { return delta_iterator_->Valid(); }
void UpdateCurrent() {
// Suppress false positive clang analyzer warnings.
#ifndef __clang_analyzer__
status_ = Status::OK();
while (true) {
WriteEntry delta_entry;
if (DeltaValid()) {
assert(delta_iterator_->status().ok());
delta_entry = delta_iterator_->Entry();
} else if (!delta_iterator_->status().ok()) {
// Expose the error status and stop.
current_at_base_ = false;
return;
}
equal_keys_ = false;
if (!BaseValid()) {
if (!base_iterator_->status().ok()) {
// Expose the error status and stop.
current_at_base_ = true;
return;
}
// Base has finished.
if (!DeltaValid()) {
// Finished
return;
}
if (iterate_upper_bound_) {
if (comparator_->Compare(delta_entry.key, *iterate_upper_bound_) >=
0) {
// out of upper bound -> finished.
return;
}
}
if (delta_entry.type == kDeleteRecord ||
delta_entry.type == kSingleDeleteRecord) {
AdvanceDelta();
} else {
current_at_base_ = false;
return;
}
} else if (!DeltaValid()) {
// Delta has finished.
current_at_base_ = true;
return;
} else {
int compare =
(forward_ ? 1 : -1) *
comparator_->Compare(delta_entry.key, base_iterator_->key());
if (compare <= 0) { // delta bigger or equal
if (compare == 0) {
equal_keys_ = true;
}
if (delta_entry.type != kDeleteRecord &&
delta_entry.type != kSingleDeleteRecord) {
current_at_base_ = false;
return;
}
// Delta is less advanced and is delete.
AdvanceDelta();
if (equal_keys_) {
AdvanceBase();
}
} else {
current_at_base_ = true;
return;
}
}
}
AssertInvariants();
#endif // __clang_analyzer__
}
bool forward_;
bool current_at_base_;
bool equal_keys_;
Status status_;
std::unique_ptr<Iterator> base_iterator_;
std::unique_ptr<WBWIIterator> delta_iterator_;
const Comparator* comparator_; // not owned
const Slice* iterate_upper_bound_;
};
// Key used by skip list, as the binary searchable index of WriteBatchWithIndex.
struct WriteBatchIndexEntry {
WriteBatchIndexEntry(size_t o, uint32_t c, size_t ko, size_t ksz)

Loading…
Cancel
Save