WritePrepared: fix race condition in reading batch with duplicate keys (#5147)

Summary:
When ReadOptions doesn't specify a snapshot, WritePrepared::Get used kMaxSequenceNumber to avoid the cost of creating a new snapshot object (which requires synchronizing over db_mutex). This creates a race condition if it is reading from the writes of a transaction that had duplicate keys: each instance of a duplicate key is inserted with a different sequence number and, depending on the ordering, ::Get might skip the newer one and read the older one, which is obsolete.
The patch fixes that by using the last published seq as the snapshot sequence number. It also adds a check after the read is done to ensure that max_evicted_seq has not advanced to or beyond the aforementioned seq, which is a very unlikely event. If it did, then the read is not valid, since the seq is not backed by an actual snapshot that would let IsInSnapshot handle it properly when an overlapping commit is evicted from the commit cache.
A unit test is added to reproduce the race condition with duplicate keys.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5147

Differential Revision: D14758815

Pulled By: maysamyabandeh

fbshipit-source-id: a56915657132cf6ba5e3f5ea1b5d78c803407719
main
Maysam Yabandeh 6 years ago committed by Facebook Github Bot
parent 1966a7c055
commit fe642cbee6
  1. 1
      HISTORY.md
  2. 34
      db/db_impl.cc
  3. 1
      db/read_callback.h
  4. 29
      utilities/transactions/transaction_test.cc
  5. 20
      utilities/transactions/write_prepared_transaction_test.cc
  6. 26
      utilities/transactions/write_prepared_txn.cc
  7. 27
      utilities/transactions/write_prepared_txn_db.cc
  8. 49
      utilities/transactions/write_prepared_txn_db.h
  9. 23
      utilities/transactions/write_unprepared_txn.cc
  10. 8
      utilities/transactions/write_unprepared_txn.h
  11. 43
      utilities/transactions/write_unprepared_txn_db.cc

@ -11,6 +11,7 @@
### Bug Fixes
* Fix a bug in 2PC where a sequence of txn prepare, memtable flush, and crash could result in losing the prepared transaction.
* Fix a bug in Encryption Env which could cause encrypted files to be read beyond file boundaries.
* Fix a race condition between WritePrepared::Get and ::Put with duplicate keys.
## 6.1.0 (3/27/2019)
### New Features

@ -1384,33 +1384,25 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
SequenceNumber snapshot;
if (read_options.snapshot != nullptr) {
// Note: In WritePrepared txns this is not necessary but not harmful
// either. Because prep_seq > snapshot => commit_seq > snapshot so if
// a snapshot is specified we should be fine with skipping seq numbers
// that are greater than that.
//
// In WriteUnprepared, we cannot set snapshot in the lookup key because we
// may skip uncommitted data that should be visible to the transaction for
// reading own writes.
snapshot =
reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
if (callback) {
snapshot = std::max(snapshot, callback->max_visible_seq());
// Already calculated based on read_options.snapshot
snapshot = callback->max_visible_seq();
} else {
snapshot =
reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
}
} else {
// Since we get and reference the super version before getting
// the snapshot number, without a mutex protection, it is possible
// that a memtable switch happened in the middle and not all the
// data for this snapshot is available. But it will contain all
// the data available in the super version we have, which is also
// a valid snapshot to read from.
// We shouldn't get snapshot before finding and referencing the super
// version because a flush happening in between may compact away data for
// the snapshot, but the snapshot is earlier than the data overwriting it,
// so users may see wrong results.
// Note that the snapshot is assigned AFTER referencing the super
// version because otherwise a flush happening in between may compact away
// data for the snapshot, so the reader would see neither the data that was
// visible to the snapshot before compaction nor the newer data inserted
// afterwards.
snapshot = last_seq_same_as_publish_seq_
? versions_->LastSequence()
: versions_->LastPublishedSequence();
if (callback) {
callback->Refresh(snapshot);
}
}
TEST_SYNC_POINT("DBImpl::GetImpl:3");
TEST_SYNC_POINT("DBImpl::GetImpl:4");

@ -39,6 +39,7 @@ class ReadCallback {
inline SequenceNumber max_visible_seq() { return max_visible_seq_; }
// Refresh to a more recent visible seq
virtual void Refresh(SequenceNumber seq) { max_visible_seq_ = seq; }
// Refer to DBIter::CanReseekToSkip

@ -5331,6 +5331,35 @@ class ThreeBytewiseComparator : public Comparator {
}
};
// Reads "key" without a snapshot while another thread repeatedly commits
// transactions that write "key" twice (an intermediate value followed by the
// final "value"). Every successful read must observe "value"; seeing the
// intermediate value would mean the reader picked the obsolete duplicate.
TEST_P(TransactionTest, GetWithoutSnapshot) {
  WriteOptions write_options;
  std::atomic<bool> finish = {false};
  // Seed the key so that the reader never gets NotFound.
  ASSERT_OK(db->Put(write_options, "key", "value"));
  rocksdb::port::Thread commit_thread([&]() {
    // The commits run in a nested lambda because a failed ASSERT_* only
    // returns from the function it appears in. `finish` must still be set
    // afterwards, otherwise the reader thread below would spin forever and
    // the test would hang instead of reporting the failure.
    [&]() {
      for (int i = 0; i < 100; i++) {
        TransactionOptions txn_options;
        Transaction* txn = db->BeginTransaction(write_options, txn_options);
        ASSERT_OK(txn->SetName("xid"));
        // Duplicate key in the same transaction: the second Put restores the
        // value that readers are expected to see.
        ASSERT_OK(txn->Put("key", "overridedvalue"));
        ASSERT_OK(txn->Put("key", "value"));
        ASSERT_OK(txn->Prepare());
        ASSERT_OK(txn->Commit());
        delete txn;
      }
    }();
    finish = true;
  });
  rocksdb::port::Thread read_thread([&]() {
    while (!finish) {
      // No snapshot is set in the read options on purpose.
      ReadOptions ropt;
      PinnableSlice pinnable_val;
      ASSERT_OK(db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val));
      ASSERT_TRUE(pinnable_val == ("value"));
    }
  });
  commit_thread.join();
  read_thread.join();
}
// Test that the transactional db can handle duplicate keys in the write batch
TEST_P(TransactionTest, DuplicateKeys) {
ColumnFamilyOptions cf_options;

@ -1288,6 +1288,13 @@ TEST_P(WritePreparedTransactionTest, TxnInitialize) {
// for example the txn does not add the prepared seq for the second sub-batch to
// the PreparedHeap structure.
TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicatesTest) {
const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 1; // disable commit cache
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
ReadOptions ropt;
PinnableSlice pinnable_val;
WriteOptions write_options;
TransactionOptions txn_options;
Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
@ -1296,24 +1303,19 @@ TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicatesTest) {
ASSERT_OK(txn0->Put(Slice("key"), Slice("value2")));
ASSERT_OK(txn0->Prepare());
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
// Ensure that all the prepared sequence numbers will be removed from the
// PreparedHeap.
SequenceNumber new_max = wp_db->COMMIT_CACHE_SIZE;
wp_db->AdvanceMaxEvictedSeq(0, new_max);
ASSERT_OK(db->Put(write_options, "key2", "value"));
// Will cause max advance due to disabled commit cache
ASSERT_OK(db->Put(write_options, "key3", "value"));
ReadOptions ropt;
PinnableSlice pinnable_val;
auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
ASSERT_TRUE(s.IsNotFound());
delete txn0;
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
wp_db->db_impl_->FlushWAL(true);
wp_db->TEST_Crash();
ReOpenNoDelete();
assert(db != nullptr);
wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
wp_db->AdvanceMaxEvictedSeq(0, new_max);
s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
ASSERT_TRUE(s.IsNotFound());

@ -44,23 +44,21 @@ void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) {
prepare_batch_cnt_ = 0;
}
Status WritePreparedTxn::Get(const ReadOptions& read_options,
Status WritePreparedTxn::Get(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* pinnable_val) {
auto snapshot = read_options.snapshot;
auto snap_seq =
snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber;
SequenceNumber min_uncommitted =
kMinUnCommittedSeq; // by default disable the optimization
if (snapshot != nullptr) {
min_uncommitted =
static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
->min_uncommitted_;
}
SequenceNumber min_uncommitted, snap_seq;
const bool backed_by_snapshot =
wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted);
return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key,
pinnable_val, &callback);
auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
pinnable_val, &callback);
if (LIKELY(wpt_db_->ValidateSnapshot(callback.max_visible_seq(),
backed_by_snapshot))) {
return res;
} else {
return Status::TryAgain();
}
}
Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) {

@ -220,24 +220,19 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
Status WritePreparedTxnDB::Get(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) {
// We are fine with the latest committed value. This could be done by
// specifying the snapshot as kMaxSequenceNumber.
SequenceNumber seq = kMaxSequenceNumber;
SequenceNumber min_uncommitted = 0;
if (options.snapshot != nullptr) {
seq = options.snapshot->GetSequenceNumber();
min_uncommitted = static_cast_with_check<const SnapshotImpl, const Snapshot>(
options.snapshot)
->min_uncommitted_;
SequenceNumber min_uncommitted, snap_seq;
const bool backed_by_snapshot =
AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
WritePreparedTxnReadCallback callback(this, snap_seq, min_uncommitted);
bool* dont_care = nullptr;
auto res = db_impl_->GetImpl(options, column_family, key, value, dont_care,
&callback);
if (LIKELY(
ValidateSnapshot(callback.max_visible_seq(), backed_by_snapshot))) {
return res;
} else {
min_uncommitted = SmallestUnCommittedSeq();
return Status::TryAgain();
}
WritePreparedTxnReadCallback callback(this, seq, min_uncommitted);
bool* dont_care = nullptr;
// Note: no need to specify a snapshot for read options as no specific
// snapshot is requested by the user.
return db_impl_->GetImpl(options, column_family, key, value, dont_care,
&callback);
}
void WritePreparedTxnDB::UpdateCFComparatorMap(

@ -25,6 +25,7 @@
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "util/set_comparator.h"
#include "util/string_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
@ -445,6 +446,20 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
protected:
virtual Status VerifyCFOptions(
const ColumnFamilyOptions& cf_options) override;
// Assign the min and max sequence numbers to use for reading from the db. A
// seq > max is not valid, a seq < min is valid, and a min <= seq < max
// requires further checking. Normally max is defined by the snapshot and min
// by the minimum uncommitted seq.
// Returns true if the values were taken from a non-null snapshot. Otherwise
// returns false, with *min set to the current smallest uncommitted seq and
// *max set to 0 as a placeholder to be assigned later.
inline bool AssignMinMaxSeqs(const Snapshot* snapshot, SequenceNumber* min,
SequenceNumber* max);
// Validate whether a snapshot sequence number is still valid based on the
// latest db status. backed_by_snapshot specifies whether the number is backed
// by an actual snapshot object; if it is, the number is always considered
// valid. order specifies the memory order with which we load the atomic
// variables: relaxed is enough for the default since we care about the last
// value seen by the same thread.
// Returns false only when the number is not backed by a snapshot, is
// non-zero, and is <= the current max evicted seq.
inline bool ValidateSnapshot(
const SequenceNumber snap_seq, const bool backed_by_snapshot,
std::memory_order order = std::memory_order_relaxed);
private:
friend class PreparedHeap_BasicsTest_Test;
@ -479,6 +494,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
friend class WritePreparedTransactionTest_NonAtomicUpdateOfMaxEvictedSeq_Test;
friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
friend class WritePreparedTransactionTest_RollbackTest_Test;
friend class WriteUnpreparedTxn;
friend class WriteUnpreparedTxnDB;
friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
@ -953,5 +969,38 @@ struct SubBatchCounter : public WriteBatch::Handler {
bool WriteAfterCommit() const override { return false; }
};
// Fills in the [min, max] sequence bounds used by a read.
// With a snapshot, both bounds come from the snapshot object and the function
// returns true. Without one, min is the current smallest uncommitted seq, max
// is left as 0 (to be assigned later, after the super version is referenced),
// and the function returns false.
bool WritePreparedTxnDB::AssignMinMaxSeqs(const Snapshot* snapshot,
                                          SequenceNumber* min,
                                          SequenceNumber* max) {
  if (snapshot == nullptr) {
    *min = SmallestUnCommittedSeq();
    *max = 0;  // to be assigned later after sv is referenced.
    return false;
  }
  const auto* snap_impl =
      static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot);
  *min = snap_impl->min_uncommitted_;
  *max = snap_impl->number_;
  return true;
}
bool WritePreparedTxnDB::ValidateSnapshot(const SequenceNumber snap_seq,
const bool backed_by_snapshot,
std::memory_order order) {
if (backed_by_snapshot) {
return true;
} else {
SequenceNumber max = max_evicted_seq_.load(order);
// Validate that max has not advanced the snapshot seq that is not backed
// by a real snapshot. This is a very rare case that should not happen in
// real workloads.
if (UNLIKELY(snap_seq <= max && snap_seq != 0)) {
return false;
}
}
return true;
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -462,21 +462,18 @@ Status WriteUnpreparedTxn::RollbackInternal() {
Status WriteUnpreparedTxn::Get(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) {
auto snapshot = options.snapshot;
auto snap_seq =
snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber;
SequenceNumber min_uncommitted =
kMinUnCommittedSeq; // by default disable the optimization
if (snapshot != nullptr) {
min_uncommitted =
static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
->min_uncommitted_;
}
SequenceNumber min_uncommitted, snap_seq;
const bool backed_by_snapshot =
wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
this);
return write_batch_.GetFromBatchAndDB(db_, options, column_family, key, value,
&callback);
auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
value, &callback);
if (LIKELY(wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
return res;
} else {
return Status::TryAgain();
}
}
Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options) {

@ -40,13 +40,15 @@ class WriteUnpreparedTxnReadCallback : public ReadCallback {
// behind reseek optimizations are no longer valid.
}
// TODO(myabandeh): override Refresh when Iterator::Refresh is supported
void Refresh(SequenceNumber seq) override {
max_visible_seq_ = std::max(max_visible_seq_, seq);
wup_snapshot_ = seq;
}
private:
static SequenceNumber CalcMaxVisibleSeq(WriteUnpreparedTxn* txn,
SequenceNumber snapshot_seq) {
SequenceNumber max_unprepared = CalcMaxUnpreparedSequenceNumber(txn);
assert(snapshot_seq < max_unprepared || max_unprepared == 0 ||
snapshot_seq == kMaxSequenceNumber);
return std::max(max_unprepared, snapshot_seq);
}
static SequenceNumber CalcMaxUnpreparedSequenceNumber(

@ -31,23 +31,18 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
class InvalidSnapshotReadCallback : public ReadCallback {
public:
InvalidSnapshotReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot)
: ReadCallback(snapshot), db_(db) {}
// Will be called to see if the seq number visible; if not it moves on to
// the next seq number.
inline bool IsVisibleFullCheck(SequenceNumber seq) override {
// Becomes true if it cannot tell by comparing seq with snapshot seq since
// the snapshot is not a real snapshot.
auto snapshot = max_visible_seq_;
bool released = false;
auto ret = db_->IsInSnapshot(seq, snapshot, min_uncommitted_, &released);
assert(!released || ret);
return ret;
InvalidSnapshotReadCallback(SequenceNumber snapshot)
: ReadCallback(snapshot) {}
inline bool IsVisibleFullCheck(SequenceNumber) override {
// The seq provided as snapshot is the seq right before we have locked and
// wrote to it, so whatever is there, it is committed.
return true;
}
private:
WritePreparedTxnDB* db_;
// Ignore the refresh request since we are confident that our snapshot seq
// is not going to be affected by concurrent compactions (not enabled yet.)
void Refresh(SequenceNumber) override {}
};
// Iterate starting with largest sequence number.
@ -67,13 +62,12 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
std::map<uint32_t, CFKeys> keys_;
bool rollback_merge_operands_;
RollbackWriteBatchBuilder(
DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
WriteBatch* dst_batch,
DBImpl* db, SequenceNumber snap_seq, WriteBatch* dst_batch,
std::map<uint32_t, const Comparator*>& comparators,
std::map<uint32_t, ColumnFamilyHandle*>& handles,
bool rollback_merge_operands)
: db_(db),
callback(wpt_db, snap_seq),
callback(snap_seq),
// disable min_uncommitted optimization
rollback_batch_(dst_batch),
comparators_(comparators),
@ -149,7 +143,7 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
Status MarkRollback(const Slice&) override {
return Status::InvalidArgument();
}
} rollback_handler(db_impl_, this, last_visible_txn, &rollback_batch,
} rollback_handler(db_impl_, last_visible_txn, &rollback_batch,
*cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
txn_db_options_.rollback_merge_operands);
@ -311,12 +305,7 @@ Status WriteUnpreparedTxnDB::Initialize(
db_impl_->versions_->SetLastPublishedSequence(last_seq + 1);
}
// Compaction should start only after max_evicted_seq_ is set.
Status s = EnableAutoCompaction(compaction_enabled_cf_handles);
if (!s.ok()) {
return s;
}
Status s;
// Rollback unprepared transactions.
for (auto rtxn : rtxns) {
auto recovered_trx = rtxn.second;
@ -331,6 +320,10 @@ Status WriteUnpreparedTxnDB::Initialize(
if (s.ok()) {
dbimpl->DeleteAllRecoveredTransactions();
// Compaction should start only after max_evicted_seq_ is set AND recovered
// transactions are either added to PrepareHeap or rolled back.
s = EnableAutoCompaction(compaction_enabled_cf_handles);
}
return s;

Loading…
Cancel
Save