WritePrepared: snapshot should be larger than max_evicted_seq_ (#4886)

Summary:
The AdvanceMaxEvictedSeq algorithm assumes that new snapshots always have sequence number larger than the last max_evicted_seq_. To enforce this assumption we make two changes:
i) max is not advanced beyond the last published seq, with the exception that the evicted commit entry itself is not published yet, which is quite rare.
ii) When obtaining the snapshot if the max_evicted_seq_ is not published yet, commit a dummy entry so that it waits for it to be published and also increased the latest published seq by one above the max.
To test these non-realistic corner cases we create a commit cache with size 1 so that every single commit results into eviction.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4886

Differential Revision: D13685270

Pulled By: maysamyabandeh

fbshipit-source-id: 5461bc09c2a9b75798bfcb9853a256c81cdac0b0
main
Maysam Yabandeh 6 years ago committed by Facebook Github Bot
parent 7d13f307ff
commit cad99a6031
  1. 7
      db/db_impl.h
  2. 4
      utilities/transactions/pessimistic_transaction.cc
  3. 3
      utilities/transactions/pessimistic_transaction.h
  4. 164
      utilities/transactions/write_prepared_transaction_test.cc
  5. 25
      utilities/transactions/write_prepared_txn.cc
  6. 92
      utilities/transactions/write_prepared_txn_db.cc
  7. 8
      utilities/transactions/write_prepared_txn_db.h

