WritePrepared Txn: Lock-free CommitMap

Summary:
We had two proposals for lock-free commit maps. This patch implements the latter, simpler one. We can later experiment with both proposals.

In this implementation each entry is a std::atomic<uint64_t>, accessed via memory_order_acquire/release. On the x86_64 architecture these compile down to plain reads and writes from memory.
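As a rough illustration of that claim, a minimal sketch of one such slot (made-up names, not code from this patch):

#include <atomic>
#include <cstdint>

// One commit-cache slot modeled as a single 64-bit atomic word.
// Zero is reserved to mean "empty".
std::atomic<uint64_t> slot{0};

// A writer publishes a packed entry; the release store pairs with the
// acquire load below.
void Publish(uint64_t packed) {
  slot.store(packed, std::memory_order_release);
}

// A reader observes the latest published entry. On x86_64 both the store
// and this load compile to plain MOV instructions.
uint64_t Read() {
  return slot.load(std::memory_order_acquire);
}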
Closes https://github.com/facebook/rocksdb/pull/2861

Differential Revision: D5800724

Pulled By: maysamyabandeh

fbshipit-source-id: 41abae9a4a5df050a8eb696c43de11c2770afdda
Author: Maysam Yabandeh (committed by Facebook GitHub Bot)
commit 09713a64b3 (parent 72e4190918)
Changed files:
1. utilities/transactions/pessimistic_transaction.cc (2 lines changed)
2. utilities/transactions/pessimistic_transaction.h (4 lines changed)
3. utilities/transactions/pessimistic_transaction_db.cc (47 lines changed)
4. utilities/transactions/pessimistic_transaction_db.h (131 lines changed)
5. utilities/transactions/write_prepared_transaction_test.cc (102 lines changed)
6. utilities/transactions/write_prepared_txn.cc (8 lines changed)
7. utilities/transactions/write_prepared_txn.h (2 lines changed)

@@ -125,7 +125,7 @@ WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db,
const TransactionOptions& txn_options)
: PessimisticTransaction(txn_db, write_options, txn_options){};
Status WriteCommittedTxn::CommitBatch(WriteBatch* batch) {
Status PessimisticTransaction::CommitBatch(WriteBatch* batch) {
TransactionKeyMap keys_to_unlock;
Status s = LockBatch(batch, &keys_to_unlock);

@@ -52,7 +52,7 @@ class PessimisticTransaction : public TransactionBaseImpl {
// It is basically Commit without going through the Prepare phase. The write
// batch is also directly provided instead of expecting the txn to gradually
// batch the transaction's writes into an internal write batch.
virtual Status CommitBatch(WriteBatch* batch) = 0;
Status CommitBatch(WriteBatch* batch);
Status Rollback() override = 0;
@@ -191,8 +191,6 @@ class WriteCommittedTxn : public PessimisticTransaction {
virtual ~WriteCommittedTxn() {}
Status CommitBatch(WriteBatch* batch) override;
Status Rollback() override;
private:

@@ -541,8 +541,9 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
}
}
auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
CommitEntry64b dont_care;
CommitEntry cached;
bool exist = GetCommitEntry(indexed_seq, &cached);
bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
if (!exist) {
// It is not committed, so it must still be prepared
return false;
@@ -599,8 +600,9 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
prepare_seq, commit_seq);
auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
CommitEntry64b evicted_64b;
CommitEntry evicted;
bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted);
bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted);
if (to_be_evicted) {
auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
if (prev_max < evicted.commit_seq) {
@@ -613,7 +615,7 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
CheckAgainstSnapshots(evicted);
}
bool succ =
ExchangeCommitEntry(indexed_seq, evicted, {prepare_seq, commit_seq});
ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq});
if (!succ) {
// A very rare event, in which the commit entry is updated by another writer
// before we update it. Here we apply a very simple solution of retrying.
@@ -636,34 +638,32 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
}
bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
CommitEntry64b* entry_64b,
CommitEntry* entry) {
// TODO(myabandeh): implement lock-free commit_cache_
ReadLock rl(&commit_cache_mutex_);
*entry = commit_cache_[indexed_seq];
return (entry->commit_seq != 0); // initialized
*entry_64b = commit_cache_[indexed_seq].load(std::memory_order_acquire);
bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT);
return valid;
}
bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq,
const CommitEntry& new_entry,
CommitEntry* evicted_entry) {
// TODO(myabandeh): implement lock-free commit_cache_
WriteLock wl(&commit_cache_mutex_);
*evicted_entry = commit_cache_[indexed_seq];
commit_cache_[indexed_seq] = new_entry;
return (evicted_entry->commit_seq != 0); // initialized
CommitEntry64b new_entry_64b(new_entry, FORMAT);
CommitEntry64b evicted_entry_64b = commit_cache_[indexed_seq].exchange(
new_entry_64b, std::memory_order_acq_rel);
bool valid = evicted_entry_64b.Parse(indexed_seq, evicted_entry, FORMAT);
return valid;
}
bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
const CommitEntry& expected_entry,
CommitEntry64b& expected_entry_64b,
const CommitEntry& new_entry) {
// TODO(myabandeh): implement lock-free commit_cache_
WriteLock wl(&commit_cache_mutex_);
auto& evicted_entry = commit_cache_[indexed_seq];
if (evicted_entry.prep_seq != expected_entry.prep_seq) {
return false;
}
commit_cache_[indexed_seq] = new_entry;
return true;
auto& atomic_entry = commit_cache_[indexed_seq];
CommitEntry64b new_entry_64b(new_entry, FORMAT);
bool succ = atomic_entry.compare_exchange_strong(
expected_entry_64b, new_entry_64b, std::memory_order_acq_rel,
std::memory_order_acquire);
return succ;
}
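Note how GetCommitEntry now also hands back the raw CommitEntry64b: the caller keeps the exact word it observed, so the later compare_exchange can detect any intervening write to the slot. A condensed sketch of how AddCommitted composes the load with the CAS (simplified; the eviction bookkeeping is elided):

#include <atomic>
#include <cstdint>

// Publish `packed` into `slot`, retrying the rare race in which another
// committer overwrites the slot between our load and our CAS.
void InsertWithRetry(std::atomic<uint64_t>& slot, uint64_t packed) {
  for (;;) {
    uint64_t evicted = slot.load(std::memory_order_acquire);
    if (evicted != 0) {
      // The real AddCommitted advances max_evicted_seq_ and checks the
      // evicted entry against live snapshots before overwriting it.
    }
    if (slot.compare_exchange_strong(evicted, packed,
                                     std::memory_order_acq_rel,
                                     std::memory_order_acquire)) {
      return;  // published; `evicted` is the value we already processed
    }
    // CAS failed: someone updated the slot after our load; retry.
  }
}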
void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max,
@@ -700,10 +700,9 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max,
if (update_snapshots) {
UpdateSnapshots(snapshots, new_snapshots_version);
}
// TODO(myabandeh): check if it worked with relaxed ordering
while (prev_max < new_max && !max_evicted_seq_.compare_exchange_weak(
prev_max, new_max, std::memory_order_release,
std::memory_order_acquire)) {
prev_max, new_max, std::memory_order_acq_rel,
std::memory_order_relaxed)) {
};
}
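The loop above is the usual lock-free fetch-max idiom. Isolated as a sketch, under the same memory orders the patch uses:

#include <atomic>
#include <cstdint>

// Raise `target` to at least `new_max` without a lock. On failure,
// compare_exchange_weak refreshes `prev` with the current value, so the
// loop exits as soon as any thread has published a value >= new_max.
void AdvanceMax(std::atomic<uint64_t>& target, uint64_t new_max) {
  uint64_t prev = target.load(std::memory_order_acquire);
  while (prev < new_max &&
         !target.compare_exchange_weak(prev, new_max,
                                       std::memory_order_acq_rel,
                                       std::memory_order_relaxed)) {
  }
}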

@@ -105,16 +105,6 @@ class PessimisticTransactionDB : public TransactionDB {
std::vector<DeadlockPath> GetDeadlockInfoBuffer() override;
void SetDeadlockInfoBufferSize(uint32_t target_size) override;
struct CommitEntry {
uint64_t prep_seq;
uint64_t commit_seq;
CommitEntry() : prep_seq(0), commit_seq(0) {}
CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {}
bool operator==(const CommitEntry& rhs) const {
return prep_seq == rhs.prep_seq && commit_seq == rhs.commit_seq;
}
};
protected:
void ReinitializeTransaction(
Transaction* txn, const WriteOptions& write_options,
@@ -170,21 +160,27 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
public:
explicit WritePreparedTxnDB(
DB* db, const TransactionDBOptions& txn_db_options,
size_t snapshot_cache_size = DEF_SNAPSHOT_CACHE_SIZE,
size_t commit_cache_size = DEF_COMMIT_CACHE_SIZE)
size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS,
size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS)
: PessimisticTransactionDB(db, txn_db_options),
SNAPSHOT_CACHE_SIZE(snapshot_cache_size),
COMMIT_CACHE_SIZE(commit_cache_size) {
SNAPSHOT_CACHE_BITS(snapshot_cache_bits),
SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
COMMIT_CACHE_BITS(commit_cache_bits),
COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
FORMAT(COMMIT_CACHE_BITS) {
init(txn_db_options);
}
explicit WritePreparedTxnDB(
StackableDB* db, const TransactionDBOptions& txn_db_options,
size_t snapshot_cache_size = DEF_SNAPSHOT_CACHE_SIZE,
size_t commit_cache_size = DEF_COMMIT_CACHE_SIZE)
size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS,
size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS)
: PessimisticTransactionDB(db, txn_db_options),
SNAPSHOT_CACHE_SIZE(snapshot_cache_size),
COMMIT_CACHE_SIZE(commit_cache_size) {
SNAPSHOT_CACHE_BITS(snapshot_cache_bits),
SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
COMMIT_CACHE_BITS(commit_cache_bits),
COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
FORMAT(COMMIT_CACHE_BITS) {
init(txn_db_options);
}
@@ -203,6 +199,87 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// commit_seq to the commit map
void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq);
struct CommitEntry {
uint64_t prep_seq;
uint64_t commit_seq;
CommitEntry() : prep_seq(0), commit_seq(0) {}
CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {}
bool operator==(const CommitEntry& rhs) const {
return prep_seq == rhs.prep_seq && commit_seq == rhs.commit_seq;
}
};
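An aside on why the patch packs entries into 64 bits instead of making CommitEntry itself atomic: a 16-byte struct is generally not a lock-free std::atomic. A quick, platform-dependent way to check (illustrative sketch; may need -latomic with GCC):

#include <atomic>
#include <cstdint>

struct TwoWords { uint64_t a, b; };  // the shape of an unpacked CommitEntry

bool PackedFormIsNeeded() {
  std::atomic<TwoWords> wide{};     // 16-byte payload
  std::atomic<uint64_t> narrow{0};  // 8-byte payload
  // On common 64-bit targets narrow.is_lock_free() is true, while
  // wide.is_lock_free() is typically false (or needs cmpxchg16b), which
  // motivates the CommitEntry64b encoding that follows.
  return narrow.is_lock_free() && !wide.is_lock_free();
}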
struct CommitEntry64bFormat {
explicit CommitEntry64bFormat(size_t index_bits)
: INDEX_BITS(index_bits),
PREP_BITS(static_cast<size_t>(64 - PAD_BITS - INDEX_BITS)),
COMMIT_BITS(static_cast<size_t>(64 - PREP_BITS)),
COMMIT_FILTER(static_cast<uint64_t>((1ull << COMMIT_BITS) - 1)) {}
// Number of higher bits of a sequence number that are not used. They are
// used to encode the value type, ...
const size_t PAD_BITS = static_cast<size_t>(8);
// Number of lower bits from prepare seq that can be skipped as they are
// implied by the index of the entry in the array
const size_t INDEX_BITS;
// Number of bits we use to encode the prepare seq
const size_t PREP_BITS;
// Number of bits we use to encode the commit seq.
const size_t COMMIT_BITS;
// Filter to encode/decode commit seq
const uint64_t COMMIT_FILTER;
};
// Prepare Seq (64 bits) = PAD ... PAD PREP PREP ... PREP INDEX INDEX ... INDEX
// Delta Seq   (64 bits) = 0 0 0 0 0 0 0 0 0 0 0 0 DELTA DELTA ... DELTA DELTA
// Encoded Value         = PREP PREP .... PREP PREP DELTA DELTA ... DELTA DELTA
// PAD: the first bits of a seq that are reserved for tagging and hence ignored
// PREP/INDEX: the used bits in a prepare seq number
// INDEX: the bits that do not have to be encoded (will be provided externally)
// DELTA: commit seq - prepare seq + 1
// Number of DELTA bits should be equal to number of INDEX bits + PAD bits
struct CommitEntry64b {
constexpr CommitEntry64b() noexcept : rep_(0) {}
CommitEntry64b(const CommitEntry& entry, const CommitEntry64bFormat& format)
: CommitEntry64b(entry.prep_seq, entry.commit_seq, format) {}
CommitEntry64b(const uint64_t ps, const uint64_t cs,
const CommitEntry64bFormat& format) {
assert(ps < static_cast<uint64_t>(
(1ull << (format.PREP_BITS + format.INDEX_BITS))));
assert(ps <= cs);
uint64_t delta = cs - ps + 1; // make initialized delta always >= 1
// zero is reserved for uninitialized entries
assert(0 < delta);
assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS)));
rep_ = (ps << format.PAD_BITS) & ~format.COMMIT_FILTER;
rep_ = rep_ | delta;
}
// Return false if the entry is empty
bool Parse(const uint64_t indexed_seq, CommitEntry* entry,
const CommitEntry64bFormat& format) {
uint64_t delta = rep_ & format.COMMIT_FILTER;
// zero is reserved for uninitialized entries
assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS)));
if (delta == 0) {
return false; // initialized entry would have non-zero delta
}
assert(indexed_seq < static_cast<uint64_t>((1ull << format.INDEX_BITS)));
uint64_t prep_up = rep_ & ~format.COMMIT_FILTER;
prep_up >>= format.PAD_BITS;
const uint64_t& prep_low = indexed_seq;
entry->prep_seq = prep_up | prep_low;
entry->commit_seq = entry->prep_seq + delta - 1;
return true;
}
private:
uint64_t rep_;
};
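To make the encoding concrete, a self-contained round trip with the default parameters (PAD_BITS = 8, INDEX_BITS = 21, hence PREP_BITS = 35 and COMMIT_BITS = 29); the constants are recomputed locally and the sequence numbers are arbitrary:

#include <cassert>
#include <cstdint>

int main() {
  const uint64_t kPadBits = 8, kIndexBits = 21;
  const uint64_t kPrepBits = 64 - kPadBits - kIndexBits;  // 35
  const uint64_t kCommitBits = 64 - kPrepBits;            // 29
  const uint64_t kCommitFilter = (1ull << kCommitBits) - 1;

  const uint64_t ps = (5ull << kIndexBits) + 3;      // a prepare seq
  const uint64_t cs = ps + 41;                       // its commit seq
  const uint64_t index = ps % (1ull << kIndexBits);  // 3, implied by the slot

  // Encode: high bits keep ps minus its low kIndexBits; low bits keep delta.
  const uint64_t delta = cs - ps + 1;  // 42; zero is reserved for "empty"
  const uint64_t rep = ((ps << kPadBits) & ~kCommitFilter) | delta;

  // Decode, supplying the index from the slot position.
  const uint64_t prep = ((rep & ~kCommitFilter) >> kPadBits) | index;
  const uint64_t commit = prep + (rep & kCommitFilter) - 1;
  assert(prep == ps && commit == cs);
  return 0;
}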
private:
friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
@@ -220,8 +297,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
std::max(SNAPSHOT_CACHE_SIZE / 100, static_cast<size_t>(1));
snapshot_cache_ = unique_ptr<std::atomic<SequenceNumber>[]>(
new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
commit_cache_ =
unique_ptr<CommitEntry[]>(new CommitEntry[COMMIT_CACHE_SIZE]{});
commit_cache_ = unique_ptr<std::atomic<CommitEntry64b>[]>(
new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
}
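A detail worth noting in init above: the trailing {} value-initializes the arrays, so every slot starts as an all-zero CommitEntry64b, which Parse treats as empty. A minimal check of that property (sketch, hypothetical names):

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>

int main() {
  const size_t kSlots = 1u << 4;  // tiny cache for illustration
  // The {} matters: without it the atomics would be default-initialized
  // and could hold indeterminate values instead of the reserved zero.
  std::unique_ptr<std::atomic<uint64_t>[]> cache(
      new std::atomic<uint64_t>[kSlots]{});
  for (size_t i = 0; i < kSlots; i++) {
    if (cache[i].load(std::memory_order_acquire) != 0) return 1;
  }
  return 0;
}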
// A heap with the amortized O(1) complexity for erase. It uses one extra heap
@@ -263,7 +340,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// Get the commit entry with index indexed_seq from the commit table. It
// returns true if such entry exists.
bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry* entry);
bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b,
CommitEntry* entry);
// Rewrite the entry with the index indexed_seq in the commit table with the
// commit entry <prep_seq, commit_seq>. If the rewrite results in eviction,
@@ -275,7 +353,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// commit entry new_entry only if the existing entry matches the
// expected_entry. Returns false otherwise.
bool ExchangeCommitEntry(const uint64_t indexed_seq,
const CommitEntry& expected_entry,
CommitEntry64b& expected_entry,
const CommitEntry& new_entry);
// Increase max_evicted_seq_ from the previous value prev_max to the new
@@ -325,7 +403,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// with snapshots_mutex_ and concurrent reads are safe due to std::atomic for
// each entry. In x86_64 architecture such reads are compiled to simple read
// instructions.
// 128 entries
static const size_t DEF_SNAPSHOT_CACHE_SIZE = static_cast<size_t>(1 << 7);
static const size_t DEF_SNAPSHOT_CACHE_BITS = static_cast<size_t>(7);
const size_t SNAPSHOT_CACHE_BITS;
const size_t SNAPSHOT_CACHE_SIZE;
unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
// 2nd list for storing snapshots. The list sorted in ascending order.
@@ -339,11 +418,13 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// prepared_mutex_.
PreparedHeap prepared_txns_;
// 2m entries (1 << 21), 16MB with 8-byte entries
static const size_t DEF_COMMIT_CACHE_SIZE = static_cast<size_t>(1 << 21);
static const size_t DEF_COMMIT_CACHE_BITS = static_cast<size_t>(21);
const size_t COMMIT_CACHE_BITS;
const size_t COMMIT_CACHE_SIZE;
const CommitEntry64bFormat FORMAT;
// commit_cache_ must be initialized to zero to tell apart an empty index from
// a filled one. Thread-safety is provided with commit_cache_mutex_.
unique_ptr<CommitEntry[]> commit_cache_;
unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_;
// The largest evicted *commit* sequence number from the commit_cache_
std::atomic<uint64_t> max_evicted_seq_ = {};
// Advance max_evicted_seq_ by this value each time it needs an update. The

@@ -40,7 +40,9 @@ using std::string;
namespace rocksdb {
using CommitEntry = PessimisticTransactionDB::CommitEntry;
using CommitEntry = WritePreparedTxnDB::CommitEntry;
using CommitEntry64b = WritePreparedTxnDB::CommitEntry64b;
using CommitEntry64bFormat = WritePreparedTxnDB::CommitEntry64bFormat;
TEST(PreparedHeap, BasicsTest) {
WritePreparedTxnDB::PreparedHeap heap;
@@ -106,6 +108,49 @@ TEST(PreparedHeap, BasicsTest) {
ASSERT_TRUE(heap.empty());
}
TEST(CommitEntry64b, BasicTest) {
const size_t INDEX_BITS = static_cast<size_t>(21);
const size_t INDEX_SIZE = static_cast<size_t>(1ull << INDEX_BITS);
const CommitEntry64bFormat FORMAT(static_cast<size_t>(INDEX_BITS));
// zero-initialized CommitEntry64b should indicate an empty entry
CommitEntry64b empty_entry64b;
uint64_t empty_index = 11ul;
CommitEntry empty_entry;
bool ok = empty_entry64b.Parse(empty_index, &empty_entry, FORMAT);
ASSERT_FALSE(ok);
// the zero entry is reserved for un-initialized entries
const size_t MAX_COMMIT = (1 << FORMAT.COMMIT_BITS) - 1 - 1;
// Samples over the numbers that are covered by that many index bits
std::array<uint64_t, 4> is = {{0, 1, INDEX_SIZE / 2 + 1, INDEX_SIZE - 1}};
// Samples over the numbers that are covered by that many commit bits
std::array<uint64_t, 4> ds = {{0, 1, MAX_COMMIT / 2 + 1, MAX_COMMIT}};
// Iterate over prepare numbers that i) cover all bits of a sequence
// number, and ii) include some bits that fall into the range of index or
// commit bits
for (uint64_t base = 1; base < kMaxSequenceNumber; base *= 2) {
for (uint64_t i : is) {
for (uint64_t d : ds) {
uint64_t p = base + i + d;
for (uint64_t c : {p, p + d / 2, p + d}) {
uint64_t index = p % INDEX_SIZE;
CommitEntry before(p, c), after;
CommitEntry64b entry64b(before, FORMAT);
ok = entry64b.Parse(index, &after, FORMAT);
ASSERT_TRUE(ok);
if (!(before == after)) {
printf("base %" PRIu64 " i %" PRIu64 " d %" PRIu64 " p %" PRIu64
" c %" PRIu64 " index %" PRIu64 "\n",
base, i, d, p, c, index);
}
ASSERT_EQ(before, after);
}
}
}
}
}
class WritePreparedTxnDBMock : public WritePreparedTxnDB {
public:
WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt)
@@ -255,33 +300,35 @@ TEST_P(WritePreparedTransactionTest, CommitMapTest) {
ASSERT_FALSE(evicted);
// Should be able to read the same value
bool found = wp_db->GetCommitEntry(c.prep_seq % size, &e);
CommitEntry64b dont_care;
bool found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e);
ASSERT_TRUE(found);
ASSERT_EQ(c, e);
// Should be able to distinguish between overlapping entries
found = wp_db->GetCommitEntry((c.prep_seq + size) % size, &e);
found = wp_db->GetCommitEntry((c.prep_seq + size) % size, &dont_care, &e);
ASSERT_TRUE(found);
ASSERT_NE(c.prep_seq + size, e.prep_seq);
// Should be able to detect non-existent entry
found = wp_db->GetCommitEntry((c.prep_seq + 1) % size, &e);
ASSERT_EQ(e.commit_seq, 0);
found = wp_db->GetCommitEntry((c.prep_seq + 1) % size, &dont_care, &e);
ASSERT_FALSE(found);
// Reject an invalid exchange
CommitEntry e2 = {c.prep_seq + size, c.commit_seq};
bool exchanged = wp_db->ExchangeCommitEntry(e2.prep_seq % size, e2, e);
CommitEntry e2 = {c.prep_seq + size, c.commit_seq + size};
CommitEntry64b e2_64b(e2, wp_db->FORMAT);
bool exchanged = wp_db->ExchangeCommitEntry(e2.prep_seq % size, e2_64b, e);
ASSERT_FALSE(exchanged);
// check whether it actually rejected it
found = wp_db->GetCommitEntry(e2.prep_seq % size, &e);
found = wp_db->GetCommitEntry(e2.prep_seq % size, &dont_care, &e);
ASSERT_TRUE(found);
ASSERT_EQ(c, e);
// Accept a valid exchange
CommitEntry64b c_64b(c, wp_db->FORMAT);
CommitEntry e3 = {c.prep_seq + size, c.commit_seq + size + 1};
exchanged = wp_db->ExchangeCommitEntry(c.prep_seq % size, c, e3);
exchanged = wp_db->ExchangeCommitEntry(c.prep_seq % size, c_64b, e3);
ASSERT_TRUE(exchanged);
// check whether it actually accepted it
found = wp_db->GetCommitEntry(c.prep_seq % size, &e);
found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e);
ASSERT_TRUE(found);
ASSERT_EQ(e3, e);
@@ -290,7 +337,7 @@ TEST_P(WritePreparedTransactionTest, CommitMapTest) {
evicted = wp_db->AddCommitEntry(e4.prep_seq % size, e4, &e);
ASSERT_TRUE(evicted);
ASSERT_EQ(e3, e);
found = wp_db->GetCommitEntry(e4.prep_seq % size, &e);
found = wp_db->GetCommitEntry(e4.prep_seq % size, &dont_care, &e);
ASSERT_TRUE(found);
ASSERT_EQ(e4, e);
}
@@ -333,11 +380,15 @@ TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) {
}
TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) {
std::vector<SequenceNumber> snapshots = {100l, 200l, 300l, 400l,
500l, 600l, 700l};
std::vector<SequenceNumber> snapshots = {100l, 200l, 300l, 400l, 500l,
600l, 700l, 800l, 900l};
const size_t snapshot_cache_bits = 2;
// Safety check to express the intended size in the test. Can be adjusted if
// the snapshots list changes.
assert((1ul << snapshot_cache_bits) * 2 + 1 == snapshots.size());
DBImpl* mock_db = new DBImpl(options, dbname);
std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock(
mock_db, txn_db_options, snapshots.size() / 2));
std::unique_ptr<WritePreparedTxnDBMock> wp_db(
new WritePreparedTxnDBMock(mock_db, txn_db_options, snapshot_cache_bits));
SequenceNumber version = 1000l;
ASSERT_EQ(0, wp_db->snapshots_total_);
wp_db->UpdateSnapshots(snapshots, version);
@@ -345,9 +396,9 @@ TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) {
// seq numbers are chosen so that we have two of them between each two
// snapshots. If the diff of two consecutive seqs is more than 5, there is a
// snapshot between them.
std::vector<SequenceNumber> seqs = {50l, 55l, 150l, 155l, 250l, 255l,
350l, 355l, 450l, 455l, 550l, 555l,
650l, 655l, 750l, 755l};
std::vector<SequenceNumber> seqs = {50l, 55l, 150l, 155l, 250l, 255l, 350l,
355l, 450l, 455l, 550l, 555l, 650l, 655l,
750l, 755l, 850l, 855l, 950l, 955l};
assert(seqs.size() > 1);
for (size_t i = 0; i < seqs.size() - 1; i++) {
wp_db->old_commit_map_empty_ = true; // reset
@@ -374,12 +425,16 @@ TEST_P(WritePreparedTransactionTest, SnapshotConcurrentAccessTest) {
// in the methods must also be added.
const std::vector<SequenceNumber> snapshots = {10l, 20l, 30l, 40l, 50l,
60l, 70l, 80l, 90l, 100l};
const size_t snapshot_cache_bits = 2;
// Safety check to express the intended size in the test. Can be adjusted if
// the snapshots list changes.
assert((1ul << snapshot_cache_bits) * 2 + 2 == snapshots.size());
SequenceNumber version = 1000l;
// Choose the cache size so that the new snapshot list could replace all the
// existing items in the cache and also have some overflow.
DBImpl* mock_db = new DBImpl(options, dbname);
std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock(
mock_db, txn_db_options, (snapshots.size() - 2) / 2));
std::unique_ptr<WritePreparedTxnDBMock> wp_db(
new WritePreparedTxnDBMock(mock_db, txn_db_options, snapshot_cache_bits));
// Add up to 2 items that do not fit into the cache
for (size_t old_size = 1; old_size <= wp_db->SNAPSHOT_CACHE_SIZE + 2;
old_size++) {
@@ -435,6 +490,7 @@ TEST_P(WritePreparedTransactionTest, SnapshotConcurrentAccessTest) {
}
}
}
printf("\n");
}
#endif
@@ -502,9 +558,9 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
WriteOptions wo;
// Use small commit cache to trigger lots of eviction and fast advance of
// max_evicted_seq_
const size_t commit_cache_size = 8;
const size_t commit_cache_bits = 3;
// Same for snapshot cache size
const size_t snapshot_cache_size = 5;
const size_t snapshot_cache_bits = 2;
// Take some preliminary snapshots first. This is to stress the data structure
// that holds the old snapshots as it will be designed to be efficient when
@@ -538,7 +594,7 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
std::set<uint64_t> committed_before;
DBImpl* mock_db = new DBImpl(options, dbname);
std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock(
mock_db, txn_db_options, snapshot_cache_size, commit_cache_size));
mock_db, txn_db_options, snapshot_cache_bits, commit_cache_bits));
// We continue until max advances a bit beyond the snapshot.
while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) {
// do prepare for a transaction

@@ -41,12 +41,6 @@ Status WritePreparedTxn::Get(const ReadOptions& read_options,
pinnable_val, &callback);
}
Status WritePreparedTxn::CommitBatch(WriteBatch* /* unused */) {
// TODO(myabandeh) Implement this
throw std::runtime_error("CommitBatch not Implemented");
return Status::OK();
}
Status WritePreparedTxn::PrepareInternal() {
WriteOptions write_options = write_options_;
write_options.disableWAL = false;
@@ -97,6 +91,8 @@ Status WritePreparedTxn::CommitInternal() {
auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
log_number_, disable_memtable, &seq_used);
uint64_t& commit_seq = seq_used;
// TODO(myabandeh): Reject a commit request if AddCommitted cannot encode
// commit_seq. This happens if prep_seq <<< commit_seq.
wpt_db_->AddCommitted(prepare_seq_, commit_seq);
return s;
}
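For scale: with the default format (PAD_BITS = 8, INDEX_BITS = 21), only COMMIT_BITS = 64 - 35 = 29 bits remain for the delta, so a commit is encodable only while commit_seq - prepare_seq + 1 stays below 2^29 (about 5.4e8 sequence numbers). That is the overflow case the TODO above refers to.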

@@ -50,8 +50,6 @@ class WritePreparedTxn : public PessimisticTransaction {
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
Status CommitBatch(WriteBatch* batch) override;
Status Rollback() override;
private:
