Add more unit test to write_prepared txns

Summary: Closes https://github.com/facebook/rocksdb/pull/2798

Differential Revision: D5724173

Pulled By: maysamyabandeh

fbshipit-source-id: fb6b782d933fb4be315b1a231a6a67a66fdc9c96
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent 06b37eef7b
commit 26ac24f199
  1. 6
      .travis.yml
  2. 1
      CMakeLists.txt
  3. 7
      Makefile
  4. 10
      TARGETS
  5. 1
      src.mk
  6. 3
      util/sync_point.h
  7. 201
      utilities/transactions/pessimistic_transaction_db.cc
  8. 49
      utilities/transactions/pessimistic_transaction_db.h
  9. 251
      utilities/transactions/transaction_test.cc
  10. 131
      utilities/transactions/transaction_test.h
  11. 569
      utilities/transactions/write_prepared_transaction_test.cc

@ -21,6 +21,7 @@ env:
- TEST_GROUP=platform_dependent # 16-18 minutes
- TEST_GROUP=1 # 33-35 minutes
- TEST_GROUP=2 # 30-32 minutes
- TEST_GROUP=3 # ? minutes - under development
# Run java tests
- JOB_NAME=java_test # 4-11 minutes
# Build ROCKSDB_LITE
@ -36,6 +37,8 @@ matrix:
env: TEST_GROUP=1
- os: osx
env: TEST_GROUP=2
- os: osx
env: TEST_GROUP=3
- os : osx
env: JOB_NAME=cmake-mingw
- os : linux
@ -59,7 +62,8 @@ script:
- ${CXX} --version
- if [ "${TEST_GROUP}" == 'platform_dependent' ]; then ccache -C && OPT=-DTRAVIS V=1 ROCKSDBTESTS_END=db_block_cache_test make -j4 all_but_some_tests check_some; fi
- if [ "${TEST_GROUP}" == '1' ]; then OPT=-DTRAVIS V=1 ROCKSDBTESTS_START=db_block_cache_test ROCKSDBTESTS_END=comparator_db_test make -j4 check_some; fi
- if [ "${TEST_GROUP}" == '2' ]; then OPT=-DTRAVIS V=1 ROCKSDBTESTS_START=comparator_db_test make -j4 check_some; fi
- if [ "${TEST_GROUP}" == '2' ]; then OPT=-DTRAVIS V=1 ROCKSDBTESTS_START=comparator_db_test ROCKSDBTESTS_END=write_prepared_transaction_test make -j4 check_some; fi
- if [ "${TEST_GROUP}" == '3' ]; then OPT=-DTRAVIS V=1 ROCKSDBTESTS_START=write_prepared_transaction_test make -j4 check_some; fi
- if [ "${JOB_NAME}" == 'java_test' ]; then OPT=-DTRAVIS V=1 make clean jclean && make rocksdbjava jtest; fi
- if [ "${JOB_NAME}" == 'lite_build' ]; then OPT="-DTRAVIS -DROCKSDB_LITE" V=1 make -j4 static_lib; fi
- if [ "${JOB_NAME}" == 'examples' ]; then OPT=-DTRAVIS V=1 make -j4 static_lib; cd examples; make -j4; fi

@ -830,6 +830,7 @@ if(WITH_TESTS)
utilities/table_properties_collectors/compact_on_deletion_collector_test.cc
utilities/transactions/optimistic_transaction_test.cc
utilities/transactions/transaction_test.cc
utilities/transactions/write_prepared_transaction_test.cc
utilities/ttl/ttl_test.cc
utilities/write_batch_with_index/write_batch_with_index_test.cc
)

@ -476,6 +476,7 @@ TESTS = \
object_registry_test \
repair_test \
env_timed_test \
write_prepared_transaction_test \
PARALLEL_TEST = \
backupable_db_test \
@ -491,7 +492,8 @@ PARALLEL_TEST = \
manual_compaction_test \
persistent_cache_test \
table_test \
transaction_test
transaction_test \
write_prepared_transaction_test
SUBSET := $(TESTS)
ifdef ROCKSDBTESTS_START
@ -1392,6 +1394,9 @@ heap_test: util/heap_test.o $(GTEST)
transaction_test: utilities/transactions/transaction_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
write_prepared_transaction_test: utilities/transactions/write_prepared_transaction_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
sst_dump: tools/sst_dump.o $(LIBOBJECTS)
$(AM_LINK)