@ -238,6 +238,13 @@ class DBImpl : public DB {
virtual Status SyncWAL() override; virtual Status SyncWAL() override;
virtual SequenceNumber GetLatestSequenceNumber() const override; virtual SequenceNumber GetLatestSequenceNumber() const override;
virtual SequenceNumber GetLastPublishedSequence() const {
if (last_seq_same_as_publish_seq_) {
return versions_->LastSequence();
} else {
return versions_->LastPublishedSequence();
}
}
// REQUIRES: joined the main write queue if two_write_queues is disabled, and // REQUIRES: joined the main write queue if two_write_queues is disabled, and
// the second write queue otherwise. // the second write queue otherwise.
virtual void SetLastPublishedSequence(SequenceNumber seq); virtual void SetLastPublishedSequence(SequenceNumber seq);

@ -37,7 +37,7 @@ TransactionID PessimisticTransaction::GenTxnID() {
PessimisticTransaction::PessimisticTransaction( PessimisticTransaction::PessimisticTransaction(
TransactionDB* txn_db, const WriteOptions& write_options, TransactionDB* txn_db, const WriteOptions& write_options,
const TransactionOptions& txn_options) const TransactionOptions& txn_options, const bool init)
: TransactionBaseImpl(txn_db->GetRootDB(), write_options), : TransactionBaseImpl(txn_db->GetRootDB(), write_options),
txn_db_impl_(nullptr), txn_db_impl_(nullptr),
expiration_time_(0), expiration_time_(0),
@ -51,7 +51,9 @@ PessimisticTransaction::PessimisticTransaction(
txn_db_impl_ = txn_db_impl_ =
static_cast_with_check<PessimisticTransactionDB, TransactionDB>(txn_db); static_cast_with_check<PessimisticTransactionDB, TransactionDB>(txn_db);
db_impl_ = static_cast_with_check<DBImpl, DB>(db_); db_impl_ = static_cast_with_check<DBImpl, DB>(db_);
if (init) {
Initialize(txn_options); Initialize(txn_options);
}
} }
void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) { void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) {

@ -38,7 +38,8 @@ class PessimisticTransactionDB;
class PessimisticTransaction : public TransactionBaseImpl { class PessimisticTransaction : public TransactionBaseImpl {
public: public:
PessimisticTransaction(TransactionDB* db, const WriteOptions& write_options, PessimisticTransaction(TransactionDB* db, const WriteOptions& write_options,
const TransactionOptions& txn_options); const TransactionOptions& txn_options,
const bool init = true);
virtual ~PessimisticTransaction(); virtual ~PessimisticTransaction();

@ -353,7 +353,7 @@ class WritePreparedTransactionTestBase : public TransactionTestBase {
: TransactionTestBase(use_stackable_db, two_write_queue, write_policy){}; : TransactionTestBase(use_stackable_db, two_write_queue, write_policy){};
protected: protected:
// TODO(mayabndeh): Avoid duplicating PessimisticTransaction::Open logic here. // TODO(myabandeh): Avoid duplicating PessimisticTransaction::Open logic here.
void DestroyAndReopenWithExtraOptions(size_t snapshot_cache_bits, void DestroyAndReopenWithExtraOptions(size_t snapshot_cache_bits,
size_t commit_cache_bits) { size_t commit_cache_bits) {
delete db; delete db;
@ -376,8 +376,8 @@ class WritePreparedTransactionTestBase : public TransactionTestBase {
// The following is equivalent of WrapDB(). // The following is equivalent of WrapDB().
txn_db_options.write_policy = WRITE_PREPARED; txn_db_options.write_policy = WRITE_PREPARED;
auto* wp_db = new WritePreparedTxnDB(base_db, txn_db_options, snapshot_cache_bits, auto* wp_db = new WritePreparedTxnDB(
commit_cache_bits); base_db, txn_db_options, snapshot_cache_bits, commit_cache_bits);
wp_db->UpdateCFComparatorMap(handles); wp_db->UpdateCFComparatorMap(handles);
ASSERT_OK(wp_db->Initialize(compaction_enabled_cf_indices, handles)); ASSERT_OK(wp_db->Initialize(compaction_enabled_cf_indices, handles));
@ -831,6 +831,13 @@ TEST_P(WritePreparedTransactionTest, DoubleSnapshot) {
wp_db->ReleaseSnapshot(snapshot3); wp_db->ReleaseSnapshot(snapshot3);
} }
size_t UniqueCnt(std::vector<SequenceNumber> vec) {
std::set<SequenceNumber> aset;
for (auto i : vec) {
aset.insert(i);
}
return aset.size();
}
// Test that the entries in old_commit_map_ get garbage collected properly // Test that the entries in old_commit_map_ get garbage collected properly
TEST_P(WritePreparedTransactionTest, OldCommitMapGC) { TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
const size_t snapshot_cache_bits = 0; const size_t snapshot_cache_bits = 0;
@ -879,9 +886,9 @@ TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
ASSERT_FALSE(wp_db->old_commit_map_empty_.load()); ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
ReadLock rl(&wp_db->old_commit_map_mutex_); ReadLock rl(&wp_db->old_commit_map_mutex_);
ASSERT_EQ(3, wp_db->old_commit_map_.size()); ASSERT_EQ(3, wp_db->old_commit_map_.size());
ASSERT_EQ(2, wp_db->old_commit_map_[snap_seq1].size()); ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1]));
ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq2].size()); ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq2]));
ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size()); ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
} }
// Verify that the 2nd snapshot is cleaned up after the release // Verify that the 2nd snapshot is cleaned up after the release
@ -890,8 +897,8 @@ TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
ASSERT_FALSE(wp_db->old_commit_map_empty_.load()); ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
ReadLock rl(&wp_db->old_commit_map_mutex_); ReadLock rl(&wp_db->old_commit_map_mutex_);
ASSERT_EQ(2, wp_db->old_commit_map_.size()); ASSERT_EQ(2, wp_db->old_commit_map_.size());
ASSERT_EQ(2, wp_db->old_commit_map_[snap_seq1].size()); ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1]));
ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size()); ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
} }
// Verify that the 1st snapshot is cleaned up after the release // Verify that the 1st snapshot is cleaned up after the release
@ -900,7 +907,7 @@ TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
ASSERT_FALSE(wp_db->old_commit_map_empty_.load()); ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
ReadLock rl(&wp_db->old_commit_map_mutex_); ReadLock rl(&wp_db->old_commit_map_mutex_);
ASSERT_EQ(1, wp_db->old_commit_map_.size()); ASSERT_EQ(1, wp_db->old_commit_map_.size());
ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size()); ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
} }
// Verify that the 3rd snapshot is cleaned up after the release // Verify that the 3rd snapshot is cleaned up after the release
@ -1139,6 +1146,139 @@ TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasicTest) {
} }
} }
// A new snapshot should always be always larger than max_evicted_seq_
// Otherwise the snapshot does not go through AdvanceMaxEvictedSeq
TEST_P(WritePreparedTransactionTest, NewSnapshotLargerThanMax) {
WriteOptions woptions;
TransactionOptions txn_options;
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
Transaction* txn0 = db->BeginTransaction(woptions, txn_options);
ASSERT_OK(txn0->Put(Slice("key"), Slice("value")));
ASSERT_OK(txn0->Commit());
const SequenceNumber seq = txn0->GetId(); // is also prepare seq
delete txn0;
std::vector<Transaction*> txns;
// Inc seq without committing anything
for (int i = 0; i < 10; i++) {
Transaction* txn = db->BeginTransaction(woptions, txn_options);
ASSERT_OK(txn->SetName("xid" + std::to_string(i)));
ASSERT_OK(txn->Put(Slice("key" + std::to_string(i)), Slice("value")));
ASSERT_OK(txn->Prepare());
txns.push_back(txn);
}
// The new commit is seq + 10
ASSERT_OK(db->Put(woptions, "key", "value"));
auto snap = wp_db->GetSnapshot();
const SequenceNumber last_seq = snap->GetSequenceNumber();
wp_db->ReleaseSnapshot(snap);
ASSERT_LT(seq, last_seq);
// Otherwise our test is not effective
ASSERT_LT(last_seq - seq, wp_db->INC_STEP_FOR_MAX_EVICTED);
// Evict seq out of commit cache
const SequenceNumber overwrite_seq = seq + wp_db->COMMIT_CACHE_SIZE;
// Check that the next write could make max go beyond last
auto last_max = wp_db->max_evicted_seq_.load();
wp_db->AddCommitted(overwrite_seq, overwrite_seq);
// Check that eviction has advanced the max
ASSERT_LT(last_max, wp_db->max_evicted_seq_.load());
// Check that the new max has not advanced the last seq
ASSERT_LT(wp_db->max_evicted_seq_.load(), last_seq);
for (auto txn : txns) {
txn->Rollback();
delete txn;
}
}
// A new snapshot should always be always larger than max_evicted_seq_
// In very rare cases max could be below last published seq. Test that
// taking snapshot will wait for max to catch up.
TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) {
const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction
DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits);
WriteOptions woptions;
TransactionOptions txn_options;
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
const int writes = 50;
const int batch_cnt = 4;
rocksdb::port::Thread t1([&]() {
for (int i = 0; i < writes; i++) {
WriteBatch batch;
// For duplicate keys cause 4 commit entires, each evicting an entry that
// is not published yet, thus causing max ecited seq go higher than last
// published.
for (int b = 0; b < batch_cnt; b++) {
batch.Put("foo", "foo");
}
db->Write(woptions, &batch);
}
});
rocksdb::port::Thread t2([&]() {
while (wp_db->max_evicted_seq_ == 0) { // wait for insert thread
std::this_thread::yield();
}
for (int i = 0; i < 10; i++) {
auto snap = db->GetSnapshot();
if (snap->GetSequenceNumber() != 0) {
ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
} // seq 0 is ok to be less than max since nothing is visible to it
db->ReleaseSnapshot(snap);
}
});
t1.join();
t2.join();
// Make sure that the test has worked and seq number has advanced as we
// thought
auto snap = db->GetSnapshot();
ASSERT_GT(snap->GetSequenceNumber(), batch_cnt * writes - 1);
db->ReleaseSnapshot(snap);
}
TEST_P(WritePreparedTransactionTest, AdvanceSeqByOne) {
auto snap = db->GetSnapshot();
auto seq1 = snap->GetSequenceNumber();
db->ReleaseSnapshot(snap);
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
wp_db->AdvanceSeqByOne();
snap = db->GetSnapshot();
auto seq2 = snap->GetSequenceNumber();
db->ReleaseSnapshot(snap);
ASSERT_LT(seq1, seq2);
}
// Test that the txn Initilize calls the overridden functions
TEST_P(WritePreparedTransactionTest, TxnInitialize) {
TransactionOptions txn_options;
WriteOptions write_options;
Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
ASSERT_OK(txn0->SetName("xid"));
ASSERT_OK(txn0->Put(Slice("key"), Slice("value1")));
ASSERT_OK(txn0->Prepare());
// SetSnapshot is overridden to update min_uncommitted_
txn_options.set_snapshot = true;
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
auto snap = txn1->GetSnapshot();
auto snap_impl = reinterpret_cast<const SnapshotImpl*>(snap);
// If ::Initialize calls the overriden SetSnapshot, min_uncommitted_ must be
// udpated
ASSERT_GT(snap_impl->min_uncommitted_, 0);
txn0->Rollback();
txn1->Rollback();
delete txn0;
delete txn1;
}
// This tests that transactions with duplicate keys perform correctly after max // This tests that transactions with duplicate keys perform correctly after max
// is advancing their prepared sequence numbers. This will not be the case if // is advancing their prepared sequence numbers. This will not be the case if
// for example the txn does not add the prepared seq for the second sub-batch to // for example the txn does not add the prepared seq for the second sub-batch to
@ -2072,7 +2212,8 @@ TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) {
ASSERT_OK(transaction->Prepare()); ASSERT_OK(transaction->Prepare());
// snapshot1 should get min_uncommitted from prepared_txns_ heap. // snapshot1 should get min_uncommitted from prepared_txns_ heap.
auto snapshot1 = db->GetSnapshot(); auto snapshot1 = db->GetSnapshot();
ASSERT_EQ(transaction->GetId(), ((SnapshotImpl*)snapshot1)->min_uncommitted_); ASSERT_EQ(transaction->GetId(),
((SnapshotImpl*)snapshot1)->min_uncommitted_);
// Add a commit to advance max_evicted_seq and move the prepared transaction // Add a commit to advance max_evicted_seq and move the prepared transaction
// into delayed_prepared_ set. // into delayed_prepared_ set.
ASSERT_OK(db->Put(WriteOptions(), "key2", "value2")); ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
@ -2086,7 +2227,8 @@ TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) {
} }
// snapshot2 should get min_uncommitted from delayed_prepared_ set. // snapshot2 should get min_uncommitted from delayed_prepared_ set.
auto snapshot2 = db->GetSnapshot(); auto snapshot2 = db->GetSnapshot();
ASSERT_EQ(transaction->GetId(), ((SnapshotImpl*)snapshot1)->min_uncommitted_); ASSERT_EQ(transaction->GetId(),
((SnapshotImpl*)snapshot1)->min_uncommitted_);
ASSERT_OK(transaction->Commit()); ASSERT_OK(transaction->Commit());
delete transaction; delete transaction;
if (has_recent_prepare) { if (has_recent_prepare) {

@ -31,8 +31,13 @@ struct WriteOptions;
WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db, WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db,
const WriteOptions& write_options, const WriteOptions& write_options,
const TransactionOptions& txn_options) const TransactionOptions& txn_options)
: PessimisticTransaction(txn_db, write_options, txn_options), : PessimisticTransaction(txn_db, write_options, txn_options, false),
wpt_db_(txn_db) {} wpt_db_(txn_db) {
// Call Initialize outside PessimisticTransaction constructor otherwise it
// would skip overridden functions in WritePreparedTxn since they are not
// defined yet in the constructor of PessimisticTransaction
Initialize(txn_options);
}
void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) { void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) {
PessimisticTransaction::Initialize(txn_options); PessimisticTransaction::Initialize(txn_options);
@ -413,20 +418,8 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
} }
void WritePreparedTxn::SetSnapshot() { void WritePreparedTxn::SetSnapshot() {
// Note: for this optimization setting the last sequence number and obtaining const bool kForWWConflictCheck = true;
// the smallest uncommitted seq should be done atomically. However to avoid SnapshotImpl* snapshot = wpt_db_->GetSnapshotInternal(kForWWConflictCheck);
// the mutex overhead, we call SmallestUnCommittedSeq BEFORE taking the
// snapshot. Since we always updated the list of unprepared seq (via
// AddPrepared) AFTER the last sequence is updated, this guarantees that the
// smallest uncommitted seq that we pair with the snapshot is smaller or equal
// the value that would be obtained otherwise atomically. That is ok since
// this optimization works as long as min_uncommitted is less than or equal
// than the smallest uncommitted seq when the snapshot was taken.
auto min_uncommitted = wpt_db_->SmallestUnCommittedSeq();
const bool FOR_WW_CONFLICT_CHECK = true;
SnapshotImpl* snapshot = dbimpl_->GetSnapshotImpl(FOR_WW_CONFLICT_CHECK);
assert(snapshot);
wpt_db_->EnhanceSnapshot(snapshot, min_uncommitted);
SetSnapshotInternal(snapshot); SetSnapshotInternal(snapshot);
} }

