WriteUnPrepared: improve read your own write functionality (#5573)

Summary:
There are a number of fixes in this PR (most of the bugs were found via the added stress tests):
1. Re-enable the reseek optimization. This was initially disabled to avoid infinite loops in https://github.com/facebook/rocksdb/pull/3955, but it can be resolved by remembering not to reseek after a reseek has already been done. This problem only affects forward iteration in `DBIter::FindNextUserEntryInternal`, as reseeking is already disabled in `DBIter::FindValueForCurrentKeyUsingSeek`. (See the sketch after this list.)
2. Verify that ReadOptions.snapshot can be safely used for iterator creation. Some snapshots would not give correct results because snapshot validation was not enforced, breaking assumptions made by Prev() iteration.
3. In the non-snapshot Get() case, reads done at `LastPublishedSequence` may not be enough, because unprepared sequence numbers are not published. Use `std::max(published_seq, max_visible_seq)` for lookups instead.
4. Add a stress test for reading your own writes.
5. Fix a minor bug in the allow_concurrent_memtable_write case, where batch_per_txn_ was not passed in.
6. Minor performance optimization in `CalcMaxUnpreparedSequenceNumber`: bind the unprepared-sequence map by reference instead of copying it by value.
7. Add more comments throughout.
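
To make fix 1 concrete, here is a minimal, self-contained sketch of the reseek-once guard. It is illustrative only; the real logic lives in `DBIter::FindNextUserEntryInternal`, and the names here (`Version`, `FindFirstVisible`) are invented for the example:

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

// Versions of one user key, newest first; `visible` stands in for the
// IsVisible() check against the snapshot / read callback.
struct Version {
  uint64_t seq;
  bool visible;
};

// Scan forward for the first visible version. After max_skip invisible
// versions, "reseek" directly to target_seq. For write unprepared the reseek
// target can itself be invisible, so without the reseek_done flag the loop
// could reseek to the same spot forever.
int FindFirstVisible(const std::vector<Version>& v, uint64_t target_seq,
                     uint64_t max_skip) {
  uint64_t num_skipped = 0;
  bool reseek_done = false;
  size_t i = 0;
  while (i < v.size()) {
    if (v[i].visible) {
      return static_cast<int>(i);
    }
    if (++num_skipped > max_skip && !reseek_done) {
      reseek_done = true;  // the fix: at most one reseek per run of skips
      num_skipped = 0;
      while (i < v.size() && v[i].seq > target_seq) {
        ++i;  // model the reseek: jump ahead to the target sequence number
      }
      continue;
    }
    ++i;
  }
  return -1;  // no visible version
}

int main() {
  // target_seq = 3 is, say, another transaction's unprepared write, so it is
  // not visible; a second reseek would land on index 3 again and make no
  // progress, which is exactly the old infinite loop.
  std::vector<Version> v = {
      {6, false}, {5, false}, {4, false}, {3, false}, {2, true}};
  std::cout << FindFirstVisible(v, /*target_seq=*/3, /*max_skip=*/2) << "\n";
  // prints 4: seq 2 is the newest visible version
}
```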
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5573

Differential Revision: D16276089

Pulled By: lth

fbshipit-source-id: 18029c944eb427a90a87dee76ac1b23f37ec1ccb
Author: Manuel Ung (committed by Facebook GitHub Bot)
Parent: 327c4807a7
Commit: eae832740b
 db/db_impl/db_impl.cc                                        |  15
 db/db_impl/db_impl_write.cc                                  |   3
 db/db_iter.cc                                                |  33
 db/read_callback.h                                           |   3
 utilities/transactions/transaction_test.cc                   |  16
 utilities/transactions/write_unprepared_transaction_test.cc  | 345
 utilities/transactions/write_unprepared_txn.cc               |  80
 utilities/transactions/write_unprepared_txn.h                |  57
 utilities/transactions/write_unprepared_txn_db.cc            |  82
 9 files changed

--- a/db/db_impl/db_impl.cc
+++ b/db/db_impl/db_impl.cc
@@ -1499,7 +1499,22 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
                    ? versions_->LastSequence()
                    : versions_->LastPublishedSequence();
     if (callback) {
+      // The unprep_seqs are not published for write unprepared, so it could be
+      // that max_visible_seq is larger. Seek to the std::max of the two.
+      // However, we still want our callback to contain the actual snapshot so
+      // that it can do the correct visibility filtering.
+      callback->Refresh(snapshot);
+
+      // Internally, WriteUnpreparedTxnReadCallback::Refresh would set
+      // max_visible_seq = max(max_visible_seq, snapshot)
+      //
+      // Currently, the commented out assert is broken by
+      // InvalidSnapshotReadCallback, but if write unprepared recovery followed
+      // the regular transaction flow, then this special read callback would not
+      // be needed.
+      //
+      // assert(callback->max_visible_seq() >= snapshot);
       snapshot = callback->max_visible_seq();
     }
   }
   TEST_SYNC_POINT("DBImpl::GetImpl:3");
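
A worked example for the hunk above (the numbers are illustrative, not from the source): if `LastPublishedSequence()` returns 10 while the transaction has already flushed an unprepared batch occupying sequence numbers 11 through 13, then `callback->Refresh(10)` leaves `max_visible_seq` at max(13, 10) = 13. The lookup then runs at sequence 13, so the transaction sees its own unprepared writes, while the callback still filters other transactions' keys against the real snapshot at 10.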

--- a/db/db_impl/db_impl_write.cc
+++ b/db/db_impl/db_impl_write.cc
@@ -172,7 +172,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
       w.status = WriteBatchInternal::InsertInto(
           &w, w.sequence, &column_family_memtables, &flush_scheduler_,
          write_options.ignore_missing_column_families, 0 /*log_number*/, this,
-          true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt);
+          true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt,
+          batch_per_txn_);
       PERF_TIMER_START(write_pre_and_post_process_time);
     }

--- a/db/db_iter.cc
+++ b/db/db_iter.cc
@@ -263,12 +263,6 @@ class DBIter final: public Iterator {
   bool TooManyInternalKeysSkipped(bool increment = true);
   inline bool IsVisible(SequenceNumber sequence);
 
-  // CanReseekToSkip() returns whether the iterator can use the optimization
-  // where it reseek by sequence number to get the next key when there are too
-  // many versions. This is disabled for write unprepared because seeking to
-  // sequence number does not guarantee that it is visible.
-  inline bool CanReseekToSkip();
-
   // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
   // is called
   void TempPinData() {
@@ -453,6 +447,11 @@ inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check)
   //    greater than that,
   //  - none of the above : saved_key_ can contain anything, it doesn't matter.
   uint64_t num_skipped = 0;
+  // For write unprepared, the target sequence number in reseek could be larger
+  // than the snapshot, and thus needs to be skipped again. This could result in
+  // an infinite loop of reseeks. To avoid that, we limit the number of reseeks
+  // to one.
+  bool reseek_done = false;
 
   is_blob_ = false;
@@ -498,6 +497,7 @@ inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check)
       assert(!skipping || user_comparator_.Compare(
                               ikey_.user_key, saved_key_.GetUserKey()) > 0);
       num_skipped = 0;
+      reseek_done = false;
       switch (ikey_.type) {
         case kTypeDeletion:
         case kTypeSingleDeletion:
@@ -551,6 +551,7 @@ inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check)
             // they are hidden by this deletion.
             skipping = true;
             num_skipped = 0;
+            reseek_done = false;
             PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
           } else if (ikey_.type == kTypeBlobIndex) {
             if (!allow_blob_) {
@@ -581,6 +582,7 @@ inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check)
               // they are hidden by this deletion.
               skipping = true;
               num_skipped = 0;
+              reseek_done = false;
               PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
             } else {
               // By now, we are sure the current ikey is going to yield a
@@ -611,14 +613,23 @@ inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check)
                 !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
             skipping = false;
             num_skipped = 0;
+            reseek_done = false;
           }
       }
 
     // If we have sequentially iterated via numerous equal keys, then it's
     // better to seek so that we can avoid too many key comparisons.
-    if (num_skipped > max_skip_ && CanReseekToSkip()) {
+    //
+    // To avoid infinite loops, do not reseek if we have already attempted to
+    // reseek previously.
+    //
+    // TODO(lth): If we reseek to sequence number greater than ikey_.sequence,
+    // than it does not make sense to reseek as we would actually land further
+    // away from the desired key. There is opportunity for optimization here.
+    if (num_skipped > max_skip_ && !reseek_done) {
       is_key_seqnum_zero_ = false;
       num_skipped = 0;
+      reseek_done = true;
       std::string last_key;
       if (skipping) {
         // We're looking for the next user-key but all we see are the same
@@ -937,7 +948,7 @@ bool DBIter::FindValueForCurrentKey() {
         // This user key has lots of entries.
         // We're going from old to new, and it's taking too long. Let's do a Seek()
         // and go from new to old. This helps when a key was overwritten many times.
-        if (num_skipped >= max_skip_ && CanReseekToSkip()) {
+        if (num_skipped >= max_skip_) {
           return FindValueForCurrentKeyUsingSeek();
         }
@@ -1234,7 +1245,7 @@ bool DBIter::FindUserKeyBeforeSavedKey() {
       PERF_COUNTER_ADD(internal_key_skipped_count, 1);
     }
 
-    if (num_skipped >= max_skip_ && CanReseekToSkip()) {
+    if (num_skipped >= max_skip_) {
       num_skipped = 0;
       IterKey last_key;
       last_key.SetInternalKey(ParsedInternalKey(
@@ -1281,10 +1292,6 @@ bool DBIter::IsVisible(SequenceNumber sequence) {
   }
 }
 
-bool DBIter::CanReseekToSkip() {
-  return read_callback_ == nullptr || read_callback_->CanReseekToSkip();
-}
-
 void DBIter::Seek(const Slice& target) {
   PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
   StopWatch sw(env_, statistics_, DB_SEEK);

--- a/db/read_callback.h
+++ b/db/read_callback.h
@@ -42,9 +42,6 @@ class ReadCallback {
   // Refresh to a more recent visible seq
   virtual void Refresh(SequenceNumber seq) { max_visible_seq_ = seq; }
 
-  // Refer to DBIter::CanReseekToSkip
-  virtual bool CanReseekToSkip() { return true; }
-
  protected:
   // The max visible seq, it is usually the snapshot but could be larger if
   // transaction has its own writes written to db.

--- a/utilities/transactions/transaction_test.cc
+++ b/utilities/transactions/transaction_test.cc
@@ -3471,6 +3471,12 @@ TEST_P(TransactionTest, LockLimitTest) {
 }
 
 TEST_P(TransactionTest, IteratorTest) {
+  // This test does writes without snapshot validation, and then tries to create
+  // iterator later, which is unsupported in write unprepared.
+  if (txn_db_options.write_policy == WRITE_UNPREPARED) {
+    return;
+  }
+
   WriteOptions write_options;
   ReadOptions read_options, snapshot_read_options;
   std::string value;
@@ -3589,6 +3595,16 @@ TEST_P(TransactionTest, IteratorTest) {
 }
 
 TEST_P(TransactionTest, DisableIndexingTest) {
+  // Skip this test for write unprepared. It does not solely rely on WBWI for
+  // read your own writes, so depending on whether batches are flushed or not,
+  // only some writes will be visible.
+  //
+  // Also, write unprepared does not support creating iterators if there has
+  // been txn->Put() without snapshot validation.
+  if (txn_db_options.write_policy == WRITE_UNPREPARED) {
+    return;
+  }
+
   WriteOptions write_options;
   ReadOptions read_options;
   std::string value;

--- a/utilities/transactions/write_unprepared_transaction_test.cc
+++ b/utilities/transactions/write_unprepared_transaction_test.cc
@@ -37,6 +37,9 @@ INSTANTIATE_TEST_CASE_P(
         std::make_tuple(false, true, WRITE_UNPREPARED)));
 
 TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
+  // The following tests checks whether reading your own write for
+  // a transaction works for write unprepared, when there are uncommitted
+  // values written into DB.
   auto verify_state = [](Iterator* iter, const std::string& key,
                          const std::string& value) {
     ASSERT_TRUE(iter->Valid());
@@ -45,155 +48,251 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
     ASSERT_EQ(value, iter->value().ToString());
   };
 
-  options.disable_auto_compactions = true;
-  ReOpen();
-
-  // The following tests checks whether reading your own write for
-  // a transaction works for write unprepared, when there are uncommitted
-  // values written into DB.
-  //
-  // Although the values written by DB::Put are technically committed, we add
-  // their seq num to unprep_seqs_ to pretend that they were written into DB
-  // as part of an unprepared batch, and then check if they are visible to the
-  // transaction.
-  auto snapshot0 = db->GetSnapshot();
-  ASSERT_OK(db->Put(WriteOptions(), "a", "v1"));
-  ASSERT_OK(db->Put(WriteOptions(), "b", "v2"));
-  auto snapshot2 = db->GetSnapshot();
-  ASSERT_OK(db->Put(WriteOptions(), "a", "v3"));
-  ASSERT_OK(db->Put(WriteOptions(), "b", "v4"));
-  auto snapshot4 = db->GetSnapshot();
-  ASSERT_OK(db->Put(WriteOptions(), "a", "v5"));
-  ASSERT_OK(db->Put(WriteOptions(), "b", "v6"));
-  auto snapshot6 = db->GetSnapshot();
-  ASSERT_OK(db->Put(WriteOptions(), "a", "v7"));
-  ASSERT_OK(db->Put(WriteOptions(), "b", "v8"));
-  auto snapshot8 = db->GetSnapshot();
-
-  TransactionOptions txn_options;
-  WriteOptions write_options;
-  Transaction* txn = db->BeginTransaction(write_options, txn_options);
-  WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
-
-  ReadOptions roptions;
-  roptions.snapshot = snapshot0;
-
-  wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] =
-      snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber();
-  auto iter = txn->GetIterator(roptions);
-
-  // Test Get().
-  std::string value;
-  ASSERT_OK(txn->Get(roptions, Slice("a"), &value));
-  ASSERT_EQ(value, "v3");
-
-  ASSERT_OK(txn->Get(roptions, Slice("b"), &value));
-  ASSERT_EQ(value, "v4");
-
-  wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] =
-      snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber();
-  delete iter;
-  iter = txn->GetIterator(roptions);
-
-  ASSERT_OK(txn->Get(roptions, Slice("a"), &value));
-  ASSERT_EQ(value, "v7");
-
-  ASSERT_OK(txn->Get(roptions, Slice("b"), &value));
-  ASSERT_EQ(value, "v8");
-
-  wup_txn->unprep_seqs_.clear();
-
-  // Test Next().
-  wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] =
-      snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber();
-  delete iter;
-  iter = txn->GetIterator(roptions);
-
-  iter->Seek("a");
-  verify_state(iter, "a", "v3");
-
-  iter->Next();
-  verify_state(iter, "b", "v4");
-
-  iter->SeekToFirst();
-  verify_state(iter, "a", "v3");
-
-  iter->Next();
-  verify_state(iter, "b", "v4");
-
-  wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] =
-      snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber();
-  delete iter;
-  iter = txn->GetIterator(roptions);
-
-  iter->Seek("a");
-  verify_state(iter, "a", "v7");
-
-  iter->Next();
-  verify_state(iter, "b", "v8");
-
-  iter->SeekToFirst();
-  verify_state(iter, "a", "v7");
-
-  iter->Next();
-  verify_state(iter, "b", "v8");
-
-  wup_txn->unprep_seqs_.clear();
-
-  // Test Prev(). For Prev(), we need to adjust the snapshot to match what is
-  // possible in WriteUnpreparedTxn.
-  //
-  // Because of row locks and ValidateSnapshot, there cannot be any committed
-  // entries after snapshot, but before the first prepared key.
-  roptions.snapshot = snapshot2;
-  wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] =
-      snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber();
-  delete iter;
-  iter = txn->GetIterator(roptions);
-
-  iter->SeekForPrev("b");
-  verify_state(iter, "b", "v4");
-
-  iter->Prev();
-  verify_state(iter, "a", "v3");
-
-  iter->SeekToLast();
-  verify_state(iter, "b", "v4");
-
-  iter->Prev();
-  verify_state(iter, "a", "v3");
-
-  roptions.snapshot = snapshot6;
-  wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] =
-      snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber();
-  delete iter;
-  iter = txn->GetIterator(roptions);
-
-  iter->SeekForPrev("b");
-  verify_state(iter, "b", "v8");
-
-  iter->Prev();
-  verify_state(iter, "a", "v7");
-
-  iter->SeekToLast();
-  verify_state(iter, "b", "v8");
-
-  iter->Prev();
-  verify_state(iter, "a", "v7");
-
-  // Since the unprep_seqs_ data were faked for testing, we do not want the
-  // destructor for the transaction to be rolling back data that did not
-  // exist.
-  wup_txn->unprep_seqs_.clear();
-
-  db->ReleaseSnapshot(snapshot0);
-  db->ReleaseSnapshot(snapshot2);
-  db->ReleaseSnapshot(snapshot4);
-  db->ReleaseSnapshot(snapshot6);
-  db->ReleaseSnapshot(snapshot8);
-  delete iter;
-  delete txn;
+  // Test always reseeking vs never reseeking.
+  for (uint64_t max_skip : {0, std::numeric_limits<int>::max()}) {
+    options.max_sequential_skip_in_iterations = max_skip;
+    options.disable_auto_compactions = true;
+    ReOpen();
+
+    TransactionOptions txn_options;
+    WriteOptions woptions;
+    ReadOptions roptions;
+
+    ASSERT_OK(db->Put(woptions, "a", ""));
+    ASSERT_OK(db->Put(woptions, "b", ""));
+
+    Transaction* txn = db->BeginTransaction(woptions, txn_options);
+    WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
+    txn->SetSnapshot();
+
+    for (int i = 0; i < 5; i++) {
+      std::string stored_value = "v" + ToString(i);
+      ASSERT_OK(txn->Put("a", stored_value));
+      ASSERT_OK(txn->Put("b", stored_value));
+      wup_txn->FlushWriteBatchToDB(false);
+
+      // Test Get()
+      std::string value;
+      ASSERT_OK(txn->Get(roptions, "a", &value));
+      ASSERT_EQ(value, stored_value);
+      ASSERT_OK(txn->Get(roptions, "b", &value));
+      ASSERT_EQ(value, stored_value);
+
+      // Test Next()
+      auto iter = txn->GetIterator(roptions);
+      iter->Seek("a");
+      verify_state(iter, "a", stored_value);
+
+      iter->Next();
+      verify_state(iter, "b", stored_value);
+
+      iter->SeekToFirst();
+      verify_state(iter, "a", stored_value);
+
+      iter->Next();
+      verify_state(iter, "b", stored_value);
+
+      delete iter;
+
+      // Test Prev()
+      iter = txn->GetIterator(roptions);
+      iter->SeekForPrev("b");
+      verify_state(iter, "b", stored_value);
+
+      iter->Prev();
+      verify_state(iter, "a", stored_value);
+
+      iter->SeekToLast();
+      verify_state(iter, "b", stored_value);
+
+      iter->Prev();
+      verify_state(iter, "a", stored_value);
+
+      delete iter;
+    }
+
+    delete txn;
+  }
+}
+
+TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWriteStress) {
+  // This is a stress test where different threads are writing random keys, and
+  // then before committing or aborting the transaction, it validates to see
+  // that it can read the keys it wrote, and the keys it did not write respect
+  // the snapshot. To avoid row lock contention (and simply stressing the
+  // locking system), each thread is mostly only writing to its own set of
+  // keys.
+  const uint32_t kNumIter = 1000;
+  const uint32_t kNumThreads = 10;
+  const uint32_t kNumKeys = 5;
+
+  std::default_random_engine rand(static_cast<uint32_t>(
+      std::hash<std::thread::id>()(std::this_thread::get_id())));
+
+  enum Action { NO_SNAPSHOT, RO_SNAPSHOT, REFRESH_SNAPSHOT };
+
+  // Test with
+  // 1. no snapshots set
+  // 2. snapshot set on ReadOptions
+  // 3. snapshot set, and refreshing after every write.
+  for (Action a : {NO_SNAPSHOT, RO_SNAPSHOT, REFRESH_SNAPSHOT}) {
+    WriteOptions write_options;
+    txn_db_options.transaction_lock_timeout = -1;
+    options.disable_auto_compactions = true;
+    ReOpen();
+
+    std::vector<std::string> keys;
+    for (uint32_t k = 0; k < kNumKeys * kNumThreads; k++) {
+      keys.push_back("k" + ToString(k));
+    }
+    std::shuffle(keys.begin(), keys.end(), rand);
+
+    // This counter will act as a "sequence number" to help us validate
+    // visibility logic with snapshots. If we had direct access to the seqno of
+    // snapshots and key/values, then we should directly compare those instead.
+    std::atomic<int64_t> counter(0);
+
+    std::function<void(uint32_t)> stress_thread = [&](int id) {
+      size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id());
+      Random64 rnd(static_cast<uint32_t>(tid));
+
+      Transaction* txn;
+      TransactionOptions txn_options;
+      // batch_size of 1 causes writes to DB for every marker.
+      txn_options.max_write_batch_size = 1;
+      ReadOptions read_options;
+
+      for (uint32_t i = 0; i < kNumIter; i++) {
+        std::set<std::string> owned_keys(&keys[id * kNumKeys],
+                                         &keys[(id + 1) * kNumKeys]);
+        // Add unowned keys to make the workload more interesting, but this
+        // increases row lock contention, so just do it sometimes.
+        if (rnd.OneIn(2)) {
+          owned_keys.insert(keys[rnd.Uniform(kNumKeys * kNumThreads)]);
+        }
+
+        txn = db->BeginTransaction(write_options, txn_options);
+        txn->SetName(ToString(id));
+        txn->SetSnapshot();
+        if (a >= RO_SNAPSHOT) {
+          read_options.snapshot = txn->GetSnapshot();
+          ASSERT_TRUE(read_options.snapshot != nullptr);
+        }
+
+        uint64_t buf[2];
+        buf[0] = id;
+
+        // When scanning through the database, make sure that all unprepared
+        // keys have value >= snapshot and all other keys have value < snapshot.
+        int64_t snapshot_num = counter.fetch_add(1);
+
+        Status s;
+        for (const auto& key : owned_keys) {
+          buf[1] = counter.fetch_add(1);
+          s = txn->Put(key, Slice((const char*)buf, sizeof(buf)));
+          if (!s.ok()) {
+            break;
+          }
+          if (a == REFRESH_SNAPSHOT) {
+            txn->SetSnapshot();
+            read_options.snapshot = txn->GetSnapshot();
+            snapshot_num = counter.fetch_add(1);
+          }
+        }
+
+        // Failure is possible due to snapshot validation. In this case,
+        // rollback and move onto next iteration.
+        if (!s.ok()) {
+          ASSERT_TRUE(s.IsBusy());
+          ASSERT_OK(txn->Rollback());
+          delete txn;
+          continue;
+        }
+
+        auto verify_key = [&owned_keys, &a, &id, &snapshot_num](
+                              const std::string& key,
+                              const std::string& value) {
+          if (owned_keys.count(key) > 0) {
+            ASSERT_EQ(value.size(), 16);
+
+            // Since this key is part of owned_keys, then this key must be
+            // unprepared by this transaction identified by 'id'
+            ASSERT_EQ(((int64_t*)value.c_str())[0], id);
+            if (a == REFRESH_SNAPSHOT) {
+              // If refresh snapshot is true, then the snapshot is refreshed
+              // after every Put(), meaning that the current snapshot in
+              // snapshot_num must be greater than the "seqno" of any keys
+              // written by the current transaction.
+              ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num);
+            } else {
+              // If refresh snapshot is not on, then the snapshot was taken at
+              // the beginning of the transaction, meaning all writes must come
+              // after snapshot_num
+              ASSERT_GT(((int64_t*)value.c_str())[1], snapshot_num);
+            }
+          } else if (a >= RO_SNAPSHOT) {
+            // If this is not an unprepared key, just assert that the key
+            // "seqno" is smaller than the snapshot seqno.
+            ASSERT_EQ(value.size(), 16);
+            ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num);
+          }
+        };
+
+        // Validate Get()/Next()/Prev(). Do only one of them to save time, and
+        // reduce lock contention.
+        switch (rnd.Uniform(3)) {
+          case 0:  // Validate Get()
+          {
+            for (const auto& key : keys) {
+              std::string value;
+              s = txn->Get(read_options, Slice(key), &value);
+              if (!s.ok()) {
+                ASSERT_TRUE(s.IsNotFound());
+                ASSERT_EQ(owned_keys.count(key), 0);
+              } else {
+                verify_key(key, value);
+              }
+            }
+            break;
+          }
+          case 1:  // Validate Next()
+          {
+            Iterator* iter = txn->GetIterator(read_options);
+            for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+              verify_key(iter->key().ToString(), iter->value().ToString());
+            }
+            delete iter;
+            break;
+          }
+          case 2:  // Validate Prev()
+          {
+            Iterator* iter = txn->GetIterator(read_options);
+            for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
+              verify_key(iter->key().ToString(), iter->value().ToString());
+            }
+            delete iter;
+            break;
+          }
+          default:
+            ASSERT_TRUE(false);
+        }
+
+        if (rnd.OneIn(2)) {
+          ASSERT_OK(txn->Commit());
+        } else {
+          ASSERT_OK(txn->Rollback());
+        }
+        delete txn;
+      }
+    };
+
+    std::vector<port::Thread> threads;
+    for (uint32_t i = 0; i < kNumThreads; i++) {
+      threads.emplace_back(stress_thread, i);
+    }
+
+    for (auto& t : threads) {
+      t.join();
+    }
+  }
 }
 
 // This tests how write unprepared behaves during recovery when the DB crashes

--- a/utilities/transactions/write_unprepared_txn.cc
+++ b/utilities/transactions/write_unprepared_txn.cc
@@ -32,7 +32,7 @@ bool WriteUnpreparedTxnReadCallback::IsVisibleFullCheck(SequenceNumber seq) {
 
 SequenceNumber WriteUnpreparedTxnReadCallback::CalcMaxUnpreparedSequenceNumber(
     WriteUnpreparedTxn* txn) {
-  auto unprep_seqs = txn->GetUnpreparedSequenceNumbers();
+  const auto& unprep_seqs = txn->GetUnpreparedSequenceNumbers();
   if (unprep_seqs.size()) {
     return unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1;
   }
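
A note on the hunk above (fix 6 in the summary): assuming `GetUnpreparedSequenceNumbers()` returns a reference to the transaction's map of unprepared batches, binding the result with plain `auto` copied the entire map on every visibility check, whereas `const auto&` binds by reference and avoids the copy.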
@@ -44,7 +44,8 @@ WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db,
                                        const TransactionOptions& txn_options)
     : WritePreparedTxn(txn_db, write_options, txn_options),
       wupt_db_(txn_db),
-      recovered_txn_(false) {
+      recovered_txn_(false),
+      largest_validated_seq_(0) {
   max_write_batch_size_ = txn_options.max_write_batch_size;
   // We set max bytes to zero so that we don't get a memory limit error.
   // Instead of trying to keep write batch strictly under the size limit, we
@@ -85,75 +86,82 @@ void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) {
   write_batch_.SetMaxBytes(0);
   unprep_seqs_.clear();
   recovered_txn_ = false;
+  largest_validated_seq_ = 0;
 }
 
-Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
-                               const Slice& key, const Slice& value,
-                               const bool assume_tracked) {
+Status WriteUnpreparedTxn::HandleWrite(std::function<Status()> do_write) {
   Status s = MaybeFlushWriteBatchToDB();
   if (!s.ok()) {
     return s;
   }
-  return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
+  s = do_write();
+  if (s.ok()) {
+    if (snapshot_) {
+      largest_validated_seq_ =
+          std::max(largest_validated_seq_, snapshot_->GetSequenceNumber());
+    } else {
+      largest_validated_seq_ = kMaxSequenceNumber;
+    }
+  }
+  return s;
+}
+
+Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
+                               const Slice& key, const Slice& value,
+                               const bool assume_tracked) {
+  return HandleWrite([&]() {
+    return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
+  });
 }
 
 Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
                                const SliceParts& key, const SliceParts& value,
                                const bool assume_tracked) {
-  Status s = MaybeFlushWriteBatchToDB();
-  if (!s.ok()) {
-    return s;
-  }
-  return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
+  return HandleWrite([&]() {
+    return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
+  });
 }
 
 Status WriteUnpreparedTxn::Merge(ColumnFamilyHandle* column_family,
                                  const Slice& key, const Slice& value,
                                  const bool assume_tracked) {
-  Status s = MaybeFlushWriteBatchToDB();
-  if (!s.ok()) {
-    return s;
-  }
-  return TransactionBaseImpl::Merge(column_family, key, value, assume_tracked);
+  return HandleWrite([&]() {
+    return TransactionBaseImpl::Merge(column_family, key, value,
+                                      assume_tracked);
+  });
 }
 
 Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
                                   const Slice& key, const bool assume_tracked) {
-  Status s = MaybeFlushWriteBatchToDB();
-  if (!s.ok()) {
-    return s;
-  }
-  return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
+  return HandleWrite([&]() {
+    return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
+  });
 }
 
 Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
                                   const SliceParts& key,
                                   const bool assume_tracked) {
-  Status s = MaybeFlushWriteBatchToDB();
-  if (!s.ok()) {
-    return s;
-  }
-  return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
+  return HandleWrite([&]() {
+    return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
+  });
 }
 
 Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
                                         const Slice& key,
                                         const bool assume_tracked) {
-  Status s = MaybeFlushWriteBatchToDB();
-  if (!s.ok()) {
-    return s;
-  }
-  return TransactionBaseImpl::SingleDelete(column_family, key, assume_tracked);
+  return HandleWrite([&]() {
+    return TransactionBaseImpl::SingleDelete(column_family, key,
+                                             assume_tracked);
+  });
 }
 
 Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
                                         const SliceParts& key,
                                         const bool assume_tracked) {
-  Status s = MaybeFlushWriteBatchToDB();
-  if (!s.ok()) {
-    return s;
-  }
-  return TransactionBaseImpl::SingleDelete(column_family, key, assume_tracked);
+  return HandleWrite([&]() {
+    return TransactionBaseImpl::SingleDelete(column_family, key,
+                                             assume_tracked);
+  });
 }
 
 // WriteUnpreparedTxn::RebuildFromWriteBatch is only called on recovery. For
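
All write paths above now funnel through `HandleWrite`, which flushes the current write batch to the DB when it grows past `max_write_batch_size`, performs the base-class write, and records how far snapshot validation has gone. A hedged usage sketch of that bookkeeping (the sequence number is illustrative, and `db` is assumed to be a WriteUnprepared `TransactionDB*` as in this PR's tests):

```cpp
Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
txn->SetSnapshot();               // suppose the snapshot lands at seq 10
ASSERT_OK(txn->Put("k1", "v1"));  // validated write:
                                  //   largest_validated_seq_ = max(0, 10) = 10
ASSERT_OK(txn->Delete("k2"));     // still validated at seq 10
// Had SetSnapshot() been skipped, HandleWrite would instead pin
// largest_validated_seq_ to kMaxSequenceNumber, and iterator creation would
// later be refused (see write_unprepared_txn_db.cc below).
ASSERT_OK(txn->Rollback());
delete txn;
```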

--- a/utilities/transactions/write_unprepared_txn.h
+++ b/utilities/transactions/write_unprepared_txn.h
@@ -17,6 +17,40 @@ namespace rocksdb {
 class WriteUnpreparedTxnDB;
 class WriteUnpreparedTxn;
 
+// WriteUnprepared transactions needs to be able to read their own uncommitted
+// writes, and supporting this requires some careful consideration. Because
+// writes in the current transaction may be flushed to DB already, we cannot
+// rely on the contents of WriteBatchWithIndex to determine whether a key should
+// be visible or not, so we have to remember to check the DB for any uncommitted
+// keys that should be visible to us. First, we will need to change the seek to
+// snapshot logic, to seek to max_visible_seq = max(snap_seq, max_unprep_seq).
+// Any key greater than max_visible_seq should not be visible because they
+// cannot be unprepared by the current transaction and they are not in its
+// snapshot.
+//
+// When we seek to max_visible_seq, one of these cases will happen:
+// 1. We hit a unprepared key from the current transaction.
+// 2. We hit a unprepared key from the another transaction.
+// 3. We hit a committed key with snap_seq < seq < max_unprep_seq.
+// 4. We hit a committed key with seq <= snap_seq.
+//
+// IsVisibleFullCheck handles all cases correctly.
+//
+// Other notes:
+// Note that max_visible_seq is only calculated once at iterator construction
+// time, meaning if the same transaction is adding more unprep seqs through
+// writes during iteration, these newer writes may not be visible. This is not a
+// problem for MySQL though because it avoids modifying the index as it is
+// scanning through it to avoid the Halloween Problem. Instead, it scans the
+// index once up front, and modifies based on a temporary copy.
+//
+// In DBIter, there is a "reseek" optimization if the iterator skips over too
+// many keys. However, this assumes that the reseek seeks exactly to the
+// required key. In write unprepared, even after seeking directly to
+// max_visible_seq, some iteration may be required before hitting a visible key,
+// and special precautions must be taken to avoid performing another reseek,
+// leading to an infinite loop.
+//
 class WriteUnpreparedTxnReadCallback : public ReadCallback {
  public:
   WriteUnpreparedTxnReadCallback(WritePreparedTxnDB* db,
@@ -25,7 +59,7 @@ class WriteUnpreparedTxnReadCallback : public ReadCallback {
                                  WriteUnpreparedTxn* txn)
       // Pass our last uncommitted seq as the snapshot to the parent class to
       // ensure that the parent will not prematurely filter out own writes. We
-      // will do the exact comparison agaisnt snapshots in IsVisibleFullCheck
+      // will do the exact comparison against snapshots in IsVisibleFullCheck
       // override.
       : ReadCallback(CalcMaxVisibleSeq(txn, snapshot), min_uncommitted),
         db_(db),
@@ -34,12 +68,6 @@ class WriteUnpreparedTxnReadCallback : public ReadCallback {
 
   virtual bool IsVisibleFullCheck(SequenceNumber seq) override;
 
-  bool CanReseekToSkip() override {
-    return wup_snapshot_ == max_visible_seq_;
-    // Otherwise our own writes uncommitted are in db, and the assumptions
-    // behind reseek optimizations are no longer valid.
-  }
-
   void Refresh(SequenceNumber seq) override {
     max_visible_seq_ = std::max(max_visible_seq_, seq);
     wup_snapshot_ = seq;
@@ -130,6 +158,7 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
 
   Status MaybeFlushWriteBatchToDB();
   Status FlushWriteBatchToDB(bool prepared);
+  Status HandleWrite(std::function<Status()> do_write);
 
   // For write unprepared, we check on every writebatch append to see if
   // max_write_batch_size_ has been exceeded, and then call
@@ -153,6 +182,20 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
   // locked for efficiency reasons. For recovered transactions, skip unlocking
   // keys when transaction ends.
   bool recovered_txn_;
+
+  // Track the largest sequence number at which we performed snapshot
+  // validation. If snapshot validation was skipped because no snapshot was set,
+  // then this is set to kMaxSequenceNumber. This value is useful because it
+  // means that for keys that have unprepared seqnos, we can guarantee that no
+  // committed keys by other transactions can exist between
+  // largest_validated_seq_ and max_unprep_seq. See
+  // WriteUnpreparedTxnDB::NewIterator for an explanation for why this is
+  // necessary for iterator Prev().
+  //
+  // Currently this value only increases during the lifetime of a transaction,
+  // but in some cases, we should be able to restore the previously largest
+  // value when calling RollbackToSavepoint.
+  SequenceNumber largest_validated_seq_;
 };
 
 }  // namespace rocksdb
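
To make the seek-to-snapshot target described in the comment block above concrete, here is a small self-contained sketch. It is not RocksDB's actual code; it mirrors `CalcMaxUnpreparedSequenceNumber` from write_unprepared_txn.cc, assuming unprepared batches are kept as a map of start sequence number to batch size:

```cpp
#include <algorithm>
#include <cstdint>
#include <map>

using SequenceNumber = uint64_t;

// Hedged sketch: compute max_visible_seq = max(snap_seq, max_unprep_seq).
// unprep_seqs maps the first sequence number of each unprepared batch to the
// number of sequence numbers the batch occupies.
SequenceNumber MaxVisibleSeq(
    const std::map<SequenceNumber, size_t>& unprep_seqs,
    SequenceNumber snap_seq) {
  SequenceNumber max_unprep_seq = 0;
  if (!unprep_seqs.empty()) {
    // The last sequence number of the newest unprepared batch.
    max_unprep_seq =
        unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1;
  }
  // Keys above this cannot be ours and are not in the snapshot either.
  return std::max(snap_seq, max_unprep_seq);
}
```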

--- a/utilities/transactions/write_unprepared_txn_db.cc
+++ b/utilities/transactions/write_unprepared_txn_db.cc
@@ -368,25 +368,77 @@ Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options,
   constexpr bool ALLOW_BLOB = true;
   constexpr bool ALLOW_REFRESH = true;
   std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
-  SequenceNumber snapshot_seq;
+  SequenceNumber snapshot_seq = kMaxSequenceNumber;
   SequenceNumber min_uncommitted = 0;
-  if (options.snapshot != nullptr) {
-    snapshot_seq = options.snapshot->GetSequenceNumber();
-    min_uncommitted =
-        static_cast_with_check<const SnapshotImpl, const Snapshot>(
-            options.snapshot)
-            ->min_uncommitted_;
-  } else {
-    auto* snapshot = GetSnapshot();
-    // We take a snapshot to make sure that the related data in the commit map
-    // are not deleted.
-    snapshot_seq = snapshot->GetSequenceNumber();
-    min_uncommitted =
-        static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
-            ->min_uncommitted_;
-    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
-  }
+
+  // Currently, the Prev() iterator logic does not work well without snapshot
+  // validation. The logic simply iterates through values of a key in
+  // ascending seqno order, stopping at the first non-visible value and
+  // returning the last visible value.
+  //
+  // For example, if snapshot sequence is 3, and we have the following keys:
+  // foo: v1 1
+  // foo: v2 2
+  // foo: v3 3
+  // foo: v4 4
+  // foo: v5 5
+  //
+  // Then 1, 2, 3 will be visible, but 4 will be non-visible, so we return v3,
+  // which is the last visible key.
+  //
+  // For unprepared transactions, if we have snap_seq = 3, but the current
+  // transaction has unprep_seq 5, then returning the first non-visible key
+  // would be incorrect, as we should return v5, and not v3. The problem is that
+  // there are committed keys at snapshot_seq < commit_seq < unprep_seq.
+  //
+  // Snapshot validation can prevent this problem by ensuring that no committed
+  // keys exist at snapshot_seq < commit_seq, and thus any value with a sequence
+  // number greater than snapshot_seq must be unprepared keys. For example, if
+  // the transaction had a snapshot at 3, then snapshot validation would be
+  // performed during the Put(v5) call. It would find v4, and the Put would fail
+  // with snapshot validation failure.
+  //
+  // Because of this, if any writes have occurred, then the transaction snapshot
+  // must be used for the iterator. If no writes have occurred though, we can
+  // simply create a snapshot. Later writes would not be visible though, but we
+  // don't support iterating while writing anyway.
+  //
+  // TODO(lth): Improve Prev() logic to continue iterating until
+  // max_visible_seq, and then return the last visible key, so that this
+  // restriction can be lifted.
+  const Snapshot* snapshot = nullptr;
+  if (options.snapshot == nullptr) {
+    snapshot = GetSnapshot();
+    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
+  } else {
+    snapshot = options.snapshot;
+  }
+
+  snapshot_seq = snapshot->GetSequenceNumber();
+  assert(snapshot_seq != kMaxSequenceNumber);
+
+  // Iteration is safe as long as largest_validated_seq <= snapshot_seq. We are
+  // guaranteed that for keys that were modified by this transaction (and thus
+  // might have unprepared versions), no committed versions exist at
+  // largest_validated_seq < commit_seq (or the contrapositive: any committed
+  // version must exist at commit_seq <= largest_validated_seq). This implies
+  // that commit_seq <= largest_validated_seq <= snapshot_seq or commit_seq <=
+  // snapshot_seq. As explained above, the problem with Prev() only happens when
+  // snapshot_seq < commit_seq.
+  //
+  // For keys that were not modified by this transaction, largest_validated_seq_
+  // is meaningless, and Prev() should just work with the existing visibility
+  // logic.
+  if (txn->largest_validated_seq_ > snapshot->GetSequenceNumber() &&
+      !txn->unprep_seqs_.empty()) {
+    ROCKS_LOG_ERROR(info_log_,
+                    "WriteUnprepared iterator creation failed since the "
+                    "transaction has performed unvalidated writes");
+    return nullptr;
+  }
+  min_uncommitted =
+      static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
+          ->min_uncommitted_;
+
   auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
   auto* state =
       new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn);
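
A hedged sketch of the restriction enforced above, written in the style of this PR's tests (it assumes the test fixture's WriteUnprepared `TransactionDB* db`, and flushes explicitly as the tests do):

```cpp
Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);

// No SetSnapshot() before writing, so the Put skips snapshot validation and
// largest_validated_seq_ becomes kMaxSequenceNumber.
ASSERT_OK(txn->Put("a", "v"));
// Push the write into the DB as an unprepared batch (as the tests do), so
// unprep_seqs_ is non-empty.
wup_txn->FlushWriteBatchToDB(false);

// largest_validated_seq_ > snapshot_seq and unprep_seqs_ is non-empty, so
// NewIterator logs an error and returns nullptr.
Iterator* iter = txn->GetIterator(ReadOptions());
ASSERT_EQ(iter, nullptr);

ASSERT_OK(txn->Rollback());
delete txn;
```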