@ -15,7 +15,6 @@ rocksdb_compiler_flags = [
"-DROCKSDB_SCHED_GETCPU_PRESENT",
"-DROCKSDB_SUPPORT_THREAD_LOCAL",
"-DOS_LINUX",
"-DROCKSDB_UBSAN_RUN",
# Flags to enable libs we include
"-DSNAPPY",
"-DZLIB",
@ -476,7 +475,9 @@ ROCKS_TESTS = [['arena_test', 'util/arena_test.cc', 'serial'],
['thread_list_test', 'util/thread_list_test.cc', 'serial'],
['thread_local_test', 'util/thread_local_test.cc', 'serial'],
['timer_queue_test', 'util/timer_queue_test.cc', 'serial'],
['transaction_test', 'utilities/transactions/transaction_test.cc', 'serial'],
['transaction_test',
'utilities/transactions/transaction_test.cc',
'parallel'],
['ttl_test', 'utilities/ttl/ttl_test.cc', 'serial'],
['util_merge_operators_test',
'utilities/util_merge_operators_test.cc',
@ -493,7 +494,10 @@ ROCKS_TESTS = [['arena_test', 'util/arena_test.cc', 'serial'],
'memtable/write_buffer_manager_test.cc',
'serial'],
['write_callback_test', 'db/write_callback_test.cc', 'serial'],
['write_controller_test', 'db/write_controller_test.cc', 'serial']]
['write_controller_test', 'db/write_controller_test.cc', 'serial'],
['write_prepared_transaction_test',
'utilities/transactions/write_prepared_transaction_test.cc',
'serial']]
# Generate a test rule for each entry in ROCKS_TESTS

@ -354,6 +354,7 @@ MAIN_SOURCES = \
utilities/table_properties_collectors/compact_on_deletion_collector_test.cc \
utilities/transactions/optimistic_transaction_test.cc \
utilities/transactions/transaction_test.cc \
utilities/transactions/write_prepared_transaction_test.cc \
utilities/ttl/ttl_test.cc \
utilities/write_batch_with_index/write_batch_with_index_test.cc \

@ -46,6 +46,7 @@ extern void TestKillRandom(std::string kill_point, int odds,
#ifdef NDEBUG
#define TEST_SYNC_POINT(x)
#define TEST_IDX_SYNC_POINT(x, index)
#define TEST_SYNC_POINT_CALLBACK(x, y)
#else
@ -135,6 +136,8 @@ class SyncPoint {
// See TransactionLogIteratorRace in db_test.cc for an example use case.
// TEST_SYNC_POINT is no op in release build.
#define TEST_SYNC_POINT(x) rocksdb::SyncPoint::GetInstance()->Process(x)
#define TEST_IDX_SYNC_POINT(x, index) \
rocksdb::SyncPoint::GetInstance()->Process(x + std::to_string(index))
#define TEST_SYNC_POINT_CALLBACK(x, y) \
rocksdb::SyncPoint::GetInstance()->Process(x, y)
#endif // NDEBUG

@ -22,6 +22,7 @@
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "util/mutexlock.h"
#include "util/sync_point.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"
@ -605,6 +606,68 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
if (prev_max < evicted.commit_seq) {
// TODO(myabandeh) inc max in larger steps to avoid frequent updates
auto max_evicted_seq = evicted.commit_seq;
AdvanceMaxEvictedSeq(prev_max, max_evicted_seq);
}
// After each eviction from commit cache, check if the commit entry should
// be kept around because it overlaps with a live snapshot.
CheckAgainstSnapshots(evicted);
}
bool succ =
ExchangeCommitEntry(indexed_seq, evicted, {prepare_seq, commit_seq});
if (!succ) {
// A very rare event, in which the commit entry is updated before we do.
// Here we apply a very simple solution of retrying.
// TODO(myabandeh): do precautions to detect bugs that cause infinite loops
AddCommitted(prepare_seq, commit_seq);
return;
}
{
WriteLock wl(&prepared_mutex_);
prepared_txns_.erase(prepare_seq);
bool was_empty = delayed_prepared_.empty();
if (!was_empty) {
delayed_prepared_.erase(prepare_seq);
bool is_empty = delayed_prepared_.empty();
if (was_empty != is_empty) {
delayed_prepared_empty_.store(is_empty, std::memory_order_release);
}
}
}
}
bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
CommitEntry* entry) {
// TODO(myabandeh): implement lock-free commit_cache_
ReadLock rl(&commit_cache_mutex_);
*entry = commit_cache_[indexed_seq];
return (entry->commit_seq != 0); // initialized
}
bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq,
const CommitEntry& new_entry,
CommitEntry* evicted_entry) {
// TODO(myabandeh): implement lock-free commit_cache_
WriteLock wl(&commit_cache_mutex_);
*evicted_entry = commit_cache_[indexed_seq];
commit_cache_[indexed_seq] = new_entry;
return (evicted_entry->commit_seq != 0); // initialized
}
bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
const CommitEntry& expected_entry,
const CommitEntry& new_entry) {
// TODO(myabandeh): implement lock-free commit_cache_
WriteLock wl(&commit_cache_mutex_);
auto& evicted_entry = commit_cache_[indexed_seq];
if (evicted_entry.prep_seq != expected_entry.prep_seq) {
return false;
}
commit_cache_[indexed_seq] = new_entry;
return true;
}
void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max,
SequenceNumber& new_max) {
// When max_evicted_seq_ advances, move older entries from prepared_txns_
// to delayed_prepared_. This guarantees that if a seq is lower than max,
// then it is not in prepared_txns_ ans save an expensive, synchronized
@ -612,8 +675,7 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
// normal cases.
{
WriteLock wl(&prepared_mutex_);
while (!prepared_txns_.empty() &&
prepared_txns_.top() <= max_evicted_seq) {
while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
auto to_be_popped = prepared_txns_.top();
delayed_prepared_.insert(to_be_popped);
prepared_txns_.pop();
@ -623,7 +685,7 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
// With each change to max_evicted_seq_ fetch the live snapshots behind it
SequenceNumber curr_seq;
std::vector<SequenceNumber> all_snapshots;
std::vector<SequenceNumber> snapshots;
bool update_snapshots = false;
{
InstrumentedMutex(db_impl_->mutex());
@ -640,13 +702,34 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
// with a more recent vesion by a concrrent thread
update_snapshots = true;
// We only care about snapshots lower then max
all_snapshots =
db_impl_->snapshots().GetAll(nullptr, max_evicted_seq);
snapshots = db_impl_->snapshots().GetAll(nullptr, new_max);
}
}
if (update_snapshots) {
UpdateSnapshots(snapshots, curr_seq);
}
// TODO(myabandeh): check if it worked with relaxed ordering
while (prev_max < new_max && !max_evicted_seq_.compare_exchange_weak(
prev_max, new_max, std::memory_order_release,
std::memory_order_acquire)) {
};
}
// 10m entry, 80MB size
size_t WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE = static_cast<size_t>(1 << 21);
size_t WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE =
static_cast<size_t>(1 << 7);
void WritePreparedTxnDB::UpdateSnapshots(
const std::vector<SequenceNumber>& snapshots,
const SequenceNumber& version) {
TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start");
TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start");
#ifndef NDEBUG
size_t sync_i = 0;
#endif
WriteLock wl(&snapshots_mutex_);
snapshots_version_ = curr_seq;
snapshots_version_ = version;
// We update the list concurrently with the readers.
// Both new and old lists are sorted and the new list is subset of the
// previous list plus some new items. Thus if a snapshot repeats in
@ -659,29 +742,39 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
// update, either before it gets overwritten by the writer or
// afterwards.
size_t i = 0;
auto it = all_snapshots.begin();
for (; it != all_snapshots.end() && i < SNAPSHOT_CACHE_SIZE;
it++, i++) {
auto it = snapshots.begin();
for (; it != snapshots.end() && i < SNAPSHOT_CACHE_SIZE; it++, i++) {
snapshot_cache_[i].store(*it, std::memory_order_release);
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", ++sync_i);
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
}
#ifndef NDEBUG
// Release the remaining sync points since they are useless given that the
// reader would also use lock to access snapshots
for (++sync_i; sync_i <= 10; ++sync_i) {
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", sync_i);
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
}
#endif
snapshots_.clear();
for (; it != all_snapshots.end(); it++) {
for (; it != snapshots.end(); it++) {
// Insert them to a vector that is less efficient to access
// concurrently
snapshots_.push_back(*it);
}
// Update the size at the end. Otherwise a parallel reader might read
// items that are not set yet.
snapshots_total_.store(all_snapshots.size(), std::memory_order_release);
}
while (prev_max < max_evicted_seq &&
!max_evicted_seq_.compare_exchange_weak(
prev_max, max_evicted_seq, std::memory_order_release,
std::memory_order_acquire)) {
};
snapshots_total_.store(snapshots.size(), std::memory_order_release);
TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end");
TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end");
}
// After each eviction from commit cache, check if the commit entry should
// be kept around because it overlaps with a live snapshot.
void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:start");
TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:start");
#ifndef NDEBUG
size_t sync_i = 0;
#endif
// First check the snapshot cache that is efficient for concurrent access
auto cnt = snapshots_total_.load(std::memory_order_acquire);
// The list might get updated concurrently as we are reading from it. The
@ -694,11 +787,23 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
for (; 0 < ip1; ip1--) {
snapshot_seq = snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:",
++sync_i);
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
snapshot_seq, !next_is_larger)) {
break;
}
}
#ifndef NDEBUG
// Release the remaining sync points before accquiring the lock
for (++sync_i; sync_i <= 10; ++sync_i) {
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", sync_i);
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
}
#endif
TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end");
TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end");
if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && ip1 == SNAPSHOT_CACHE_SIZE &&
snapshot_seq < evicted.prep_seq)) {
// Then access the less efficient list of snapshots_
@ -721,64 +826,6 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
}
}
}
bool succ =
ExchangeCommitEntry(indexed_seq, evicted, {prepare_seq, commit_seq});
if (!succ) {
// A very rare event, in which the commit entry is updated before we do.
// Here we apply a very simple solution of retrying.
// TODO(myabandeh): do precautions to detect bugs that cause infinite loops
AddCommitted(prepare_seq, commit_seq);
return;
}
{
WriteLock wl(&prepared_mutex_);
prepared_txns_.erase(prepare_seq);
bool was_empty = delayed_prepared_.empty();
if (!was_empty) {
delayed_prepared_.erase(prepare_seq);
bool is_empty = delayed_prepared_.empty();
if (was_empty != is_empty) {
delayed_prepared_empty_.store(is_empty, std::memory_order_release);
}
}
}
}
bool WritePreparedTxnDB::GetCommitEntry(uint64_t indexed_seq,
CommitEntry* entry) {
// TODO(myabandeh): implement lock-free commit_cache_
ReadLock rl(&commit_cache_mutex_);
*entry = commit_cache_[indexed_seq];
return (entry->commit_seq != 0); // initialized
}
bool WritePreparedTxnDB::AddCommitEntry(uint64_t indexed_seq,
CommitEntry& new_entry,
CommitEntry* evicted_entry) {
// TODO(myabandeh): implement lock-free commit_cache_
WriteLock wl(&commit_cache_mutex_);
*evicted_entry = commit_cache_[indexed_seq];
commit_cache_[indexed_seq] = new_entry;
return (evicted_entry->commit_seq != 0); // initialized
}
bool WritePreparedTxnDB::ExchangeCommitEntry(uint64_t indexed_seq,
CommitEntry& expected_entry,
CommitEntry new_entry) {
// TODO(myabandeh): implement lock-free commit_cache_
WriteLock wl(&commit_cache_mutex_);
auto& evicted_entry = commit_cache_[indexed_seq];
if (evicted_entry.prep_seq != expected_entry.prep_seq) {
return false;
}
commit_cache_[indexed_seq] = new_entry;
return true;
}
// 10m entry, 80MB size
size_t WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE = static_cast<size_t>(1 << 21);
size_t WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE =
static_cast<size_t>(1 << 7);
bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
const uint64_t& prep_seq, const uint64_t& commit_seq,