@ -422,8 +422,22 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
"Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64, "Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64,
evicted.prep_seq, evicted.commit_seq, prev_max); evicted.prep_seq, evicted.commit_seq, prev_max);
if (prev_max < evicted.commit_seq) { if (prev_max < evicted.commit_seq) {
auto last = db_impl_->GetLastPublishedSequence(); // could be 0
SequenceNumber max_evicted_seq;
if (LIKELY(evicted.commit_seq < last)) {
assert(last > 0);
// Inc max in larger steps to avoid frequent updates // Inc max in larger steps to avoid frequent updates
auto max_evicted_seq = evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED; max_evicted_seq =
std::min(evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED, last - 1);
} else {
// legit when a commit entry in a write batch overwrite the previous one
max_evicted_seq = evicted.commit_seq;
}
ROCKS_LOG_DETAILS(info_log_,
"%lu Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64
" => %lu",
prepare_seq, evicted.prep_seq, evicted.commit_seq,
prev_max, max_evicted_seq);
AdvanceMaxEvictedSeq(prev_max, max_evicted_seq); AdvanceMaxEvictedSeq(prev_max, max_evicted_seq);
} }
// After each eviction from commit cache, check if the commit entry should // After each eviction from commit cache, check if the commit entry should
@ -554,16 +568,84 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
} }
const Snapshot* WritePreparedTxnDB::GetSnapshot() { const Snapshot* WritePreparedTxnDB::GetSnapshot() {
// Note: SmallestUnCommittedSeq must be called before GetSnapshotImpl. Refer const bool kForWWConflictCheck = true;
// to WritePreparedTxn::SetSnapshot for more explanation. return GetSnapshotInternal(!kForWWConflictCheck);
}
SnapshotImpl* WritePreparedTxnDB::GetSnapshotInternal(
bool for_ww_conflict_check) {
// Note: for this optimization setting the last sequence number and obtaining
// the smallest uncommitted seq should be done atomically. However to avoid
// the mutex overhead, we call SmallestUnCommittedSeq BEFORE taking the
// snapshot. Since we always updated the list of unprepared seq (via
// AddPrepared) AFTER the last sequence is updated, this guarantees that the
// smallest uncommitted seq that we pair with the snapshot is smaller or equal
// the value that would be obtained otherwise atomically. That is ok since
// this optimization works as long as min_uncommitted is less than or equal
// than the smallest uncommitted seq when the snapshot was taken.
auto min_uncommitted = WritePreparedTxnDB::SmallestUnCommittedSeq(); auto min_uncommitted = WritePreparedTxnDB::SmallestUnCommittedSeq();
const bool FOR_WW_CONFLICT_CHECK = true; SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(!FOR_WW_CONFLICT_CHECK); assert(snap_impl);
SequenceNumber snap_seq = snap_impl->GetSequenceNumber();
if (UNLIKELY(snap_seq != 0 && snap_seq <= max_evicted_seq_)) {
// There is a very rare case in which the commit entry evicts another commit
// entry that is not published yet thus advancing max evicted seq beyond the
// last published seq. This case is not likely in real-world setup so we
// handle it with a few retries.
size_t retry = 0;
while (snap_impl->GetSequenceNumber() <= max_evicted_seq_ && retry < 100) {
ROCKS_LOG_WARN(info_log_, "GetSnapshot retry %" PRIu64,
snap_impl->GetSequenceNumber());
ReleaseSnapshot(snap_impl);
// Wait for last visible seq to catch up with max, and also go beyond it
// by one.
AdvanceSeqByOne();
snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
assert(snap_impl); assert(snap_impl);
retry++;
}
assert(snap_impl->GetSequenceNumber() > max_evicted_seq_);
if (snap_impl->GetSequenceNumber() <= max_evicted_seq_) {
throw std::runtime_error("Snapshot seq " +
ToString(snap_impl->GetSequenceNumber()) +
" after " + ToString(retry) +
" retries is still less than max_evicted_seq_" +
ToString(max_evicted_seq_.load()));
}
}
EnhanceSnapshot(snap_impl, min_uncommitted); EnhanceSnapshot(snap_impl, min_uncommitted);
ROCKS_LOG_DETAILS(
db_impl_->immutable_db_options().info_log,
"GetSnapshot %" PRIu64 " ww:%" PRIi32 " min_uncommitted: %" PRIu64,
for_ww_conflict_check, snap_impl->GetSequenceNumber(), min_uncommitted);
return snap_impl; return snap_impl;
} }
void WritePreparedTxnDB::AdvanceSeqByOne() {
// Inserting an empty value will i) let the max evicted entry to be
// published, i.e., max == last_published, increase the last published to
// be one beyond max, i.e., max < last_published.
WriteOptions woptions;
TransactionOptions txn_options;
Transaction* txn0 = BeginTransaction(woptions, txn_options, nullptr);
std::hash<std::thread::id> hasher;
char name[64];
snprintf(name, 64, "txn%" ROCKSDB_PRIszt, hasher(std::this_thread::get_id()));
assert(strlen(name) < 64 - 1);
Status s = txn0->SetName(name);
assert(s.ok());
if (s.ok()) {
// Without prepare it would simply skip the commit
s = txn0->Prepare();
}
assert(s.ok());
if (s.ok()) {
s = txn0->Commit();
}
assert(s.ok());
delete txn0;
}
const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB( const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
SequenceNumber max) { SequenceNumber max) {
ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max); ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max);

