WritePrepared: fix Get without snapshot (#5664)

Summary:
if read_options.snapshot is not set, ::Get will take the last sequence number after taking a super-version and uses that as the sequence number. Theoretically max_eviceted_seq_ could advance this sequence number. This could lead ::IsInSnapshot that will be invoked by the ReadCallback to notice the absence of the snapshot. In this case, the ReadCallback should have passed a non-value to snap_released so that it could be set by the ::IsInSnapshot. The patch does that, and adds a unit test to verify it.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5664

Differential Revision: D16614033

Pulled By: maysamyabandeh

fbshipit-source-id: 06fb3fd4aacd75806ed1a1acec7961f5d02486f2
main
Maysam Yabandeh 6 years ago committed by Facebook Github Bot
parent e579e32eaa
commit 208556ee13
  1. 2
      include/rocksdb/statistics.h
  2. 4
      java/rocksjni/portal.h
  3. 5
      java/src/main/java/org/rocksdb/TickerType.java
  4. 1
      monitoring/statistics.cc
  5. 60
      utilities/transactions/write_prepared_transaction_test.cc
  6. 36
      utilities/transactions/write_prepared_txn.cc
  7. 14
      utilities/transactions/write_prepared_txn_db.cc
  8. 68
      utilities/transactions/write_prepared_txn_db.h
  9. 32
      utilities/transactions/write_unprepared_txn.cc
  10. 23
      utilities/transactions/write_unprepared_txn.h
  11. 3
      utilities/transactions/write_unprepared_txn_db.cc

@ -324,6 +324,8 @@ enum Tickers : uint32_t {
TXN_DUPLICATE_KEY_OVERHEAD,
// # of times snapshot_mutex_ is acquired in the fast path.
TXN_SNAPSHOT_MUTEX_OVERHEAD,
// # of times ::Get returned TryAgain due to expired snapshot seq
TXN_GET_TRY_AGAIN,
// Number of keys actually found in MultiGet calls (vs number requested by
// caller)

@ -4620,6 +4620,8 @@ class TickerTypeJni {
return -0x0B;
case rocksdb::Tickers::TXN_SNAPSHOT_MUTEX_OVERHEAD:
return -0x0C;
case rocksdb::Tickers::TXN_GET_TRY_AGAIN:
return -0x0D;
case rocksdb::Tickers::TICKER_ENUM_MAX:
// 0x5F for backwards compatibility on current minor version.
return 0x5F;
@ -4912,6 +4914,8 @@ class TickerTypeJni {
return rocksdb::Tickers::TXN_DUPLICATE_KEY_OVERHEAD;
case -0x0C:
return rocksdb::Tickers::TXN_SNAPSHOT_MUTEX_OVERHEAD;
case -0x0D:
return rocksdb::Tickers::TXN_GET_TRY_AGAIN;
case 0x5F:
// 0x5F for backwards compatibility on current minor version.
return rocksdb::Tickers::TICKER_ENUM_MAX;

@ -717,6 +717,11 @@ public enum TickerType {
*/
TXN_SNAPSHOT_MUTEX_OVERHEAD((byte) -0x0C),
/**
* # of times ::Get returned TryAgain due to expired snapshot seq
*/
TXN_GET_TRY_AGAIN((byte) -0x0D),
TICKER_ENUM_MAX((byte) 0x5F);
private final byte value;

@ -162,6 +162,7 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
"rocksdb.txn.overhead.mutex.old.commit.map"},
{TXN_DUPLICATE_KEY_OVERHEAD, "rocksdb.txn.overhead.duplicate.key"},
{TXN_SNAPSHOT_MUTEX_OVERHEAD, "rocksdb.txn.overhead.mutex.snapshot"},
{TXN_GET_TRY_AGAIN, "rocksdb.txn.get.tryagain"},
{NUMBER_MULTIGET_KEYS_FOUND, "rocksdb.number.multiget.keys.found"},
{NO_ITERATOR_CREATED, "rocksdb.num.iterator.created"},
{NO_ITERATOR_DELETED, "rocksdb.num.iterator.deleted"},

@ -1372,7 +1372,7 @@ TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) {
for (int i = 0; i < writes; i++) {
WriteBatch batch;
// For duplicate keys cause 4 commit entries, each evicting an entry that
// is not published yet, thus causing max ecited seq go higher than last
// is not published yet, thus causing max evicted seq go higher than last
// published.
for (int b = 0; b < batch_cnt; b++) {
batch.Put("foo", "foo");
@ -1404,6 +1404,64 @@ TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) {
db->ReleaseSnapshot(snap);
}
// Test that reads without snapshots would not hit an undefined state
TEST_P(WritePreparedTransactionTest, MaxCatchupWithUnbackedSnapshot) {
const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
WriteOptions woptions;
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
const int writes = 50;
rocksdb::port::Thread t1([&]() {
for (int i = 0; i < writes; i++) {
WriteBatch batch;
batch.Put("key", "foo");
db->Write(woptions, &batch);
}
});
rocksdb::port::Thread t2([&]() {
while (wp_db->max_evicted_seq_ == 0) { // wait for insert thread
std::this_thread::yield();
}
ReadOptions ropt;
PinnableSlice pinnable_val;
TransactionOptions txn_options;
for (int i = 0; i < 10; i++) {
auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
ASSERT_TRUE(s.ok() || s.IsTryAgain());
pinnable_val.Reset();
Transaction* txn = db->BeginTransaction(woptions, txn_options);
s = txn->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
ASSERT_TRUE(s.ok() || s.IsTryAgain());
pinnable_val.Reset();
std::vector<std::string> values;
auto s_vec =
txn->MultiGet(ropt, {db->DefaultColumnFamily()}, {"key"}, &values);
ASSERT_EQ(1, values.size());
ASSERT_EQ(1, s_vec.size());
s = s_vec[0];
ASSERT_TRUE(s.ok() || s.IsTryAgain());
Slice key("key");
txn->MultiGet(ropt, db->DefaultColumnFamily(), 1, &key, &pinnable_val,
&s, true);
ASSERT_TRUE(s.ok() || s.IsTryAgain());
delete txn;
}
});
t1.join();
t2.join();
// Make sure that the test has worked and seq number has advanced as we
// thought
auto snap = db->GetSnapshot();
ASSERT_GT(snap->GetSequenceNumber(), writes - 1);
db->ReleaseSnapshot(snap);
}
// Check that old_commit_map_ cleanup works correctly if the snapshot equals
// max_evicted_seq_.
TEST_P(WritePreparedTransactionTest, CleanupSnapshotEqualToMax) {

@ -46,13 +46,16 @@ void WritePreparedTxn::MultiGet(const ReadOptions& options,
PinnableSlice* values, Status* statuses,
bool sorted_input) {
SequenceNumber min_uncommitted, snap_seq;
const bool backed_by_snapshot =
const SnapshotBackup backed_by_snapshot =
wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted);
WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted,
backed_by_snapshot);
write_batch_.MultiGetFromBatchAndDB(db_, options, column_family, num_keys,
keys, values, statuses, sorted_input,
&callback);
if (UNLIKELY(!wpt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
if (UNLIKELY(!callback.valid() ||
!wpt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
for (size_t i = 0; i < num_keys; i++) {
statuses[i] = Status::TryAgain();
}
@ -63,15 +66,18 @@ Status WritePreparedTxn::Get(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* pinnable_val) {
SequenceNumber min_uncommitted, snap_seq;
const bool backed_by_snapshot =
const SnapshotBackup backed_by_snapshot =
wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted);
WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted,
backed_by_snapshot);
auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
pinnable_val, &callback);
if (LIKELY(wpt_db_->ValidateSnapshot(callback.max_visible_seq(),
if (LIKELY(callback.valid() &&
wpt_db_->ValidateSnapshot(callback.max_visible_seq(),
backed_by_snapshot))) {
return res;
} else {
wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
return Status::TryAgain();
}
}
@ -241,9 +247,11 @@ Status WritePreparedTxn::RollbackInternal() {
auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap();
auto cf_comp_map_shared_ptr = wpt_db_->GetCFComparatorMap();
auto read_at_seq = kMaxSequenceNumber;
ReadOptions roptions;
// to prevent callback's seq to be overrriden inside DBImpk::Get
roptions.snapshot = wpt_db_->GetMaxSnapshot();
struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
DBImpl* db_;
ReadOptions roptions;
WritePreparedTxnReadCallback callback;
WriteBatch* rollback_batch_;
std::map<uint32_t, const Comparator*>& comparators_;
@ -251,18 +259,20 @@ Status WritePreparedTxn::RollbackInternal() {
using CFKeys = std::set<Slice, SetComparator>;
std::map<uint32_t, CFKeys> keys_;
bool rollback_merge_operands_;
ReadOptions roptions_;
RollbackWriteBatchBuilder(
DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
WriteBatch* dst_batch,
std::map<uint32_t, const Comparator*>& comparators,
std::map<uint32_t, ColumnFamilyHandle*>& handles,
bool rollback_merge_operands)
bool rollback_merge_operands, ReadOptions _roptions)
: db_(db),
callback(wpt_db, snap_seq), // disable min_uncommitted optimization
rollback_batch_(dst_batch),
comparators_(comparators),
handles_(handles),
rollback_merge_operands_(rollback_merge_operands) {}
rollback_merge_operands_(rollback_merge_operands),
roptions_(_roptions) {}
Status Rollback(uint32_t cf, const Slice& key) {
Status s;
@ -280,7 +290,7 @@ Status WritePreparedTxn::RollbackInternal() {
PinnableSlice pinnable_val;
bool not_used;
auto cf_handle = handles_[cf];
s = db_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used,
s = db_->GetImpl(roptions_, cf_handle, key, &pinnable_val, &not_used,
&callback);
assert(s.ok() || s.IsNotFound());
if (s.ok()) {
@ -330,7 +340,8 @@ Status WritePreparedTxn::RollbackInternal() {
bool WriteAfterCommit() const override { return false; }
} rollback_handler(db_impl_, wpt_db_, read_at_seq, &rollback_batch,
*cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
wpt_db_->txn_db_options_.rollback_merge_operands);
wpt_db_->txn_db_options_.rollback_merge_operands,
roptions);
auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler);
assert(s.ok());
if (!s.ok()) {
@ -434,7 +445,8 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
ColumnFamilyHandle* cfh =
column_family ? column_family : db_impl_->DefaultColumnFamily();
WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted);
WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted,
kBackedByDBSnapshot);
return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
snap_seq, false /* cache_only */,
&snap_checker, min_uncommitted);

@ -226,16 +226,18 @@ Status WritePreparedTxnDB::Get(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) {
SequenceNumber min_uncommitted, snap_seq;
const bool backed_by_snapshot =
const SnapshotBackup backed_by_snapshot =
AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
WritePreparedTxnReadCallback callback(this, snap_seq, min_uncommitted);
WritePreparedTxnReadCallback callback(this, snap_seq, min_uncommitted,
backed_by_snapshot);
bool* dont_care = nullptr;
auto res = db_impl_->GetImpl(options, column_family, key, value, dont_care,
&callback);
if (LIKELY(
ValidateSnapshot(callback.max_visible_seq(), backed_by_snapshot))) {
if (LIKELY(callback.valid() && ValidateSnapshot(callback.max_visible_seq(),
backed_by_snapshot))) {
return res;
} else {
WPRecordTick(TXN_GET_TRY_AGAIN);
return Status::TryAgain();
}
}
@ -298,7 +300,8 @@ struct WritePreparedTxnDB::IteratorState {
IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
std::shared_ptr<ManagedSnapshot> s,
SequenceNumber min_uncommitted)
: callback(txn_db, sequence, min_uncommitted), snapshot(s) {}
: callback(txn_db, sequence, min_uncommitted, kBackedByDBSnapshot),
snapshot(s) {}
WritePreparedTxnReadCallback callback;
std::shared_ptr<ManagedSnapshot> snapshot;
@ -392,6 +395,7 @@ void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
commit_cache_ = std::unique_ptr<std::atomic<CommitEntry64b>[]>(
new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
dummy_max_snapshot_.number_ = kMaxSequenceNumber;
}
void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max,

@ -30,6 +30,7 @@
#include "utilities/transactions/write_prepared_txn.h"
namespace rocksdb {
enum SnapshotBackup : bool { kUnbackedByDBSnapshot, kBackedByDBSnapshot };
// A PessimisticTransactionDB that writes data to DB after prepare phase of 2PC.
// In this way some data in the DB might not be committed. The DB provides
@ -448,18 +449,21 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
const ColumnFamilyOptions& cf_options) override;
// Assign the min and max sequence numbers for reading from the db. A seq >
// max is not valid, and a seq < min is valid, and a min <= seq < max requires
// further checkings. Normally max is defined by the snapshot and min is by
// further checking. Normally max is defined by the snapshot and min is by
// minimum uncommitted seq.
inline bool AssignMinMaxSeqs(const Snapshot* snapshot, SequenceNumber* min,
SequenceNumber* max);
inline SnapshotBackup AssignMinMaxSeqs(const Snapshot* snapshot,
SequenceNumber* min,
SequenceNumber* max);
// Validate is a snapshot sequence number is still valid based on the latest
// db status. backed_by_snapshot specifies if the number is baked by an actual
// snapshot object. order specified the memory order with which we load the
// atomic variables: relax is enough for the default since we care about last
// value seen by same thread.
inline bool ValidateSnapshot(
const SequenceNumber snap_seq, const bool backed_by_snapshot,
const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot,
std::memory_order order = std::memory_order_relaxed);
// Get a dummy snapshot that refers to kMaxSequenceNumber
Snapshot* GetMaxSnapshot() { return &dummy_max_snapshot_; }
private:
friend class AddPreparedCallback;
@ -488,6 +492,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test;
friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test;
friend class WritePreparedTransactionTest_MaxCatchupWithUnbackedSnapshot_Test;
friend class
WritePreparedTransactionTest_NonAtomicCommitOfDelayedPrepared_Test;
friend class
@ -783,26 +788,55 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// Thread safety: since the handle is read-only object it is a const it is
// safe to read it concurrently
std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> handle_map_;
// A dummy snapshot object that refers to kMaxSequenceNumber
SnapshotImpl dummy_max_snapshot_;
};
class WritePreparedTxnReadCallback : public ReadCallback {
public:
WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot)
: ReadCallback(snapshot), db_(db) {}
: ReadCallback(snapshot),
db_(db),
backed_by_snapshot_(kBackedByDBSnapshot) {}
WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot,
SequenceNumber min_uncommitted)
: ReadCallback(snapshot, min_uncommitted), db_(db) {}
SequenceNumber min_uncommitted,
SnapshotBackup backed_by_snapshot)
: ReadCallback(snapshot, min_uncommitted),
db_(db),
backed_by_snapshot_(backed_by_snapshot) {
(void)backed_by_snapshot_; // to silence unused private field warning
}
virtual ~WritePreparedTxnReadCallback() {
// If it is not backed by snapshot, the caller must check validity
assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot);
}
// Will be called to see if the seq number visible; if not it moves on to
// the next seq number.
inline virtual bool IsVisibleFullCheck(SequenceNumber seq) override {
auto snapshot = max_visible_seq_;
return db_->IsInSnapshot(seq, snapshot, min_uncommitted_);
bool snap_released = false;
auto ret =
db_->IsInSnapshot(seq, snapshot, min_uncommitted_, &snap_released);
assert(!snap_released || backed_by_snapshot_ == kUnbackedByDBSnapshot);
snap_released_ |= snap_released;
return ret;
}
inline bool valid() {
valid_checked_ = true;
return snap_released_ == false;
}
// TODO(myabandeh): override Refresh when Iterator::Refresh is supported
private:
WritePreparedTxnDB* db_;
// Whether max_visible_seq_ is backed by a snapshot
const SnapshotBackup backed_by_snapshot_;
bool snap_released_ = false;
// Safety check to ensure that the caller has checked invalid statuses
bool valid_checked_ = false;
};
class AddPreparedCallback : public PreReleaseCallback {
@ -1034,26 +1068,26 @@ struct SubBatchCounter : public WriteBatch::Handler {
bool WriteAfterCommit() const override { return false; }
};
bool WritePreparedTxnDB::AssignMinMaxSeqs(const Snapshot* snapshot,
SequenceNumber* min,
SequenceNumber* max) {
SnapshotBackup WritePreparedTxnDB::AssignMinMaxSeqs(const Snapshot* snapshot,
SequenceNumber* min,
SequenceNumber* max) {
if (snapshot != nullptr) {
*min = static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
->min_uncommitted_;
*max = static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
->number_;
return true;
return kBackedByDBSnapshot;
} else {
*min = SmallestUnCommittedSeq();
*max = 0; // to be assigned later after sv is referenced.
return false;
return kUnbackedByDBSnapshot;
}
}
bool WritePreparedTxnDB::ValidateSnapshot(const SequenceNumber snap_seq,
const bool backed_by_snapshot,
std::memory_order order) {
if (backed_by_snapshot) {
bool WritePreparedTxnDB::ValidateSnapshot(
const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot,
std::memory_order order) {
if (backed_by_snapshot == kBackedByDBSnapshot) {
return true;
} else {
SequenceNumber max = max_evicted_seq_.load(order);

@ -25,7 +25,11 @@ bool WriteUnpreparedTxnReadCallback::IsVisibleFullCheck(SequenceNumber seq) {
}
}
return db_->IsInSnapshot(seq, wup_snapshot_, min_uncommitted_);
bool snap_released = false;
auto ret = db_->IsInSnapshot(seq, wup_snapshot_, min_uncommitted_);
assert(!snap_released || backed_by_snapshot_ == kUnbackedByDBSnapshot);
snap_released_ |= snap_released;
return ret;
}
WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db,
@ -547,8 +551,9 @@ Status WriteUnpreparedTxn::RollbackInternal() {
Status s;
const auto& cf_map = *wupt_db_->GetCFHandleMap();
auto read_at_seq = kMaxSequenceNumber;
ReadOptions roptions;
// to prevent callback's seq to be overrriden inside DBImpk::Get
roptions.snapshot = wpt_db_->GetMaxSnapshot();
// Note that we do not use WriteUnpreparedTxnReadCallback because we do not
// need to read our own writes when reading prior versions of the key for
// rollback.
@ -704,7 +709,8 @@ Status WriteUnpreparedTxn::RollbackToSavePointInternal() {
->min_uncommitted_;
SequenceNumber snap_seq = roptions.snapshot->GetSequenceNumber();
WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
top.unprep_seqs_);
top.unprep_seqs_,
kBackedByDBSnapshot);
const auto& cf_map = *wupt_db_->GetCFHandleMap();
for (const auto& cfkey : tracked_keys) {
const auto cfid = cfkey.first;
@ -784,14 +790,16 @@ void WriteUnpreparedTxn::MultiGet(const ReadOptions& options,
PinnableSlice* values, Status* statuses,
bool sorted_input) {
SequenceNumber min_uncommitted, snap_seq;
const bool backed_by_snapshot =
const SnapshotBackup backed_by_snapshot =
wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
unprep_seqs_);
unprep_seqs_, backed_by_snapshot);
write_batch_.MultiGetFromBatchAndDB(db_, options, column_family, num_keys,
keys, values, statuses, sorted_input,
&callback);
if (UNLIKELY(!wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
if (UNLIKELY(!callback.valid() ||
!wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
wupt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
for (size_t i = 0; i < num_keys; i++) {
statuses[i] = Status::TryAgain();
}
@ -802,15 +810,17 @@ Status WriteUnpreparedTxn::Get(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) {
SequenceNumber min_uncommitted, snap_seq;
const bool backed_by_snapshot =
const SnapshotBackup backed_by_snapshot =
wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
unprep_seqs_);
unprep_seqs_, backed_by_snapshot);
auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
value, &callback);
if (LIKELY(wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
if (LIKELY(callback.valid() &&
wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
return res;
} else {
wupt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
return Status::TryAgain();
}
}
@ -854,8 +864,8 @@ Status WriteUnpreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
ColumnFamilyHandle* cfh =
column_family ? column_family : db_impl_->DefaultColumnFamily();
WriteUnpreparedTxnReadCallback snap_checker(wupt_db_, snap_seq,
min_uncommitted, unprep_seqs_);
WriteUnpreparedTxnReadCallback snap_checker(
wupt_db_, snap_seq, min_uncommitted, unprep_seqs_, kBackedByDBSnapshot);
return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
snap_seq, false /* cache_only */,
&snap_checker, min_uncommitted);

@ -56,7 +56,8 @@ class WriteUnpreparedTxnReadCallback : public ReadCallback {
WriteUnpreparedTxnReadCallback(
WritePreparedTxnDB* db, SequenceNumber snapshot,
SequenceNumber min_uncommitted,
const std::map<SequenceNumber, size_t>& unprep_seqs)
const std::map<SequenceNumber, size_t>& unprep_seqs,
SnapshotBackup backed_by_snapshot)
// Pass our last uncommitted seq as the snapshot to the parent class to
// ensure that the parent will not prematurely filter out own writes. We
// will do the exact comparison against snapshots in IsVisibleFullCheck
@ -64,10 +65,23 @@ class WriteUnpreparedTxnReadCallback : public ReadCallback {
: ReadCallback(CalcMaxVisibleSeq(unprep_seqs, snapshot), min_uncommitted),
db_(db),
unprep_seqs_(unprep_seqs),
wup_snapshot_(snapshot) {}
wup_snapshot_(snapshot),
backed_by_snapshot_(backed_by_snapshot) {
(void)backed_by_snapshot_; // to silence unused private field warning
}
virtual ~WriteUnpreparedTxnReadCallback() {
// If it is not backed by snapshot, the caller must check validity
assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot);
}
virtual bool IsVisibleFullCheck(SequenceNumber seq) override;
inline bool valid() {
valid_checked_ = true;
return snap_released_ == false;
}
void Refresh(SequenceNumber seq) override {
max_visible_seq_ = std::max(max_visible_seq_, seq);
wup_snapshot_ = seq;
@ -88,6 +102,11 @@ class WriteUnpreparedTxnReadCallback : public ReadCallback {
WritePreparedTxnDB* db_;
const std::map<SequenceNumber, size_t>& unprep_seqs_;
SequenceNumber wup_snapshot_;
// Whether max_visible_seq_ is backed by a snapshot
const SnapshotBackup backed_by_snapshot_;
bool snap_released_ = false;
// Safety check to ensure that the caller has checked invalid statuses
bool valid_checked_ = false;
};
class WriteUnpreparedTxn : public WritePreparedTxn {

@ -348,7 +348,8 @@ struct WriteUnpreparedTxnDB::IteratorState {
IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
std::shared_ptr<ManagedSnapshot> s,
SequenceNumber min_uncommitted, WriteUnpreparedTxn* txn)
: callback(txn_db, sequence, min_uncommitted, txn->unprep_seqs_),
: callback(txn_db, sequence, min_uncommitted, txn->unprep_seqs_,
kBackedByDBSnapshot),
snapshot(s) {}
SequenceNumber MaxVisibleSeq() { return callback.max_visible_seq(); }

Loading…
Cancel
Save