@ -109,6 +109,9 @@ class PessimisticTransactionDB : public TransactionDB {
uint64_t commit_seq;
CommitEntry() : prep_seq(0), commit_seq(0) {}
CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {}
bool operator==(const CommitEntry& rhs) const {
return prep_seq == rhs.prep_seq && commit_seq == rhs.commit_seq;
}
};
protected:
@ -196,6 +199,11 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
private:
friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
friend class WritePreparedTransactionTest_CommitMapTest_Test;
friend class WritePreparedTransactionTest_SnapshotConcurrentAccessTest_Test;
friend class WritePreparedTransactionTest;
friend class PreparedHeap_BasicsTest_Test;
void init(const TransactionDBOptions& /* unused */) {
snapshot_cache_ = unique_ptr<std::atomic<SequenceNumber>[]>(
@ -223,13 +231,16 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
heap_.pop();
erased_heap_.pop();
}
while (heap_.empty() && !erased_heap_.empty()) {
erased_heap_.pop();
}
}
void erase(uint64_t seq) {
if (!heap_.empty()) {
if (seq < heap_.top()) {
// Already popped, ignore it.
} else if (heap_.top() == seq) {
heap_.pop();
pop();
} else { // (heap_.top() > seq)
// Down the heap, remember to pop it later
erased_heap_.push(seq);
@ -240,17 +251,45 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// Get the commit entry with index indexed_seq from the commit table. It
// returns true if such entry exists.
bool GetCommitEntry(uint64_t indexed_seq, CommitEntry* entry);
bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry* entry);
// Rewrite the entry with the index indexed_seq in the commit table with the
// commit entry <prep_seq, commit_seq>. If the rewrite results into eviction,
// sets the evicted_entry and returns true.
bool AddCommitEntry(uint64_t indexed_seq, CommitEntry& new_entry,
bool AddCommitEntry(const uint64_t indexed_seq, const CommitEntry& new_entry,
CommitEntry* evicted_entry);
// Rewrite the entry with the index indexed_seq in the commit table with the
// commit entry new_entry only if the existing entry matches the
// expected_entry. Returns false otherwise.
bool ExchangeCommitEntry(uint64_t indexed_seq, CommitEntry& expected_entry,
CommitEntry new_entry);
bool ExchangeCommitEntry(const uint64_t indexed_seq,
const CommitEntry& expected_entry,
const CommitEntry& new_entry);
// Increase max_evicted_seq_ from the previous value prev_max to the new
// value. This also involves taking care of prepared txns that are not
// committed before new_max, as well as updating the list of live snapshots at
// the time of updating the max. Thread-safety: this function can be called
// concurrently. The concurrent invocations of this function is equivalent to
// a serial invocation in which the last invocation is the one with the
// largetst new_max value.
void AdvanceMaxEvictedSeq(SequenceNumber& prev_max, SequenceNumber& new_max);
// Update the list of snapshots corresponding to the soon-to-be-updated
// max_eviceted_seq_. Thread-safety: this function can be called concurrently.
// The concurrent invocations of this function is equivalent to a serial
// invocation in which the last invocation is the one with the largetst
// version value.
void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots,
const SequenceNumber& version);
// Check an evicted entry against live snapshots to see if it should be kept
// around or it can be safely discarded (and hence assume committed for all
// snapshots). Thread-safety: this function can be called concurrently. If it
// is called concurrently with multiple UpdateSnapshots, the result is the
// same as checking the intersection of the snapshot list before updates with
// the snapshot list of all the concurrent updates.
void CheckAgainstSnapshots(const CommitEntry& evicted);
// Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq <
// commit_seq. Return false if checking the next snapshot(s) is not needed.

@ -9,7 +9,8 @@
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include "utilities/transactions/transaction_test.h"
#include <algorithm>
#include <functional>
#include <string>
@ -38,114 +39,20 @@ using std::string;
namespace rocksdb {
class TransactionTest : public ::testing::TestWithParam<
std::tuple<bool, bool, TxnDBWritePolicy>> {
public:
TransactionDB* db;
FaultInjectionTestEnv* env;
string dbname;
Options options;
TransactionDBOptions txn_db_options;
TransactionTest() {
options.create_if_missing = true;
options.max_write_buffer_number = 2;
options.write_buffer_size = 4 * 1024;
options.level0_file_num_compaction_trigger = 2;
options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
env = new FaultInjectionTestEnv(Env::Default());
options.env = env;
options.concurrent_prepare = std::get<1>(GetParam());
dbname = test::TmpDir() + "/transaction_testdb";
DestroyDB(dbname, options);
txn_db_options.transaction_lock_timeout = 0;
txn_db_options.default_lock_timeout = 0;
txn_db_options.write_policy = std::get<2>(GetParam());
Status s;
if (std::get<0>(GetParam()) == false) {
s = TransactionDB::Open(options, txn_db_options, dbname, &db);
} else {
s = OpenWithStackableDB();
}
assert(s.ok());
}
~TransactionTest() {
delete db;
DestroyDB(dbname, options);
delete env;
}
Status ReOpenNoDelete() {
delete db;
db = nullptr;
env->AssertNoOpenFile();
env->DropUnsyncedFileData();
env->ResetState();
Status s;
if (std::get<0>(GetParam()) == false) {
s = TransactionDB::Open(options, txn_db_options, dbname, &db);
} else {
s = OpenWithStackableDB();
}
return s;
}
Status ReOpen() {
delete db;
DestroyDB(dbname, options);
Status s;
if (std::get<0>(GetParam()) == false) {
s = TransactionDB::Open(options, txn_db_options, dbname, &db);
} else {
s = OpenWithStackableDB();
}
return s;
}
Status OpenWithStackableDB() {
std::vector<size_t> compaction_enabled_cf_indices;
std::vector<ColumnFamilyDescriptor> column_families{ColumnFamilyDescriptor(
kDefaultColumnFamilyName, ColumnFamilyOptions(options))};
TransactionDB::PrepareWrap(&options, &column_families,
&compaction_enabled_cf_indices);
std::vector<ColumnFamilyHandle*> handles;
DB* root_db;
Options options_copy(options);
Status s =
DB::Open(options_copy, dbname, column_families, &handles, &root_db);
if (s.ok()) {
assert(handles.size() == 1);
s = TransactionDB::WrapStackableDB(
new StackableDB(root_db), txn_db_options,
compaction_enabled_cf_indices, handles, &db);
delete handles[0];
}
return s;
}
};
class MySQLStyleTransactionTest : public TransactionTest {};
class WritePreparedTransactionTest : public TransactionTest {};
static const TxnDBWritePolicy wc = WRITE_COMMITTED;
static const TxnDBWritePolicy wp = WRITE_PREPARED;
// TODO(myabandeh): Instantiate the tests with other write policies
INSTANTIATE_TEST_CASE_P(DBAsBaseDB, TransactionTest,
::testing::Values(std::make_tuple(false, false, wc)));
::testing::Values(std::make_tuple(false, false,
WRITE_COMMITTED)));
INSTANTIATE_TEST_CASE_P(StackableDBAsBaseDB, TransactionTest,
::testing::Values(std::make_tuple(true, false, wc)));
INSTANTIATE_TEST_CASE_P(MySQLStyleTransactionTest, MySQLStyleTransactionTest,
::testing::Values(std::make_tuple(false, false, wc),
std::make_tuple(false, true, wc),
std::make_tuple(true, false, wc),
std::make_tuple(true, true, wc)));
INSTANTIATE_TEST_CASE_P(WritePreparedTransactionTest,
WritePreparedTransactionTest,
::testing::Values(std::make_tuple(false, true, wp)));
::testing::Values(std::make_tuple(true, false,
WRITE_COMMITTED)));
INSTANTIATE_TEST_CASE_P(
MySQLStyleTransactionTest, MySQLStyleTransactionTest,
::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED),
std::make_tuple(false, true, WRITE_COMMITTED),
std::make_tuple(true, false, WRITE_COMMITTED),
std::make_tuple(true, true, WRITE_COMMITTED)));
TEST_P(TransactionTest, DoubleEmptyWrite) {
WriteOptions write_options;
@ -4733,138 +4640,6 @@ TEST_P(TransactionTest, MemoryLimitTest) {
delete txn;
}
// Test WritePreparedTxnDB's IsInSnapshot against different ordering of
// snapshot, max_committed_seq_, prepared, and commit entries.
TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
WriteOptions wo;
// Use small commit cache to trigger lots of eviction and fast advance of
// max_evicted_seq_
// will take effect after ReOpen
WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE = 8;
// Same for snapshot cache size
WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = 5;
// Take some preliminary snapshots first. This is to stress the data structure
// that holds the old snapshots as it will be designed to be efficient when
// only a few snapshots are below the max_evicted_seq_.
for (int max_snapshots = 1; max_snapshots < 20; max_snapshots++) {
// Leave some gap between the preliminary snapshots and the final snapshot
// that we check. This should test for also different overlapping scnearios
// between the last snapshot and the commits.
for (int max_gap = 1; max_gap < 10; max_gap++) {
// Since we do not actually write to db, we mock the seq as it would be
// increaased by the db. The only exception is that we need db seq to
// advance for our snapshots. for which we apply a dummy put each time we
// increase our mock of seq.
uint64_t seq = 0;
// At each step we prepare a txn and then we commit it in the next txn.
// This emulates the consecuitive transactions that write to the same key
uint64_t cur_txn = 0;
// Number of snapshots taken so far
int num_snapshots = 0;
std::vector<const Snapshot*> to_be_released;
// Number of gaps applied so far
int gap_cnt = 0;
// The final snapshot that we will inspect
uint64_t snapshot = 0;
bool found_committed = false;
// To stress the data structure that maintain prepared txns, at each cycle
// we add a new prepare txn. These do not mean to be committed for
// snapshot inspection.
std::set<uint64_t> prepared;
// We keep the list of txns comitted before we take the last snaphot.
// These should be the only seq numbers that will be found in the snapshot
std::set<uint64_t> committed_before;
ReOpen(); // to restart the db
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
assert(wp_db);
assert(wp_db->db_impl_);
// We continue until max advances a bit beyond the snapshot.
while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) {
// do prepare for a transaction
wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq
seq++;
ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq);
wp_db->AddPrepared(seq);
prepared.insert(seq);
// If cur_txn is not started, do prepare for it.
if (!cur_txn) {
wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq
seq++;
ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq);
cur_txn = seq;
wp_db->AddPrepared(cur_txn);
} else { // else commit it
wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq
seq++;
ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq);
wp_db->AddCommitted(cur_txn, seq);
if (!snapshot) {
committed_before.insert(cur_txn);
}
cur_txn = 0;
}
if (num_snapshots < max_snapshots - 1) {
// Take preliminary snapshots
auto tmp_snapshot = db->GetSnapshot();
to_be_released.push_back(tmp_snapshot);
num_snapshots++;
} else if (gap_cnt < max_gap) {
// Wait for some gap before taking the final snapshot
gap_cnt++;
} else if (!snapshot) {
// Take the final snapshot if it is not already taken
auto tmp_snapshot = db->GetSnapshot();
to_be_released.push_back(tmp_snapshot);
snapshot = tmp_snapshot->GetSequenceNumber();
// We increase the db seq artificailly by a dummy Put. Check that this
// technique is effective and db seq is that same as ours.
ASSERT_EQ(snapshot, seq);
num_snapshots++;
}
// If the snapshot is taken, verify seq numbers visible to it. We redo
// it at each cycle to test that the system is still sound when
// max_evicted_seq_ advances.
if (snapshot) {
for (uint64_t s = 0; s <= seq; s++) {
bool was_committed =
(committed_before.find(s) != committed_before.end());
bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot);
if (was_committed != is_in_snapshot) {
printf("max_snapshots %d max_gap %d seq %" PRIu64 " max %" PRIu64
" snapshot %" PRIu64
" gap_cnt %d num_snapshots %d s %" PRIu64 "\n",
max_snapshots, max_gap, seq,
wp_db->max_evicted_seq_.load(), snapshot, gap_cnt,
num_snapshots, s);
}
ASSERT_EQ(was_committed, is_in_snapshot);
found_committed = found_committed || is_in_snapshot;
}
}
}
// Safety check to make sure the test actually ran
ASSERT_TRUE(found_committed);
// As an extra check, check if prepared set will be properly empty after
// they are committed.
if (cur_txn) {
wp_db->AddCommitted(cur_txn, seq);
}
for (auto p : prepared) {
wp_db->AddCommitted(p, seq);
}
ASSERT_TRUE(wp_db->delayed_prepared_.empty());
ASSERT_TRUE(wp_db->prepared_txns_.empty());
for (auto s : to_be_released) {
db->ReleaseSnapshot(s);
}
}
}
}
} // namespace rocksdb
int main(int argc, char** argv) {

@ -0,0 +1,131 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <algorithm>
#include <functional>
#include <string>
#include <thread>
#include "db/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "table/mock_table.h"
#include "util/fault_injection_test_env.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "util/transaction_test_util.h"
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/string_append/stringappend.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "port/port.h"
namespace rocksdb {
class TransactionTest : public ::testing::TestWithParam<
std::tuple<bool, bool, TxnDBWritePolicy>> {
public:
TransactionDB* db;
FaultInjectionTestEnv* env;
std::string dbname;
Options options;
TransactionDBOptions txn_db_options;
TransactionTest() {
options.create_if_missing = true;
options.max_write_buffer_number = 2;
options.write_buffer_size = 4 * 1024;
options.level0_file_num_compaction_trigger = 2;
options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
env = new FaultInjectionTestEnv(Env::Default());
options.env = env;
options.concurrent_prepare = std::get<1>(GetParam());
dbname = test::TmpDir() + "/transaction_testdb";
DestroyDB(dbname, options);
txn_db_options.transaction_lock_timeout = 0;
txn_db_options.default_lock_timeout = 0;
txn_db_options.write_policy = std::get<2>(GetParam());
Status s;
if (std::get<0>(GetParam()) == false) {
s = TransactionDB::Open(options, txn_db_options, dbname, &db);
} else {
s = OpenWithStackableDB();
}
assert(s.ok());
}
~TransactionTest() {
delete db;
DestroyDB(dbname, options);
delete env;
}
Status ReOpenNoDelete() {
delete db;
db = nullptr;
env->AssertNoOpenFile();
env->DropUnsyncedFileData();
env->ResetState();
Status s;
if (std::get<0>(GetParam()) == false) {
s = TransactionDB::Open(options, txn_db_options, dbname, &db);
} else {
s = OpenWithStackableDB();
}
return s;
}
Status ReOpen() {
delete db;
DestroyDB(dbname, options);
Status s;
if (std::get<0>(GetParam()) == false) {
s = TransactionDB::Open(options, txn_db_options, dbname, &db);
} else {
s = OpenWithStackableDB();
}
return s;
}
Status OpenWithStackableDB() {
std::vector<size_t> compaction_enabled_cf_indices;
std::vector<ColumnFamilyDescriptor> column_families{ColumnFamilyDescriptor(
kDefaultColumnFamilyName, ColumnFamilyOptions(options))};
TransactionDB::PrepareWrap(&options, &column_families,
&compaction_enabled_cf_indices);
std::vector<ColumnFamilyHandle*> handles;
DB* root_db;
Options options_copy(options);
Status s =
DB::Open(options_copy, dbname, column_families, &handles, &root_db);
if (s.ok()) {
assert(handles.size() == 1);
s = TransactionDB::WrapStackableDB(
new StackableDB(root_db), txn_db_options,
compaction_enabled_cf_indices, handles, &db);
delete handles[0];
}
return s;
}
};
class MySQLStyleTransactionTest : public TransactionTest {};
} // namespace rocksdb

@ -0,0 +1,569 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include "utilities/transactions/transaction_test.h"
#include <inttypes.h>
#include <algorithm>
#include <functional>
#include <string>
#include <thread>
#include "db/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "table/mock_table.h"
#include "util/fault_injection_test_env.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "util/transaction_test_util.h"
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/string_append/stringappend.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "port/port.h"
using std::string;
namespace rocksdb {
using CommitEntry = PessimisticTransactionDB::CommitEntry;
TEST(PreparedHeap, BasicsTest) {
WritePreparedTxnDB::PreparedHeap heap;
heap.push(14l);
// Test with one element
ASSERT_EQ(14l, heap.top());
heap.push(24l);
heap.push(34l);
// Test that old min is still on top
ASSERT_EQ(14l, heap.top());
heap.push(13l);
// Test that the new min will be on top
ASSERT_EQ(13l, heap.top());
// Test that it is persistent
ASSERT_EQ(13l, heap.top());
heap.push(44l);
heap.push(54l);
heap.push(64l);
heap.push(74l);
heap.push(84l);
// Test that old min is still on top
ASSERT_EQ(13l, heap.top());
heap.erase(24l);
// Test that old min is still on top
ASSERT_EQ(13l, heap.top());
heap.erase(14l);
// Test that old min is still on top
ASSERT_EQ(13l, heap.top());
heap.erase(13l);
// Test that the new comes to the top after multiple erase
ASSERT_EQ(34l, heap.top());
heap.erase(34l);
// Test that the new comes to the top after single erase
ASSERT_EQ(44l, heap.top());
heap.erase(54l);
ASSERT_EQ(44l, heap.top());
heap.pop(); // pop 44l
// Test that the erased items are ignored after pop
ASSERT_EQ(64l, heap.top());
heap.erase(44l);
// Test that erasing an already popped item would work
ASSERT_EQ(64l, heap.top());
heap.erase(84l);
ASSERT_EQ(64l, heap.top());
heap.push(85l);
heap.push(86l);
heap.push(87l);
heap.push(88l);
heap.push(89l);
heap.erase(87l);
heap.erase(85l);
heap.erase(89l);
heap.erase(86l);
heap.erase(88l);
// Test top remians the same after a ranodm order of many erases
ASSERT_EQ(64l, heap.top());
heap.pop();
// Test that pop works with a series of random pending erases
ASSERT_EQ(74l, heap.top());
ASSERT_FALSE(heap.empty());
heap.pop();
// Test that empty works
ASSERT_TRUE(heap.empty());
}
class WritePreparedTransactionTest : public TransactionTest {
protected:
// If expect_update is set, check if it actually updated old_commit_map_. If
// it did not and yet suggested not to check the next snapshot, do the
// opposite to check if it was not a bad suggstion.
void MaybeUpdateOldCommitMapTestWithNext(uint64_t prepare, uint64_t commit,
uint64_t snapshot,
uint64_t next_snapshot,
bool expect_update) {
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
// reset old_commit_map_empty_ so that its value indicate whether
// old_commit_map_ was updated
wp_db->old_commit_map_empty_ = true;
bool check_next = wp_db->MaybeUpdateOldCommitMap(prepare, commit, snapshot,
snapshot < next_snapshot);
if (expect_update == wp_db->old_commit_map_empty_) {
printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
" next: %" PRIu64 "\n",
prepare, commit, snapshot, next_snapshot);
}
EXPECT_EQ(!expect_update, wp_db->old_commit_map_empty_);
if (!check_next && wp_db->old_commit_map_empty_) {
// do the oppotisite to make sure it was not a bad suggestion
const bool dont_care_bool = true;
wp_db->MaybeUpdateOldCommitMap(prepare, commit, next_snapshot,
dont_care_bool);
if (!wp_db->old_commit_map_empty_) {
printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
" next: %" PRIu64 "\n",
prepare, commit, snapshot, next_snapshot);
}
EXPECT_TRUE(wp_db->old_commit_map_empty_);
}
}
// Test that a CheckAgainstSnapshots thread reading old_snapshots will not
// miss a snapshot because of a concurrent update by UpdateSnapshots that is
// writing new_snapshots. Both threads are broken at two points. The sync
// points to enforce them are specified by a1, a2, b1, and b2. CommitEntry
// entry is expected to be vital for one of the snapshots that is common
// between the old and new list of snapshots.
void SnapshotConcurrentAccessTestInternal(
WritePreparedTxnDB* wp_db,
const std::vector<SequenceNumber>& old_snapshots,
const std::vector<SequenceNumber>& new_snapshots, CommitEntry& entry,
SequenceNumber& version, size_t a1, size_t a2, size_t b1, size_t b2) {
// First reset the snapshot list
const std::vector<SequenceNumber> empty_snapshots;
wp_db->old_commit_map_empty_ = true;
wp_db->UpdateSnapshots(empty_snapshots, ++version);
// Then initialize it with the old_snapshots
wp_db->UpdateSnapshots(old_snapshots, ++version);
// Starting from the first thread, cut each thread at two points
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a1),
"WritePreparedTxnDB::UpdateSnapshots:s:start"},
{"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b1),
"WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a1)},
{"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a2),
"WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b1)},
{"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b2),
"WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a2)},
{"WritePreparedTxnDB::CheckAgainstSnapshots:p:end",
"WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b2)},
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
{
ASSERT_TRUE(wp_db->old_commit_map_empty_);
rocksdb::port::Thread t1(
[&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
rocksdb::port::Thread t2([&]() { wp_db->CheckAgainstSnapshots(entry); });
t1.join();
t2.join();
ASSERT_FALSE(wp_db->old_commit_map_empty_);
}
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
wp_db->old_commit_map_empty_ = true;
wp_db->UpdateSnapshots(empty_snapshots, ++version);
wp_db->UpdateSnapshots(old_snapshots, ++version);
// Starting from the second thread, cut each thread at two points
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a1),
"WritePreparedTxnDB::CheckAgainstSnapshots:s:start"},
{"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b1),
"WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a1)},
{"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a2),
"WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b1)},
{"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b2),
"WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a2)},
{"WritePreparedTxnDB::UpdateSnapshots:p:end",
"WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b2)},
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
{
ASSERT_TRUE(wp_db->old_commit_map_empty_);
rocksdb::port::Thread t1(
[&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
rocksdb::port::Thread t2([&]() { wp_db->CheckAgainstSnapshots(entry); });
t1.join();
t2.join();
ASSERT_FALSE(wp_db->old_commit_map_empty_);
}
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
};
INSTANTIATE_TEST_CASE_P(WritePreparedTransactionTest,
WritePreparedTransactionTest,
::testing::Values(std::make_tuple(false, true,
WRITE_PREPARED)));
TEST_P(WritePreparedTransactionTest, CommitMapTest) {
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
assert(wp_db);
assert(wp_db->db_impl_);
size_t size = wp_db->COMMIT_CACHE_SIZE;
CommitEntry c = {5, 12}, e;
bool evicted = wp_db->AddCommitEntry(c.prep_seq % size, c, &e);
ASSERT_FALSE(evicted);
// Should be able to read the same value
bool found = wp_db->GetCommitEntry(c.prep_seq % size, &e);
ASSERT_TRUE(found);
ASSERT_EQ(c, e);
// Should be able to distinguish between overlapping entries
found = wp_db->GetCommitEntry((c.prep_seq + size) % size, &e);
ASSERT_TRUE(found);
ASSERT_NE(c.prep_seq + size, e.prep_seq);
// Should be able to detect non-existent entry
found = wp_db->GetCommitEntry((c.prep_seq + 1) % size, &e);
ASSERT_EQ(e.commit_seq, 0);
ASSERT_FALSE(found);
// Reject an invalid exchange
CommitEntry e2 = {c.prep_seq + size, c.commit_seq};
bool exchanged = wp_db->ExchangeCommitEntry(e2.prep_seq % size, e2, e);
ASSERT_FALSE(exchanged);
// check whether it did actually reject that
found = wp_db->GetCommitEntry(e2.prep_seq % size, &e);
ASSERT_TRUE(found);
ASSERT_EQ(c, e);
// Accept a valid exchange
CommitEntry e3 = {c.prep_seq + size, c.commit_seq + size + 1};
exchanged = wp_db->ExchangeCommitEntry(c.prep_seq % size, c, e3);
ASSERT_TRUE(exchanged);
// check whether it did actually accepted that
found = wp_db->GetCommitEntry(c.prep_seq % size, &e);
ASSERT_TRUE(found);
ASSERT_EQ(e3, e);
// Rewrite an entry
CommitEntry e4 = {e3.prep_seq + size, e3.commit_seq + size + 1};
evicted = wp_db->AddCommitEntry(e4.prep_seq % size, e4, &e);
ASSERT_TRUE(evicted);
ASSERT_EQ(e3, e);
found = wp_db->GetCommitEntry(e4.prep_seq % size, &e);
ASSERT_TRUE(found);
ASSERT_EQ(e4, e);
}
TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) {
// If prepare <= snapshot < commit we should keep the entry around since its
// nonexistence could be interpreted as committed in the snapshot while it is
// not true. We keep such entries around by adding them to the
// old_commit_map_.
uint64_t p /*prepare*/, c /*commit*/, s /*snapshot*/, ns /*next_snapshot*/;
p = 10l, c = 15l, s = 20l, ns = 21l;
MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
// If we do not expect the old commit map to be updated, try also with a next
// snapshot that is expected to update the old commit map. This would test
// that MaybeUpdateOldCommitMap would not prevent us from checking the next
// snapshot that must be checked.
p = 10l, c = 15l, s = 20l, ns = 11l;
MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
p = 10l, c = 20l, s = 20l, ns = 19l;
MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
p = 10l, c = 20l, s = 20l, ns = 21l;
MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
p = 20l, c = 20l, s = 20l, ns = 21l;
MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
p = 20l, c = 20l, s = 20l, ns = 19l;
MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
p = 10l, c = 25l, s = 20l, ns = 21l;
MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, true);
p = 20l, c = 25l, s = 20l, ns = 21l;
MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, true);
p = 21l, c = 25l, s = 20l, ns = 22l;
MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
p = 21l, c = 25l, s = 20l, ns = 19l;
MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
}
TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) {
std::vector<SequenceNumber> snapshots = {100l, 200l, 300l, 400l,
500l, 600l, 700l};
// will take effect after ReOpen
WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = snapshots.size() / 2;
ReOpen(); // to restart the db
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
assert(wp_db);
assert(wp_db->db_impl_);
SequenceNumber version = 1000l;
ASSERT_EQ(0, wp_db->snapshots_total_);
wp_db->UpdateSnapshots(snapshots, version);
ASSERT_EQ(snapshots.size(), wp_db->snapshots_total_);
// seq numbers are chosen so that we have two of them between each two
// snapshots. If the diff of two consecuitive seq is more than 5, there is a
// snapshot between them.
std::vector<SequenceNumber> seqs = {50l, 55l, 150l, 155l, 250l, 255l,
350l, 355l, 450l, 455l, 550l, 555l,
650l, 655l, 750l, 755l};
assert(seqs.size() > 1);
for (size_t i = 0; i < seqs.size() - 1; i++) {
wp_db->old_commit_map_empty_ = true; // reset
CommitEntry commit_entry = {seqs[i], seqs[i + 1]};
wp_db->CheckAgainstSnapshots(commit_entry);
// Expect update if there is snapshot in between the prepare and commit
bool expect_update = commit_entry.commit_seq - commit_entry.prep_seq > 5 &&
commit_entry.commit_seq >= snapshots.front() &&
commit_entry.prep_seq <= snapshots.back();
ASSERT_EQ(expect_update, !wp_db->old_commit_map_empty_);
}
}
// Return true if the ith bit is set in combination represented by comb
bool IsInCombination(size_t i, size_t comb) { return comb & (1 << i); }
// Test that CheckAgainstSnapshots will not miss a live snapshot if it is run in
// parallel with UpdateSnapshots.
TEST_P(WritePreparedTransactionTest, SnapshotConcurrentAccessTest) {
// We have a sync point in the method under test after checking each snapshot.
// If you increase the max number of snapshots in this test, more sync points
// in the methods must also be added.
const std::vector<SequenceNumber> snapshots = {10l, 20l, 30l, 40l, 50l,
60l, 70l, 80l, 90l, 100l};
SequenceNumber version = 1000l;
// Choose the cache size so that the new snapshot list could replace all the
// existing items in the cache and also have some overflow Will take effect
// after ReOpen
WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = (snapshots.size() - 2) / 2;
ReOpen(); // to restart the db
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
assert(wp_db);
assert(wp_db->db_impl_);
// Add up to 2 items that do not fit into the cache
for (size_t old_size = 1;
old_size <= WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE + 2;
old_size++) {
const std::vector<SequenceNumber> old_snapshots(
snapshots.begin(), snapshots.begin() + old_size);
// Each member of old snapshot might or might not appear in the new list. We
// create a common_snapshots for each combination.
size_t new_comb_cnt = static_cast<size_t>(1 << old_size);
for (size_t new_comb = 0; new_comb < new_comb_cnt; new_comb++) {
std::vector<SequenceNumber> common_snapshots;
for (size_t i = 0; i < old_snapshots.size(); i++) {
if (IsInCombination(i, new_comb)) {
common_snapshots.push_back(old_snapshots[i]);
}
}
// And add some new snapshots to the common list
for (size_t added_snapshots = 0;
added_snapshots <= snapshots.size() - old_snapshots.size();
added_snapshots++) {
std::vector<SequenceNumber> new_snapshots = common_snapshots;
for (size_t i = 0; i < added_snapshots; i++) {
new_snapshots.push_back(snapshots[old_snapshots.size() + i]);
}
for (auto it = common_snapshots.begin(); it != common_snapshots.end();
it++) {
auto snapshot = *it;
// Create a commit entry that is around the snapshot and thus should
// be not be discarded
CommitEntry entry = {static_cast<uint64_t>(snapshot - 1),
snapshot + 1};
// The critical part is when iterating the snapshot cache. Afterwards,
// we are operating under the lock
size_t a_range =
std::min(old_snapshots.size(),
WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE) +
1;
size_t b_range =
std::min(new_snapshots.size(),
WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE) +
1;
// Break each thread at two points
for (size_t a1 = 1; a1 <= a_range; a1++) {
for (size_t a2 = a1 + 1; a2 <= a_range; a2++) {
for (size_t b1 = 1; b1 <= b_range; b1++) {
for (size_t b2 = b1 + 1; b2 <= b_range; b2++) {
SnapshotConcurrentAccessTestInternal(wp_db, old_snapshots,
new_snapshots, entry,
version, a1, a2, b1, b2);
}
}
}
}
}
}
}
}
}
// Test WritePreparedTxnDB's IsInSnapshot against different ordering of
// snapshot, max_committed_seq_, prepared, and commit entries.
TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
WriteOptions wo;
// Use small commit cache to trigger lots of eviction and fast advance of
// max_evicted_seq_
// will take effect after ReOpen
WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE = 8;
// Same for snapshot cache size
WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = 5;
// Take some preliminary snapshots first. This is to stress the data structure
// that holds the old snapshots as it will be designed to be efficient when
// only a few snapshots are below the max_evicted_seq_.
for (int max_snapshots = 1; max_snapshots < 20; max_snapshots++) {
// Leave some gap between the preliminary snapshots and the final snapshot
// that we check. This should test for also different overlapping scnearios
// between the last snapshot and the commits.
for (int max_gap = 1; max_gap < 10; max_gap++) {
// Since we do not actually write to db, we mock the seq as it would be
// increaased by the db. The only exception is that we need db seq to
// advance for our snapshots. for which we apply a dummy put each time we
// increase our mock of seq.
uint64_t seq = 0;
// At each step we prepare a txn and then we commit it in the next txn.
// This emulates the consecuitive transactions that write to the same key
uint64_t cur_txn = 0;
// Number of snapshots taken so far
int num_snapshots = 0;
std::vector<const Snapshot*> to_be_released;
// Number of gaps applied so far
int gap_cnt = 0;
// The final snapshot that we will inspect
uint64_t snapshot = 0;
bool found_committed = false;
// To stress the data structure that maintain prepared txns, at each cycle
// we add a new prepare txn. These do not mean to be committed for
// snapshot inspection.
std::set<uint64_t> prepared;
// We keep the list of txns comitted before we take the last snaphot.
// These should be the only seq numbers that will be found in the snapshot
std::set<uint64_t> committed_before;
ReOpen(); // to restart the db
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
assert(wp_db);
assert(wp_db->db_impl_);
// We continue until max advances a bit beyond the snapshot.
while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) {
// do prepare for a transaction
wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq
seq++;
ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq);
wp_db->AddPrepared(seq);
prepared.insert(seq);
// If cur_txn is not started, do prepare for it.
if (!cur_txn) {
wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq
seq++;
ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq);
cur_txn = seq;
wp_db->AddPrepared(cur_txn);
} else { // else commit it
wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq
seq++;
ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq);
wp_db->AddCommitted(cur_txn, seq);
if (!snapshot) {
committed_before.insert(cur_txn);
}
cur_txn = 0;
}
if (num_snapshots < max_snapshots - 1) {
// Take preliminary snapshots
auto tmp_snapshot = db->GetSnapshot();
to_be_released.push_back(tmp_snapshot);
num_snapshots++;
} else if (gap_cnt < max_gap) {
// Wait for some gap before taking the final snapshot
gap_cnt++;
} else if (!snapshot) {
// Take the final snapshot if it is not already taken
auto tmp_snapshot = db->GetSnapshot();
to_be_released.push_back(tmp_snapshot);
snapshot = tmp_snapshot->GetSequenceNumber();
// We increase the db seq artificailly by a dummy Put. Check that this
// technique is effective and db seq is that same as ours.
ASSERT_EQ(snapshot, seq);
num_snapshots++;
}
// If the snapshot is taken, verify seq numbers visible to it. We redo
// it at each cycle to test that the system is still sound when
// max_evicted_seq_ advances.
if (snapshot) {
for (uint64_t s = 0; s <= seq; s++) {
bool was_committed =
(committed_before.find(s) != committed_before.end());
bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot);
if (was_committed != is_in_snapshot) {
printf("max_snapshots %d max_gap %d seq %" PRIu64 " max %" PRIu64
" snapshot %" PRIu64
" gap_cnt %d num_snapshots %d s %" PRIu64 "\n",
max_snapshots, max_gap, seq,
wp_db->max_evicted_seq_.load(), snapshot, gap_cnt,
num_snapshots, s);
}
ASSERT_EQ(was_committed, is_in_snapshot);
found_committed = found_committed || is_in_snapshot;
}
}
}
// Safety check to make sure the test actually ran
ASSERT_TRUE(found_committed);
// As an extra check, check if prepared set will be properly empty after
// they are committed.
if (cur_txn) {
wp_db->AddCommitted(cur_txn, seq);
}
for (auto p : prepared) {
wp_db->AddCommitted(p, seq);
}
ASSERT_TRUE(wp_db->delayed_prepared_.empty());
ASSERT_TRUE(wp_db->prepared_txns_.empty());
for (auto s : to_be_released) {
db->ReleaseSnapshot(s);
}
}
}
}
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#else
#include <stdio.h>
int main(int argc, char** argv) {
fprintf(stderr,
"SKIPPED as Transactions are not supported in ROCKSDB_LITE\n");
return 0;
}
#endif // ROCKSDB_LITE
Loading…
Cancel
Save