WritePrepared Txn: fix non-emptied PreparedHeap bug

Summary:
Under a certain sequence of accessing PreparedHeap, there was a bug that would not successfully empty the heap. This would result in performance issues when the heap content is moved to old_prepared_ after max_evicted_seq_ advances the orphan prepared sequence numbers. The patch fixed the bug and add more unit tests. It also does more logging when the unlikely scenarios are faced
Closes https://github.com/facebook/rocksdb/pull/3526

Differential Revision: D7038486

Pulled By: maysamyabandeh

fbshipit-source-id: f1e40bea558f67b03d2a29131fcb8734c65fce97
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent 8ada876dfe
commit 828211e901
  1. 3
      utilities/column_aware_encoding_util.cc
  2. 9
      utilities/transactions/pessimistic_transaction_db.h
  3. 7
      utilities/transactions/transaction_test.cc
  4. 71
      utilities/transactions/write_prepared_transaction_test.cc
  5. 5
      utilities/transactions/write_prepared_txn.cc
  6. 48
      utilities/transactions/write_prepared_txn_db.cc
  7. 33
      utilities/transactions/write_prepared_txn_db.h

@ -59,7 +59,8 @@ void ColumnAwareEncodingReader::InitTableReader(const std::string& file_path) {
/*skip_filters=*/false), /*skip_filters=*/false),
std::move(file_), file_size, &table_reader, /*enable_prefetch=*/false); std::move(file_), file_size, &table_reader, /*enable_prefetch=*/false);
table_reader_.reset(static_cast_with_check<BlockBasedTable, TableReader>(table_reader.release())); table_reader_.reset(static_cast_with_check<BlockBasedTable, TableReader>(
table_reader.release()));
} }
void ColumnAwareEncodingReader::GetKVPairsFromDataBlocks( void ColumnAwareEncodingReader::GetKVPairsFromDataBlocks(

@ -134,6 +134,11 @@ class PessimisticTransactionDB : public TransactionDB {
private: private:
friend class WritePreparedTxnDB; friend class WritePreparedTxnDB;
friend class WritePreparedTxnDBMock; friend class WritePreparedTxnDBMock;
friend class TransactionTest_DoubleEmptyWrite_Test;
friend class TransactionTest_PersistentTwoPhaseTransactionTest_Test;
friend class TransactionTest_TwoPhaseLongPrepareTest_Test;
friend class TransactionTest_TwoPhaseDoubleRecoveryTest_Test;
friend class TransactionTest_TwoPhaseOutOfOrderDelete_Test;
TransactionLockMgr lock_mgr_; TransactionLockMgr lock_mgr_;
// Must be held when adding/dropping column families. // Must be held when adding/dropping column families.
@ -150,6 +155,10 @@ class PessimisticTransactionDB : public TransactionDB {
// map from name to two phase transaction instance // map from name to two phase transaction instance
std::mutex name_map_mutex_; std::mutex name_map_mutex_;
std::unordered_map<TransactionName, Transaction*> transactions_; std::unordered_map<TransactionName, Transaction*> transactions_;
// Signal that we are testing a crash scenario. Some asserts could be relaxed
// in such cases.
virtual void TEST_Crash() {}
}; };
// A PessimisticTransactionDB that writes the data to the DB after the commit. // A PessimisticTransactionDB that writes the data to the DB after the commit.

@ -85,6 +85,7 @@ TEST_P(TransactionTest, DoubleEmptyWrite) {
txn0->Put(Slice("foo0"), Slice("bar0a")); txn0->Put(Slice("foo0"), Slice("bar0a"));
ASSERT_OK(txn0->Prepare()); ASSERT_OK(txn0->Prepare());
delete txn0; delete txn0;
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ASSERT_OK(ReOpenNoDelete()); ASSERT_OK(ReOpenNoDelete());
txn0 = db->GetTransactionByName("xid2"); txn0 = db->GetTransactionByName("xid2");
ASSERT_OK(txn0->Commit()); ASSERT_OK(txn0->Commit());
@ -950,6 +951,8 @@ TEST_P(TransactionTest, TwoPhaseNameTest) {
s = txn1->SetName("name4"); s = txn1->SetName("name4");
ASSERT_EQ(s, Status::InvalidArgument()); ASSERT_EQ(s, Status::InvalidArgument());
txn1->Rollback();
txn2->Rollback();
delete txn1; delete txn1;
delete txn2; delete txn2;
} }
@ -1173,6 +1176,7 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) {
db->FlushWAL(false); db->FlushWAL(false);
delete txn; delete txn;
// kill and reopen // kill and reopen
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
s = ReOpenNoDelete(); s = ReOpenNoDelete();
ASSERT_OK(s); ASSERT_OK(s);
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
@ -1361,6 +1365,7 @@ TEST_P(TransactionTest, TwoPhaseLongPrepareTest) {
if (i % 29 == 0) { if (i % 29 == 0) {
// crash // crash
env->SetFilesystemActive(false); env->SetFilesystemActive(false);
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ReOpenNoDelete(); ReOpenNoDelete();
} else if (i % 37 == 0) { } else if (i % 37 == 0) {
// close // close
@ -1463,6 +1468,7 @@ TEST_P(TransactionTest, TwoPhaseDoubleRecoveryTest) {
// kill and reopen // kill and reopen
env->SetFilesystemActive(false); env->SetFilesystemActive(false);
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ReOpenNoDelete(); ReOpenNoDelete();
// commit old txn // commit old txn
@ -1844,6 +1850,7 @@ TEST_P(TransactionTest, TwoPhaseOutOfOrderDelete) {
// kill and reopen // kill and reopen
env->SetFilesystemActive(false); env->SetFilesystemActive(false);
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ReOpenNoDelete(); ReOpenNoDelete();
s = db->Get(read_options, "first", &value); s = db->Get(read_options, "first", &value);

@ -112,6 +112,73 @@ TEST(PreparedHeap, BasicsTest) {
ASSERT_TRUE(heap.empty()); ASSERT_TRUE(heap.empty());
} }
// This is a scenario reconstructed from a buggy trace. Test that the bug does
// not resurface again.
TEST(PreparedHeap, EmptyAtTheEnd) {
WritePreparedTxnDB::PreparedHeap heap;
heap.push(40l);
ASSERT_EQ(40l, heap.top());
// Although not a recommended scenario, we must be resilient against erase
// without a prior push.
heap.erase(50l);
ASSERT_EQ(40l, heap.top());
heap.push(60l);
ASSERT_EQ(40l, heap.top());
heap.erase(60l);
ASSERT_EQ(40l, heap.top());
heap.erase(40l);
ASSERT_TRUE(heap.empty());
heap.push(40l);
ASSERT_EQ(40l, heap.top());
heap.erase(50l);
ASSERT_EQ(40l, heap.top());
heap.push(60l);
ASSERT_EQ(40l, heap.top());
heap.erase(40l);
// Test that the erase has not emptied the heap (we had a bug doing that)
ASSERT_FALSE(heap.empty());
ASSERT_EQ(60l, heap.top());
heap.erase(60l);
ASSERT_TRUE(heap.empty());
}
// Generate random order of PreparedHeap access and test that the heap will be
// successfully emptied at the end.
TEST(PreparedHeap, Concurrent) {
const size_t t_cnt = 10;
rocksdb::port::Thread t[t_cnt];
Random rnd(1103);
WritePreparedTxnDB::PreparedHeap heap;
port::RWMutex prepared_mutex;
for (size_t n = 0; n < 100; n++) {
for (size_t i = 0; i < t_cnt; i++) {
// This is not recommended usage but we should be resilient against it.
bool skip_push = rnd.OneIn(5);
t[i] = rocksdb::port::Thread([&heap, &prepared_mutex, skip_push, i]() {
auto seq = i;
std::this_thread::yield();
if (!skip_push) {
WriteLock wl(&prepared_mutex);
heap.push(seq);
}
std::this_thread::yield();
{
WriteLock wl(&prepared_mutex);
heap.erase(seq);
}
});
}
for (size_t i = 0; i < t_cnt; i++) {
t[i].join();
}
ASSERT_TRUE(heap.empty());
}
}
TEST(CommitEntry64b, BasicTest) { TEST(CommitEntry64b, BasicTest) {
const size_t INDEX_BITS = static_cast<size_t>(21); const size_t INDEX_BITS = static_cast<size_t>(21);
const size_t INDEX_SIZE = static_cast<size_t>(1ull << INDEX_BITS); const size_t INDEX_SIZE = static_cast<size_t>(1ull << INDEX_BITS);
@ -952,6 +1019,7 @@ TEST_P(WritePreparedTransactionTest, BasicRecoveryTest) {
delete txn0; delete txn0;
delete txn1; delete txn1;
wp_db->db_impl_->FlushWAL(true); wp_db->db_impl_->FlushWAL(true);
wp_db->TEST_Crash();
ReOpenNoDelete(); ReOpenNoDelete();
wp_db = dynamic_cast<WritePreparedTxnDB*>(db); wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
// After recovery, all the uncommitted txns (0 and 1) should be inserted into // After recovery, all the uncommitted txns (0 and 1) should be inserted into
@ -995,6 +1063,7 @@ TEST_P(WritePreparedTransactionTest, BasicRecoveryTest) {
delete txn2; delete txn2;
wp_db->db_impl_->FlushWAL(true); wp_db->db_impl_->FlushWAL(true);
wp_db->TEST_Crash();
ReOpenNoDelete(); ReOpenNoDelete();
wp_db = dynamic_cast<WritePreparedTxnDB*>(db); wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
ASSERT_TRUE(wp_db->prepared_txns_.empty()); ASSERT_TRUE(wp_db->prepared_txns_.empty());
@ -1064,6 +1133,7 @@ TEST_P(WritePreparedTransactionTest, ConflictDetectionAfterRecoveryTest) {
auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
db_impl->FlushWAL(true); db_impl->FlushWAL(true);
dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
ReOpenNoDelete(); ReOpenNoDelete();
// It should still conflict after the recovery // It should still conflict after the recovery
@ -1324,6 +1394,7 @@ TEST_P(WritePreparedTransactionTest, RollbackTest) {
delete txn; delete txn;
auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
db_impl->FlushWAL(true); db_impl->FlushWAL(true);
dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
ReOpenNoDelete(); ReOpenNoDelete();
wp_db = dynamic_cast<WritePreparedTxnDB*>(db); wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
txn = db->GetTransactionByName("xid0"); txn = db->GetTransactionByName("xid0");

@ -275,8 +275,11 @@ Status WritePreparedTxn::RollbackInternal() {
prepare_seq); prepare_seq);
// Commit the batch by writing an empty batch to the queue that will release // Commit the batch by writing an empty batch to the queue that will release
// the commit sequence number to readers. // the commit sequence number to readers.
const size_t ZERO_COMMITS = 0;
const bool PREP_HEAP_SKIPPED = true;
WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare( WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
wpt_db_, db_impl_, prepare_seq, ONE_BATCH); wpt_db_, db_impl_, prepare_seq, ONE_BATCH, ZERO_COMMITS,
PREP_HEAP_SKIPPED);
WriteBatch empty_batch; WriteBatch empty_batch;
empty_batch.PutLogData(Slice()); empty_batch.PutLogData(Slice());
// In the absence of Prepare markers, use Noop as a batch separator // In the absence of Prepare markers, use Noop as a batch separator

@ -11,6 +11,7 @@
#include "utilities/transactions/write_prepared_txn_db.h" #include "utilities/transactions/write_prepared_txn_db.h"
#include <inttypes.h>
#include <algorithm> #include <algorithm>
#include <string> #include <string>
#include <unordered_set> #include <unordered_set>
@ -104,12 +105,13 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
} }
if (batch_cnt == 0) { // not provided, then compute it if (batch_cnt == 0) { // not provided, then compute it
// TODO(myabandeh): add an option to allow user skipping this cost // TODO(myabandeh): add an option to allow user skipping this cost
ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
"Duplicate key overhead");
SubBatchCounter counter(*GetCFComparatorMap()); SubBatchCounter counter(*GetCFComparatorMap());
auto s = batch->Iterate(&counter); auto s = batch->Iterate(&counter);
assert(s.ok()); assert(s.ok());
batch_cnt = counter.BatchCount(); batch_cnt = counter.BatchCount();
// TODO(myabandeh): replace me with a stat
ROCKS_LOG_WARN(info_log_, "Duplicate key overhead: %" PRIu64 " batches",
static_cast<uint64_t>(batch_cnt));
} }
assert(batch_cnt); assert(batch_cnt);
@ -334,6 +336,9 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
if (!delayed_prepared_empty_.load(std::memory_order_acquire)) { if (!delayed_prepared_empty_.load(std::memory_order_acquire)) {
// We should not normally reach here // We should not normally reach here
ReadLock rl(&prepared_mutex_); ReadLock rl(&prepared_mutex_);
// TODO(myabandeh): also add a stat
ROCKS_LOG_WARN(info_log_, "prepared_mutex_ overhead %" PRIu64,
static_cast<uint64_t>(delayed_prepared_.size()));
if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) { if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
// Then it is not committed yet // Then it is not committed yet
ROCKS_LOG_DETAILS( ROCKS_LOG_DETAILS(
@ -395,6 +400,8 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
// We should not normally reach here unless sapshot_seq is old. This is a // We should not normally reach here unless sapshot_seq is old. This is a
// rare case and it is ok to pay the cost of mutex ReadLock for such old, // rare case and it is ok to pay the cost of mutex ReadLock for such old,
// reading transactions. // reading transactions.
// TODO(myabandeh): also add a stat
ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
ReadLock rl(&old_commit_map_mutex_); ReadLock rl(&old_commit_map_mutex_);
auto prep_set_entry = old_commit_map_.find(snapshot_seq); auto prep_set_entry = old_commit_map_.find(snapshot_seq);
bool found = prep_set_entry != old_commit_map_.end(); bool found = prep_set_entry != old_commit_map_.end();
@ -458,8 +465,10 @@ void WritePreparedTxnDB::RollbackPrepared(uint64_t prep_seq,
void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq, void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
bool prepare_skipped, uint8_t loop_cnt) { bool prepare_skipped, uint8_t loop_cnt) {
ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64, ROCKS_LOG_DETAILS(info_log_,
prepare_seq, commit_seq); "Txn %" PRIu64 " Committing with %" PRIu64
"(prepare_skipped=%d)",
prepare_seq, commit_seq, prepare_skipped);
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start"); TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start");
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause"); TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause");
auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE; auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
@ -536,8 +545,11 @@ bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
return succ; return succ;
} }
void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max, void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
SequenceNumber& new_max) { const SequenceNumber& new_max) {
ROCKS_LOG_DETAILS(info_log_,
"AdvanceMaxEvictedSeq overhead %" PRIu64 " => %" PRIu64,
prev_max, new_max);
// When max_evicted_seq_ advances, move older entries from prepared_txns_ // When max_evicted_seq_ advances, move older entries from prepared_txns_
// to delayed_prepared_. This guarantees that if a seq is lower than max, // to delayed_prepared_. This guarantees that if a seq is lower than max,
// then it is not in prepared_txns_ ans save an expensive, synchronized // then it is not in prepared_txns_ ans save an expensive, synchronized
@ -548,6 +560,12 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max,
while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) { while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
auto to_be_popped = prepared_txns_.top(); auto to_be_popped = prepared_txns_.top();
delayed_prepared_.insert(to_be_popped); delayed_prepared_.insert(to_be_popped);
// TODO(myabandeh): also add a stat
ROCKS_LOG_WARN(info_log_,
"prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
" new_max=%" PRIu64 " oldmax=%" PRIu64,
static_cast<uint64_t>(delayed_prepared_.size()),
to_be_popped, new_max, prev_max);
prepared_txns_.pop(); prepared_txns_.pop();
delayed_prepared_empty_.store(false, std::memory_order_release); delayed_prepared_empty_.store(false, std::memory_order_release);
} }
@ -570,9 +588,11 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max,
if (update_snapshots) { if (update_snapshots) {
UpdateSnapshots(snapshots, new_snapshots_version); UpdateSnapshots(snapshots, new_snapshots_version);
} }
while (prev_max < new_max && !max_evicted_seq_.compare_exchange_weak( auto updated_prev_max = prev_max;
prev_max, new_max, std::memory_order_acq_rel, while (updated_prev_max < new_max &&
std::memory_order_relaxed)) { !max_evicted_seq_.compare_exchange_weak(updated_prev_max, new_max,
std::memory_order_acq_rel,
std::memory_order_relaxed)) {
}; };
} }
@ -600,11 +620,15 @@ void WritePreparedTxnDB::ReleaseSnapshotInternal(
// old_commit_map_. Check and do garbage collection if that is the case. // old_commit_map_. Check and do garbage collection if that is the case.
bool need_gc = false; bool need_gc = false;
{ {
// TODO(myabandeh): also add a stat
ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
ReadLock rl(&old_commit_map_mutex_); ReadLock rl(&old_commit_map_mutex_);
auto prep_set_entry = old_commit_map_.find(snap_seq); auto prep_set_entry = old_commit_map_.find(snap_seq);
need_gc = prep_set_entry != old_commit_map_.end(); need_gc = prep_set_entry != old_commit_map_.end();
} }
if (need_gc) { if (need_gc) {
// TODO(myabandeh): also add a stat
ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
WriteLock wl(&old_commit_map_mutex_); WriteLock wl(&old_commit_map_mutex_);
old_commit_map_.erase(snap_seq); old_commit_map_.erase(snap_seq);
old_commit_map_empty_.store(old_commit_map_.empty(), old_commit_map_empty_.store(old_commit_map_.empty(),
@ -623,6 +647,8 @@ void WritePreparedTxnDB::UpdateSnapshots(
#ifndef NDEBUG #ifndef NDEBUG
size_t sync_i = 0; size_t sync_i = 0;
#endif #endif
// TODO(myabandeh): replace me with a stat
ROCKS_LOG_WARN(info_log_, "snapshots_mutex_ overhead");
WriteLock wl(&snapshots_mutex_); WriteLock wl(&snapshots_mutex_);
snapshots_version_ = version; snapshots_version_ = version;
// We update the list concurrently with the readers. // We update the list concurrently with the readers.
@ -702,6 +728,8 @@ void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && ip1 == SNAPSHOT_CACHE_SIZE && if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && ip1 == SNAPSHOT_CACHE_SIZE &&
snapshot_seq < evicted.prep_seq)) { snapshot_seq < evicted.prep_seq)) {
// Then access the less efficient list of snapshots_ // Then access the less efficient list of snapshots_
// TODO(myabandeh): also add a stat
ROCKS_LOG_WARN(info_log_, "snapshots_mutex_ overhead");
ReadLock rl(&snapshots_mutex_); ReadLock rl(&snapshots_mutex_);
// Items could have moved from the snapshots_ to snapshot_cache_ before // Items could have moved from the snapshots_ to snapshot_cache_ before
// accquiring the lock. To make sure that we do not miss a valid snapshot, // accquiring the lock. To make sure that we do not miss a valid snapshot,
@ -734,6 +762,8 @@ bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
} }
// then snapshot_seq < commit_seq // then snapshot_seq < commit_seq
if (prep_seq <= snapshot_seq) { // overlapping range if (prep_seq <= snapshot_seq) { // overlapping range
ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
// TODO(myabandeh): also add a stat
WriteLock wl(&old_commit_map_mutex_); WriteLock wl(&old_commit_map_mutex_);
old_commit_map_empty_.store(false, std::memory_order_release); old_commit_map_empty_.store(false, std::memory_order_release);
auto& vec = old_commit_map_[snapshot_seq]; auto& vec = old_commit_map_[snapshot_seq];

@ -231,9 +231,13 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
friend class WritePreparedTransactionTest_IsInSnapshotTest_Test; friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test; friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
friend class WritePreparedTransactionTest_CommitMapTest_Test; friend class WritePreparedTransactionTest_CommitMapTest_Test;
friend class
WritePreparedTransactionTest_ConflictDetectionAfterRecoveryTest_Test;
friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccessTest_Test; friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccessTest_Test;
friend class WritePreparedTransactionTestBase; friend class WritePreparedTransactionTestBase;
friend class PreparedHeap_BasicsTest_Test; friend class PreparedHeap_BasicsTest_Test;
friend class PreparedHeap_EmptyAtTheEnd_Test;
friend class PreparedHeap_Concurrent_Test;
friend class WritePreparedTxnDBMock; friend class WritePreparedTxnDBMock;
friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test; friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test;
friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
@ -250,17 +254,34 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
heap_; heap_;
std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>> std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
erased_heap_; erased_heap_;
// True when testing crash recovery
bool TEST_CRASH_ = false;
friend class WritePreparedTxnDB;
public: public:
~PreparedHeap() {
if (!TEST_CRASH_) {
assert(heap_.empty());
assert(erased_heap_.empty());
}
}
bool empty() { return heap_.empty(); } bool empty() { return heap_.empty(); }
uint64_t top() { return heap_.top(); } uint64_t top() { return heap_.top(); }
void push(uint64_t v) { heap_.push(v); } void push(uint64_t v) { heap_.push(v); }
void pop() { void pop() {
heap_.pop(); heap_.pop();
while (!heap_.empty() && !erased_heap_.empty() && while (!heap_.empty() && !erased_heap_.empty() &&
heap_.top() == erased_heap_.top()) { // heap_.top() > erased_heap_.top() could happen if we have erased
heap_.pop(); // a non-existent entry. Ideally the user should not do that but we
// should be resiliant againt it.
heap_.top() >= erased_heap_.top()) {
if (heap_.top() == erased_heap_.top()) {
heap_.pop();
}
auto erased __attribute__((__unused__)) = erased_heap_.top();
erased_heap_.pop(); erased_heap_.pop();
// No duplicate prepare sequence numbers
assert(erased_heap_.empty() || erased_heap_.top() != erased);
} }
while (heap_.empty() && !erased_heap_.empty()) { while (heap_.empty() && !erased_heap_.empty()) {
erased_heap_.pop(); erased_heap_.pop();
@ -272,6 +293,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// Already popped, ignore it. // Already popped, ignore it.
} else if (heap_.top() == seq) { } else if (heap_.top() == seq) {
pop(); pop();
assert(heap_.empty() || heap_.top() != seq);
} else { // (heap_.top() > seq) } else { // (heap_.top() > seq)
// Down the heap, remember to pop it later // Down the heap, remember to pop it later
erased_heap_.push(seq); erased_heap_.push(seq);
@ -280,6 +302,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
} }
}; };
void TEST_Crash() override { prepared_txns_.TEST_CRASH_ = true; }
// Get the commit entry with index indexed_seq from the commit table. It // Get the commit entry with index indexed_seq from the commit table. It
// returns true if such entry exists. // returns true if such entry exists.
bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b, bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b,
@ -305,7 +329,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// concurrently. The concurrent invocations of this function is equivalent to // concurrently. The concurrent invocations of this function is equivalent to
// a serial invocation in which the last invocation is the one with the // a serial invocation in which the last invocation is the one with the
// largetst new_max value. // largetst new_max value.
void AdvanceMaxEvictedSeq(SequenceNumber& prev_max, SequenceNumber& new_max); void AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
const SequenceNumber& new_max);
virtual const std::vector<SequenceNumber> GetSnapshotListFromDB( virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
SequenceNumber max); SequenceNumber max);
@ -362,7 +387,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// A heap of prepared transactions. Thread-safety is provided with // A heap of prepared transactions. Thread-safety is provided with
// prepared_mutex_. // prepared_mutex_.
PreparedHeap prepared_txns_; PreparedHeap prepared_txns_;
// 10m entry, 80MB size // 2m entry, 16MB size
static const size_t DEF_COMMIT_CACHE_BITS = static_cast<size_t>(21); static const size_t DEF_COMMIT_CACHE_BITS = static_cast<size_t>(21);
const size_t COMMIT_CACHE_BITS; const size_t COMMIT_CACHE_BITS;
const size_t COMMIT_CACHE_SIZE; const size_t COMMIT_CACHE_SIZE;

Loading…
Cancel
Save