@ -374,6 +374,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
void UpdateCFComparatorMap(ColumnFamilyHandle* handle) override; void UpdateCFComparatorMap(ColumnFamilyHandle* handle) override;
virtual const Snapshot* GetSnapshot() override; virtual const Snapshot* GetSnapshot() override;
SnapshotImpl* GetSnapshotInternal(bool for_ww_conflict_check);
protected: protected:
virtual Status VerifyCFOptions( virtual Status VerifyCFOptions(
@ -395,10 +396,13 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test; friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test;
friend class friend class
WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicatesTest_Test; WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicatesTest_Test;
friend class WritePreparedTransactionTest_AdvanceSeqByOne_Test;
friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
friend class WritePreparedTransactionTest_DoubleSnapshot_Test; friend class WritePreparedTransactionTest_DoubleSnapshot_Test;
friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test; friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test;
friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test; friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test;
friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test;
friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test;
friend class WritePreparedTransactionTest_OldCommitMapGC_Test; friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
friend class WritePreparedTransactionTest_RollbackTest_Test; friend class WritePreparedTransactionTest_RollbackTest_Test;
friend class WriteUnpreparedTxnDB; friend class WriteUnpreparedTxnDB;
@ -564,6 +568,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
const uint64_t& snapshot_seq, const uint64_t& snapshot_seq,
const bool next_is_larger); const bool next_is_larger);
// A trick to increase the last visible sequence number by one and also wait
// for the in-flight commits to be visible.
void AdvanceSeqByOne();
// The list of live snapshots at the last time that max_evicted_seq_ advanced. // The list of live snapshots at the last time that max_evicted_seq_ advanced.
// The list stored into two data structures: in snapshot_cache_ that is // The list stored into two data structures: in snapshot_cache_ that is
// efficient for concurrent reads, and in snapshots_ if the data does not fit // efficient for concurrent reads, and in snapshots_ if the data does not fit

Loading…
Cancel
Save