Add unit test for WritePrepared skeleton

Summary: Closes https://github.com/facebook/rocksdb/pull/2756

Differential Revision: D5660516

Pulled By: maysamyabandeh

fbshipit-source-id: f3f3d3b5f544007a7fbdd78e49e4738b4437c7ee
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent a12479819d
commit cd26af3476
  1. 8
      utilities/transactions/pessimistic_transaction_db.cc
  2. 27
      utilities/transactions/pessimistic_transaction_db.h
  3. 148
      utilities/transactions/transaction_test.cc

@ -607,6 +607,7 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
delayed_prepared_empty_.store(false, std::memory_order_release); delayed_prepared_empty_.store(false, std::memory_order_release);
} }
} }
// With each change to max_evicted_seq_ fetch the live snapshots behind it
{ {
WriteLock wl(&snapshots_mutex_); WriteLock wl(&snapshots_mutex_);
InstrumentedMutex(db_impl_->mutex()); InstrumentedMutex(db_impl_->mutex());
@ -622,9 +623,7 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
// be kept around because it overlaps with a live snapshot. // be kept around because it overlaps with a live snapshot.
{ {
ReadLock rl(&snapshots_mutex_); ReadLock rl(&snapshots_mutex_);
for (auto snapshot : snapshots_) { for (auto snapshot_seq : snapshots_) {
auto snapshot_seq =
reinterpret_cast<const SnapshotImpl*>(snapshot)->number_;
if (evicted.commit_seq <= snapshot_seq) { if (evicted.commit_seq <= snapshot_seq) {
break; break;
} }
@ -691,5 +690,8 @@ bool WritePreparedTxnDB::ExchangeCommitEntry(uint64_t indexed_seq,
return true; return true;
} }
// 10m entry, 80MB size
uint64_t WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE =
static_cast<uint64_t>(1 << 21);
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -161,11 +161,17 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
public: public:
explicit WritePreparedTxnDB(DB* db, explicit WritePreparedTxnDB(DB* db,
const TransactionDBOptions& txn_db_options) const TransactionDBOptions& txn_db_options)
: PessimisticTransactionDB(db, txn_db_options) {} : PessimisticTransactionDB(db, txn_db_options),
COMMIT_CACHE_SIZE(DEF_COMMIT_CACHE_SIZE) {
init(txn_db_options);
}
explicit WritePreparedTxnDB(StackableDB* db, explicit WritePreparedTxnDB(StackableDB* db,
const TransactionDBOptions& txn_db_options) const TransactionDBOptions& txn_db_options)
: PessimisticTransactionDB(db, txn_db_options) {} : PessimisticTransactionDB(db, txn_db_options),
COMMIT_CACHE_SIZE(DEF_COMMIT_CACHE_SIZE) {
init(txn_db_options);
}
virtual ~WritePreparedTxnDB() {} virtual ~WritePreparedTxnDB() {}
@ -183,6 +189,13 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq); void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq);
private: private:
friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
void init(const TransactionDBOptions& /* unused */) {
commit_cache_ =
unique_ptr<CommitEntry[]>(new CommitEntry[COMMIT_CACHE_SIZE]{});
}
// A heap with the amortized O(1) complexity for erase. It uses one extra heap // A heap with the amortized O(1) complexity for erase. It uses one extra heap
// to keep track of erased entries that are not yet on top of the main heap. // to keep track of erased entries that are not yet on top of the main heap.
class PreparedHeap { class PreparedHeap {
@ -236,11 +249,11 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// A heap of prepared transactions. Thread-safety is provided with // A heap of prepared transactions. Thread-safety is provided with
// prepared_mutex_. // prepared_mutex_.
PreparedHeap prepared_txns_; PreparedHeap prepared_txns_;
// 10m entry, 80MB size static uint64_t DEF_COMMIT_CACHE_SIZE;
static const uint64_t COMMIT_CACHE_SIZE = static_cast<uint64_t>(1 << 21); const uint64_t COMMIT_CACHE_SIZE;
// commit_cache_ is initialized to zero to tell apart an empty index from a // commit_cache_ must be initialized to zero to tell apart an empty index from
// filled one. Thread-safety is provided with commit_cache_mutex_. // a filled one. Thread-safety is provided with commit_cache_mutex_.
CommitEntry commit_cache_[COMMIT_CACHE_SIZE] = {}; unique_ptr<CommitEntry[]> commit_cache_;
// The largest evicted *commit* sequence number from the commit_cache_ // The largest evicted *commit* sequence number from the commit_cache_
std::atomic<uint64_t> max_evicted_seq_ = {}; std::atomic<uint64_t> max_evicted_seq_ = {};
// A map of the evicted entries from commit_cache_ that has to be kept around // A map of the evicted entries from commit_cache_ that has to be kept around

@ -17,7 +17,6 @@
#include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/transaction_db.h"
#include "table/mock_table.h" #include "table/mock_table.h"
#include "util/fault_injection_test_env.h" #include "util/fault_injection_test_env.h"
#include "util/logging.h"
#include "util/random.h" #include "util/random.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "util/sync_point.h" #include "util/sync_point.h"
@ -26,6 +25,7 @@
#include "util/transaction_test_util.h" #include "util/transaction_test_util.h"
#include "utilities/merge_operators.h" #include "utilities/merge_operators.h"
#include "utilities/merge_operators/string_append/stringappend.h" #include "utilities/merge_operators/string_append/stringappend.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "port/port.h" #include "port/port.h"
@ -33,8 +33,8 @@ using std::string;
namespace rocksdb { namespace rocksdb {
class TransactionTest class TransactionTest : public ::testing::TestWithParam<
: public ::testing::TestWithParam<std::tuple<bool, bool>> { std::tuple<bool, bool, TxnDBWritePolicy>> {
public: public:
TransactionDB* db; TransactionDB* db;
FaultInjectionTestEnv* env; FaultInjectionTestEnv* env;
@ -57,6 +57,7 @@ class TransactionTest
DestroyDB(dbname, options); DestroyDB(dbname, options);
txn_db_options.transaction_lock_timeout = 0; txn_db_options.transaction_lock_timeout = 0;
txn_db_options.default_lock_timeout = 0; txn_db_options.default_lock_timeout = 0;
txn_db_options.write_policy = std::get<2>(GetParam());
Status s; Status s;
if (std::get<0>(GetParam()) == false) { if (std::get<0>(GetParam()) == false) {
s = TransactionDB::Open(options, txn_db_options, dbname, &db); s = TransactionDB::Open(options, txn_db_options, dbname, &db);
@ -123,16 +124,23 @@ class TransactionTest
}; };
class MySQLStyleTransactionTest : public TransactionTest {}; class MySQLStyleTransactionTest : public TransactionTest {};
class WritePreparedTransactionTest : public TransactionTest {};
static const TxnDBWritePolicy wc = WRITE_COMMITTED;
static const TxnDBWritePolicy wp = WRITE_PREPARED;
// TODO(myabandeh): Instantiate the tests with other write policies
INSTANTIATE_TEST_CASE_P(DBAsBaseDB, TransactionTest, INSTANTIATE_TEST_CASE_P(DBAsBaseDB, TransactionTest,
::testing::Values(std::make_tuple(false, false))); ::testing::Values(std::make_tuple(false, false, wc)));
INSTANTIATE_TEST_CASE_P(StackableDBAsBaseDB, TransactionTest, INSTANTIATE_TEST_CASE_P(StackableDBAsBaseDB, TransactionTest,
::testing::Values(std::make_tuple(true, false))); ::testing::Values(std::make_tuple(true, false, wc)));
INSTANTIATE_TEST_CASE_P(MySQLStyleTransactionTest, MySQLStyleTransactionTest, INSTANTIATE_TEST_CASE_P(MySQLStyleTransactionTest, MySQLStyleTransactionTest,
::testing::Values(std::make_tuple(false, false), ::testing::Values(std::make_tuple(false, false, wc),
std::make_tuple(false, true), std::make_tuple(false, true, wc),
std::make_tuple(true, false), std::make_tuple(true, false, wc),
std::make_tuple(true, true))); std::make_tuple(true, true, wc)));
INSTANTIATE_TEST_CASE_P(WritePreparedTransactionTest,
WritePreparedTransactionTest,
::testing::Values(std::make_tuple(false, true, wp)));
TEST_P(TransactionTest, DoubleEmptyWrite) { TEST_P(TransactionTest, DoubleEmptyWrite) {
WriteOptions write_options; WriteOptions write_options;
@ -4720,6 +4728,128 @@ TEST_P(TransactionTest, MemoryLimitTest) {
delete txn; delete txn;
} }
// Test WritePreparedTxnDB's IsInSnapshot against different ordering of
// snapshot, max_committed_seq_, prepared, and commit entries.
TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
WriteOptions wo;
// Use small commit cache to trigger lots of eviction and fast advance of
// max_evicted_seq_
WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE =
8; // will take effect after ReOpen
// Take some preliminary snapshots first. This is to stress the data structure
// that holds the old snapshots as it will be designed to be efficient when
// only a few snapshots are below the max_evicted_seq_.
for (int max_snapshots = 1; max_snapshots < 20; max_snapshots++) {
// Leave some gap between the preliminary snapshots and the final snapshot
// that we check. This should test for also different overlapping scnearios
// between the last snapshot and the commits.
for (int max_gap = 1; max_gap < 10; max_gap++) {
// Since we do not actually write to db, we mock the seq as it would be
// increaased by the db. The only exception is that we need db seq to
// advance for our snapshots. for which we apply a dummy put each time we
// increase our mock of seq.
uint64_t seq = 0;
// At each step we prepare a txn and then we commit it in the next txn.
// This emulates the consecuitive transactions that write to the same key
uint64_t cur_txn = 0;
// Number of snapshots taken so far
int num_snapshots = 0;
// Number of gaps applied so far
int gap_cnt = 0;
// The final snapshot that we will inspect
uint64_t snapshot = 0;
bool found_committed = false;
// To stress the data structure that maintain prepared txns, at each cycle
// we add a new prepare txn. These do not mean to be committed for
// snapshot inspection.
std::set<uint64_t> prepared;
// We keep the list of txns comitted before we take the last snaphot.
// These should be the only seq numbers that will be found in the snapshot
std::set<uint64_t> committed_before;
ReOpen(); // to restart the db
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
assert(wp_db);
assert(wp_db->db_impl_);
// We continue until max advances a bit beyond the snapshot.
while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) {
// do prepare for a transaction
wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq
seq++;
ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq);
wp_db->AddPrepared(seq);
prepared.insert(seq);
// If cur_txn is not started, do prepare for it.
if (!cur_txn) {
wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq
seq++;
ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq);
cur_txn = seq;
wp_db->AddPrepared(cur_txn);
} else { // else commit it
wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq
seq++;
ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq);
wp_db->AddCommitted(cur_txn, seq);
if (!snapshot) {
committed_before.insert(cur_txn);
}
cur_txn = 0;
}
if (num_snapshots < max_snapshots - 1) {
// Take preliminary snapshots
db->GetSnapshot();
num_snapshots++;
} else if (gap_cnt < max_gap) {
// Wait for some gap before taking the final snapshot
gap_cnt++;
} else if (!snapshot) {
// Take the final snapshot if it is not already taken
snapshot = db->GetSnapshot()->GetSequenceNumber();
// We increase the db seq artificailly by a dummy Put. Check that this
// technique is effective and db seq is that same as ours.
ASSERT_EQ(snapshot, seq);
num_snapshots++;
}
// If the snapshot is taken, verify seq numbers visible to it. We redo
// it at each cycle to test that the system is still sound when
// max_evicted_seq_ advances.
if (snapshot) {
for (uint64_t s = 0; s <= seq; s++) {
bool was_committed =
(committed_before.find(s) != committed_before.end());
bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot);
if (was_committed != is_in_snapshot) {
printf(
"max_snapshots %d max_gap %d seq %lu max %lu snapshot %lu "
"gap_cnt %d num_snapshots %d\n",
max_snapshots, max_gap, seq, wp_db->max_evicted_seq_.load(),
snapshot, gap_cnt, num_snapshots);
}
ASSERT_EQ(was_committed, is_in_snapshot);
found_committed = found_committed || is_in_snapshot;
}
}
}
// Safety check to make sure the test actually ran
ASSERT_TRUE(found_committed);
// As an extra check, check if prepared set will be properly empty after
// they are committed.
if (cur_txn) {
wp_db->AddCommitted(cur_txn, seq);
}
for (auto p : prepared) {
wp_db->AddCommitted(p, seq);
}
ASSERT_TRUE(wp_db->delayed_prepared_.empty());
ASSERT_TRUE(wp_db->prepared_txns_.empty());
}
}
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

Loading…
Cancel
Save