You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
rocksdb/utilities/transactions/write_prepared_transaction_...

2117 lines
81 KiB

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include "utilities/transactions/transaction_test.h"
#include <inttypes.h>
#include <algorithm>
#include <functional>
#include <string>
#include <thread>
#include "db/db_impl.h"
#include "db/dbformat.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/debug.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "table/mock_table.h"
#include "util/fault_injection_test_env.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "util/transaction_test_util.h"
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/string_append/stringappend.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/write_prepared_txn_db.h"
#include "port/port.h"
using std::string;
namespace rocksdb {
using CommitEntry = WritePreparedTxnDB::CommitEntry;
using CommitEntry64b = WritePreparedTxnDB::CommitEntry64b;
using CommitEntry64bFormat = WritePreparedTxnDB::CommitEntry64bFormat;
TEST(PreparedHeap, BasicsTest) {
WritePreparedTxnDB::PreparedHeap heap;
heap.push(14l);
// Test with one element
ASSERT_EQ(14l, heap.top());
heap.push(24l);
heap.push(34l);
// Test that old min is still on top
ASSERT_EQ(14l, heap.top());
heap.push(13l);
// Test that the new min will be on top
ASSERT_EQ(13l, heap.top());
// Test that it is persistent
ASSERT_EQ(13l, heap.top());
heap.push(44l);
heap.push(54l);
heap.push(64l);
heap.push(74l);
heap.push(84l);
// Test that old min is still on top
ASSERT_EQ(13l, heap.top());
heap.erase(24l);
// Test that old min is still on top
ASSERT_EQ(13l, heap.top());
heap.erase(14l);
// Test that old min is still on top
ASSERT_EQ(13l, heap.top());
heap.erase(13l);
// Test that the new comes to the top after multiple erase
ASSERT_EQ(34l, heap.top());
heap.erase(34l);
// Test that the new comes to the top after single erase
ASSERT_EQ(44l, heap.top());
heap.erase(54l);
ASSERT_EQ(44l, heap.top());
heap.pop(); // pop 44l
// Test that the erased items are ignored after pop
ASSERT_EQ(64l, heap.top());
heap.erase(44l);
// Test that erasing an already popped item would work
ASSERT_EQ(64l, heap.top());
heap.erase(84l);
ASSERT_EQ(64l, heap.top());
heap.push(85l);
heap.push(86l);
heap.push(87l);
heap.push(88l);
heap.push(89l);
heap.erase(87l);
heap.erase(85l);
heap.erase(89l);
heap.erase(86l);
heap.erase(88l);
// Test top remains the same after a random order of many erases
ASSERT_EQ(64l, heap.top());
heap.pop();
// Test that pop works with a series of random pending erases
ASSERT_EQ(74l, heap.top());
ASSERT_FALSE(heap.empty());
heap.pop();
// Test that empty works
ASSERT_TRUE(heap.empty());
}
// This is a scenario reconstructed from a buggy trace. Test that the bug does
// not resurface again.
TEST(PreparedHeap, EmptyAtTheEnd) {
WritePreparedTxnDB::PreparedHeap heap;
heap.push(40l);
ASSERT_EQ(40l, heap.top());
// Although not a recommended scenario, we must be resilient against erase
// without a prior push.
heap.erase(50l);
ASSERT_EQ(40l, heap.top());
heap.push(60l);
ASSERT_EQ(40l, heap.top());
heap.erase(60l);
ASSERT_EQ(40l, heap.top());
heap.erase(40l);
ASSERT_TRUE(heap.empty());
heap.push(40l);
ASSERT_EQ(40l, heap.top());
heap.erase(50l);
ASSERT_EQ(40l, heap.top());
heap.push(60l);
ASSERT_EQ(40l, heap.top());
heap.erase(40l);
// Test that the erase has not emptied the heap (we had a bug doing that)
ASSERT_FALSE(heap.empty());
ASSERT_EQ(60l, heap.top());
heap.erase(60l);
ASSERT_TRUE(heap.empty());
}
// Generate random order of PreparedHeap access and test that the heap will be
// successfully emptied at the end.
TEST(PreparedHeap, Concurrent) {
const size_t t_cnt = 10;
rocksdb::port::Thread t[t_cnt];
Random rnd(1103);
WritePreparedTxnDB::PreparedHeap heap;
port::RWMutex prepared_mutex;
for (size_t n = 0; n < 100; n++) {
for (size_t i = 0; i < t_cnt; i++) {
// This is not recommended usage but we should be resilient against it.
bool skip_push = rnd.OneIn(5);
t[i] = rocksdb::port::Thread([&heap, &prepared_mutex, skip_push, i]() {
auto seq = i;
std::this_thread::yield();
if (!skip_push) {
WriteLock wl(&prepared_mutex);
heap.push(seq);
}
std::this_thread::yield();
{
WriteLock wl(&prepared_mutex);
heap.erase(seq);
}
});
}
for (size_t i = 0; i < t_cnt; i++) {
t[i].join();
}
ASSERT_TRUE(heap.empty());
}
}
// Test that WriteBatchWithIndex correctly counts the number of sub-batches
TEST(WriteBatchWithIndex, SubBatchCnt) {
ColumnFamilyOptions cf_options;
std::string cf_name = "two";
DB* db;
Options options;
options.create_if_missing = true;
const std::string dbname = test::PerThreadDBPath("transaction_testdb");
DestroyDB(dbname, options);
ASSERT_OK(DB::Open(options, dbname, &db));
ColumnFamilyHandle* cf_handle = nullptr;
ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
WriteOptions write_options;
size_t batch_cnt = 1;
size_t save_points = 0;
std::vector<size_t> batch_cnt_at;
WriteBatchWithIndex batch(db->DefaultColumnFamily()->GetComparator(), 0, true,
0);
ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
batch_cnt_at.push_back(batch_cnt);
batch.SetSavePoint();
save_points++;
batch.Put(Slice("key"), Slice("value"));
ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
batch_cnt_at.push_back(batch_cnt);
batch.SetSavePoint();
save_points++;
batch.Put(Slice("key2"), Slice("value2"));
ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
// duplicate the keys
batch_cnt_at.push_back(batch_cnt);
batch.SetSavePoint();
save_points++;
batch.Put(Slice("key"), Slice("value3"));
batch_cnt++;
ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
// duplicate the 2nd key. It should not be counted duplicate since a
// sub-patch is cut after the last duplicate.
batch_cnt_at.push_back(batch_cnt);
batch.SetSavePoint();
save_points++;
batch.Put(Slice("key2"), Slice("value4"));
ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
// duplicate the keys but in a different cf. It should not be counted as
// duplicate keys
batch_cnt_at.push_back(batch_cnt);
batch.SetSavePoint();
save_points++;
batch.Put(cf_handle, Slice("key"), Slice("value5"));
ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
// Test that the number of sub-batches matches what we count with
// SubBatchCounter
std::map<uint32_t, const Comparator*> comparators;
comparators[0] = db->DefaultColumnFamily()->GetComparator();
comparators[cf_handle->GetID()] = cf_handle->GetComparator();
SubBatchCounter counter(comparators);
ASSERT_OK(batch.GetWriteBatch()->Iterate(&counter));
ASSERT_EQ(batch_cnt, counter.BatchCount());
// Test that RollbackToSavePoint will properly resets the number of
// sub-batches
for (size_t i = save_points; i > 0; i--) {
batch.RollbackToSavePoint();
ASSERT_EQ(batch_cnt_at[i - 1], batch.SubBatchCnt());
}
// Test the count is right with random batches
{
const size_t TOTAL_KEYS = 20; // 20 ~= 10 to cause a few randoms
Random rnd(1131);
std::string keys[TOTAL_KEYS];
for (size_t k = 0; k < TOTAL_KEYS; k++) {
int len = static_cast<int>(rnd.Uniform(50));
keys[k] = test::RandomKey(&rnd, len);
}
for (size_t i = 0; i < 1000; i++) { // 1000 random batches
WriteBatchWithIndex rndbatch(db->DefaultColumnFamily()->GetComparator(),
0, true, 0);
for (size_t k = 0; k < 10; k++) { // 10 key per batch
size_t ki = static_cast<size_t>(rnd.Uniform(TOTAL_KEYS));
Slice key = Slice(keys[ki]);
std::string buffer;
Slice value = Slice(test::RandomString(&rnd, 16, &buffer));
rndbatch.Put(key, value);
}
SubBatchCounter batch_counter(comparators);
ASSERT_OK(rndbatch.GetWriteBatch()->Iterate(&batch_counter));
ASSERT_EQ(rndbatch.SubBatchCnt(), batch_counter.BatchCount());
}
}
delete cf_handle;
delete db;
}
TEST(CommitEntry64b, BasicTest) {
const size_t INDEX_BITS = static_cast<size_t>(21);
const size_t INDEX_SIZE = static_cast<size_t>(1ull << INDEX_BITS);
const CommitEntry64bFormat FORMAT(static_cast<size_t>(INDEX_BITS));
// zero-initialized CommitEntry64b should indicate an empty entry
CommitEntry64b empty_entry64b;
uint64_t empty_index = 11ul;
CommitEntry empty_entry;
bool ok = empty_entry64b.Parse(empty_index, &empty_entry, FORMAT);
ASSERT_FALSE(ok);
// the zero entry is reserved for un-initialized entries
const size_t MAX_COMMIT = (1 << FORMAT.COMMIT_BITS) - 1 - 1;
// Samples over the numbers that are covered by that many index bits
std::array<uint64_t, 4> is = {{0, 1, INDEX_SIZE / 2 + 1, INDEX_SIZE - 1}};
// Samples over the numbers that are covered by that many commit bits
std::array<uint64_t, 4> ds = {{0, 1, MAX_COMMIT / 2 + 1, MAX_COMMIT}};
// Iterate over prepare numbers that have i) cover all bits of a sequence
// number, and ii) include some bits that fall into the range of index or
// commit bits
for (uint64_t base = 1; base < kMaxSequenceNumber; base *= 2) {
for (uint64_t i : is) {
for (uint64_t d : ds) {
uint64_t p = base + i + d;
for (uint64_t c : {p, p + d / 2, p + d}) {
uint64_t index = p % INDEX_SIZE;
CommitEntry before(p, c), after;
CommitEntry64b entry64b(before, FORMAT);
ok = entry64b.Parse(index, &after, FORMAT);
ASSERT_TRUE(ok);
if (!(before == after)) {
printf("base %" PRIu64 " i %" PRIu64 " d %" PRIu64 " p %" PRIu64
" c %" PRIu64 " index %" PRIu64 "\n",
base, i, d, p, c, index);
}
ASSERT_EQ(before, after);
}
}
}
}
}
class WritePreparedTxnDBMock : public WritePreparedTxnDB {
public:
WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt)
: WritePreparedTxnDB(db_impl, opt) {}
WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt,
size_t snapshot_cache_size)
: WritePreparedTxnDB(db_impl, opt, snapshot_cache_size) {}
WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt,
size_t snapshot_cache_size, size_t commit_cache_size)
: WritePreparedTxnDB(db_impl, opt, snapshot_cache_size,
commit_cache_size) {}
void SetDBSnapshots(const std::vector<SequenceNumber>& snapshots) {
snapshots_ = snapshots;
}
void TakeSnapshot(SequenceNumber seq) { snapshots_.push_back(seq); }
protected:
virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
SequenceNumber /* unused */) override {
return snapshots_;
}
private:
std::vector<SequenceNumber> snapshots_;
};
class WritePreparedTransactionTestBase : public TransactionTestBase {
public:
WritePreparedTransactionTestBase(bool use_stackable_db, bool two_write_queue,
TxnDBWritePolicy write_policy)
: TransactionTestBase(use_stackable_db, two_write_queue, write_policy){};
protected:
// If expect_update is set, check if it actually updated old_commit_map_. If
// it did not and yet suggested not to check the next snapshot, do the
// opposite to check if it was not a bad suggestion.
void MaybeUpdateOldCommitMapTestWithNext(uint64_t prepare, uint64_t commit,
uint64_t snapshot,
uint64_t next_snapshot,
bool expect_update) {
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
// reset old_commit_map_empty_ so that its value indicate whether
// old_commit_map_ was updated
wp_db->old_commit_map_empty_ = true;
bool check_next = wp_db->MaybeUpdateOldCommitMap(prepare, commit, snapshot,
snapshot < next_snapshot);
if (expect_update == wp_db->old_commit_map_empty_) {
printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
" next: %" PRIu64 "\n",
prepare, commit, snapshot, next_snapshot);
}
EXPECT_EQ(!expect_update, wp_db->old_commit_map_empty_);
if (!check_next && wp_db->old_commit_map_empty_) {
// do the opposite to make sure it was not a bad suggestion
const bool dont_care_bool = true;
wp_db->MaybeUpdateOldCommitMap(prepare, commit, next_snapshot,
dont_care_bool);
if (!wp_db->old_commit_map_empty_) {
printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
" next: %" PRIu64 "\n",
prepare, commit, snapshot, next_snapshot);
}
EXPECT_TRUE(wp_db->old_commit_map_empty_);
}
}
// Test that a CheckAgainstSnapshots thread reading old_snapshots will not
// miss a snapshot because of a concurrent update by UpdateSnapshots that is
// writing new_snapshots. Both threads are broken at two points. The sync
// points to enforce them are specified by a1, a2, b1, and b2. CommitEntry
// entry is expected to be vital for one of the snapshots that is common
// between the old and new list of snapshots.
void SnapshotConcurrentAccessTestInternal(
WritePreparedTxnDB* wp_db,
const std::vector<SequenceNumber>& old_snapshots,
const std::vector<SequenceNumber>& new_snapshots, CommitEntry& entry,
SequenceNumber& version, size_t a1, size_t a2, size_t b1, size_t b2) {
// First reset the snapshot list
const std::vector<SequenceNumber> empty_snapshots;
wp_db->old_commit_map_empty_ = true;
wp_db->UpdateSnapshots(empty_snapshots, ++version);
// Then initialize it with the old_snapshots
wp_db->UpdateSnapshots(old_snapshots, ++version);
// Starting from the first thread, cut each thread at two points
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a1),
"WritePreparedTxnDB::UpdateSnapshots:s:start"},
{"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b1),
"WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a1)},
{"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a2),
"WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b1)},
{"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b2),
"WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a2)},
{"WritePreparedTxnDB::CheckAgainstSnapshots:p:end",
"WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b2)},
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
{
ASSERT_TRUE(wp_db->old_commit_map_empty_);
rocksdb::port::Thread t1(
[&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
rocksdb::port::Thread t2([&]() { wp_db->CheckAgainstSnapshots(entry); });
t1.join();
t2.join();
ASSERT_FALSE(wp_db->old_commit_map_empty_);
}
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
wp_db->old_commit_map_empty_ = true;
wp_db->UpdateSnapshots(empty_snapshots, ++version);
wp_db->UpdateSnapshots(old_snapshots, ++version);
// Starting from the second thread, cut each thread at two points
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a1),
"WritePreparedTxnDB::CheckAgainstSnapshots:s:start"},
{"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b1),
"WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a1)},
{"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a2),
"WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b1)},
{"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b2),
"WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a2)},
{"WritePreparedTxnDB::UpdateSnapshots:p:end",
"WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b2)},
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
{
ASSERT_TRUE(wp_db->old_commit_map_empty_);
rocksdb::port::Thread t1(
[&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
rocksdb::port::Thread t2([&]() { wp_db->CheckAgainstSnapshots(entry); });
t1.join();
t2.join();
ASSERT_FALSE(wp_db->old_commit_map_empty_);
}
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
// Verify value of keys.
void VerifyKeys(const std::unordered_map<std::string, std::string>& data,
const Snapshot* snapshot = nullptr) {
std::string value;
ReadOptions read_options;
read_options.snapshot = snapshot;
for (auto& kv : data) {
auto s = db->Get(read_options, kv.first, &value);
ASSERT_TRUE(s.ok() || s.IsNotFound());
if (s.ok()) {
if (kv.second != value) {
printf("key = %s\n", kv.first.c_str());
}
ASSERT_EQ(kv.second, value);
} else {
ASSERT_EQ(kv.second, "NOT_FOUND");
}
// Try with MultiGet API too
std::vector<std::string> values;
auto s_vec = db->MultiGet(read_options, {db->DefaultColumnFamily()},
{kv.first}, &values);
ASSERT_EQ(1, values.size());
ASSERT_EQ(1, s_vec.size());
s = s_vec[0];
ASSERT_TRUE(s.ok() || s.IsNotFound());
if (s.ok()) {
ASSERT_TRUE(kv.second == values[0]);
} else {
ASSERT_EQ(kv.second, "NOT_FOUND");
}
}
}
// Verify all versions of keys.
void VerifyInternalKeys(const std::vector<KeyVersion>& expected_versions) {
std::vector<KeyVersion> versions;
const size_t kMaxKeys = 100000;
ASSERT_OK(GetAllKeyVersions(db, expected_versions.front().user_key,
expected_versions.back().user_key, kMaxKeys,
&versions));
ASSERT_EQ(expected_versions.size(), versions.size());
for (size_t i = 0; i < versions.size(); i++) {
ASSERT_EQ(expected_versions[i].user_key, versions[i].user_key);
ASSERT_EQ(expected_versions[i].sequence, versions[i].sequence);
ASSERT_EQ(expected_versions[i].type, versions[i].type);
if (versions[i].type != kTypeDeletion &&
versions[i].type != kTypeSingleDeletion) {
ASSERT_EQ(expected_versions[i].value, versions[i].value);
}
// Range delete not supported.
assert(expected_versions[i].type != kTypeRangeDeletion);
}
}
};
class WritePreparedTransactionTest
: public WritePreparedTransactionTestBase,
virtual public ::testing::WithParamInterface<
std::tuple<bool, bool, TxnDBWritePolicy>> {
public:
WritePreparedTransactionTest()
: WritePreparedTransactionTestBase(std::get<0>(GetParam()),
std::get<1>(GetParam()),
std::get<2>(GetParam())){};
};
#ifndef ROCKSDB_VALGRIND_RUN
class SnapshotConcurrentAccessTest
: public WritePreparedTransactionTestBase,
virtual public ::testing::WithParamInterface<
std::tuple<bool, bool, TxnDBWritePolicy, size_t, size_t>> {
public:
SnapshotConcurrentAccessTest()
: WritePreparedTransactionTestBase(std::get<0>(GetParam()),
std::get<1>(GetParam()),
std::get<2>(GetParam())),
split_id_(std::get<3>(GetParam())),
split_cnt_(std::get<4>(GetParam())){};
protected:
// A test is split into split_cnt_ tests, each identified with split_id_ where
// 0 <= split_id_ < split_cnt_
size_t split_id_;
size_t split_cnt_;
};
#endif // ROCKSDB_VALGRIND_RUN
class SeqAdvanceConcurrentTest
: public WritePreparedTransactionTestBase,
virtual public ::testing::WithParamInterface<
std::tuple<bool, bool, TxnDBWritePolicy, size_t, size_t>> {
public:
SeqAdvanceConcurrentTest()
: WritePreparedTransactionTestBase(std::get<0>(GetParam()),
std::get<1>(GetParam()),
std::get<2>(GetParam())),
split_id_(std::get<3>(GetParam())),
split_cnt_(std::get<4>(GetParam())){};
protected:
// A test is split into split_cnt_ tests, each identified with split_id_ where
// 0 <= split_id_ < split_cnt_
size_t split_id_;
size_t split_cnt_;
};
INSTANTIATE_TEST_CASE_P(
WritePreparedTransactionTest, WritePreparedTransactionTest,
::testing::Values(std::make_tuple(false, false, WRITE_PREPARED),
std::make_tuple(false, true, WRITE_PREPARED)));
#ifndef ROCKSDB_VALGRIND_RUN
INSTANTIATE_TEST_CASE_P(
TwoWriteQueues, SnapshotConcurrentAccessTest,
::testing::Values(std::make_tuple(false, true, WRITE_PREPARED, 0, 20),
std::make_tuple(false, true, WRITE_PREPARED, 1, 20),
std::make_tuple(false, true, WRITE_PREPARED, 2, 20),
std::make_tuple(false, true, WRITE_PREPARED, 3, 20),
std::make_tuple(false, true, WRITE_PREPARED, 4, 20),
std::make_tuple(false, true, WRITE_PREPARED, 5, 20),
std::make_tuple(false, true, WRITE_PREPARED, 6, 20),
std::make_tuple(false, true, WRITE_PREPARED, 7, 20),
std::make_tuple(false, true, WRITE_PREPARED, 8, 20),
std::make_tuple(false, true, WRITE_PREPARED, 9, 20),
std::make_tuple(false, true, WRITE_PREPARED, 10, 20),
std::make_tuple(false, true, WRITE_PREPARED, 11, 20),
std::make_tuple(false, true, WRITE_PREPARED, 12, 20),
std::make_tuple(false, true, WRITE_PREPARED, 13, 20),
std::make_tuple(false, true, WRITE_PREPARED, 14, 20),
std::make_tuple(false, true, WRITE_PREPARED, 15, 20),
std::make_tuple(false, true, WRITE_PREPARED, 16, 20),
std::make_tuple(false, true, WRITE_PREPARED, 17, 20),
std::make_tuple(false, true, WRITE_PREPARED, 18, 20),
std::make_tuple(false, true, WRITE_PREPARED, 19, 20)));
INSTANTIATE_TEST_CASE_P(
OneWriteQueue, SnapshotConcurrentAccessTest,
::testing::Values(std::make_tuple(false, false, WRITE_PREPARED, 0, 20),
std::make_tuple(false, false, WRITE_PREPARED, 1, 20),
std::make_tuple(false, false, WRITE_PREPARED, 2, 20),
std::make_tuple(false, false, WRITE_PREPARED, 3, 20),
std::make_tuple(false, false, WRITE_PREPARED, 4, 20),
std::make_tuple(false, false, WRITE_PREPARED, 5, 20),
std::make_tuple(false, false, WRITE_PREPARED, 6, 20),
std::make_tuple(false, false, WRITE_PREPARED, 7, 20),
std::make_tuple(false, false, WRITE_PREPARED, 8, 20),
std::make_tuple(false, false, WRITE_PREPARED, 9, 20),
std::make_tuple(false, false, WRITE_PREPARED, 10, 20),
std::make_tuple(false, false, WRITE_PREPARED, 11, 20),
std::make_tuple(false, false, WRITE_PREPARED, 12, 20),
std::make_tuple(false, false, WRITE_PREPARED, 13, 20),
std::make_tuple(false, false, WRITE_PREPARED, 14, 20),
std::make_tuple(false, false, WRITE_PREPARED, 15, 20),
std::make_tuple(false, false, WRITE_PREPARED, 16, 20),
std::make_tuple(false, false, WRITE_PREPARED, 17, 20),
std::make_tuple(false, false, WRITE_PREPARED, 18, 20),
std::make_tuple(false, false, WRITE_PREPARED, 19, 20)));
INSTANTIATE_TEST_CASE_P(
TwoWriteQueues, SeqAdvanceConcurrentTest,
::testing::Values(std::make_tuple(false, true, WRITE_PREPARED, 0, 10),
std::make_tuple(false, true, WRITE_PREPARED, 1, 10),
std::make_tuple(false, true, WRITE_PREPARED, 2, 10),
std::make_tuple(false, true, WRITE_PREPARED, 3, 10),
std::make_tuple(false, true, WRITE_PREPARED, 4, 10),
std::make_tuple(false, true, WRITE_PREPARED, 5, 10),
std::make_tuple(false, true, WRITE_PREPARED, 6, 10),
std::make_tuple(false, true, WRITE_PREPARED, 7, 10),
std::make_tuple(false, true, WRITE_PREPARED, 8, 10),
std::make_tuple(false, true, WRITE_PREPARED, 9, 10)));
INSTANTIATE_TEST_CASE_P(
OneWriteQueue, SeqAdvanceConcurrentTest,
::testing::Values(std::make_tuple(false, false, WRITE_PREPARED, 0, 10),
std::make_tuple(false, false, WRITE_PREPARED, 1, 10),
std::make_tuple(false, false, WRITE_PREPARED, 2, 10),
std::make_tuple(false, false, WRITE_PREPARED, 3, 10),
std::make_tuple(false, false, WRITE_PREPARED, 4, 10),
std::make_tuple(false, false, WRITE_PREPARED, 5, 10),
std::make_tuple(false, false, WRITE_PREPARED, 6, 10),
std::make_tuple(false, false, WRITE_PREPARED, 7, 10),
std::make_tuple(false, false, WRITE_PREPARED, 8, 10),
std::make_tuple(false, false, WRITE_PREPARED, 9, 10)));
#endif // ROCKSDB_VALGRIND_RUN
TEST_P(WritePreparedTransactionTest, CommitMapTest) {
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
assert(wp_db);
assert(wp_db->db_impl_);
size_t size = wp_db->COMMIT_CACHE_SIZE;
CommitEntry c = {5, 12}, e;
bool evicted = wp_db->AddCommitEntry(c.prep_seq % size, c, &e);
ASSERT_FALSE(evicted);
// Should be able to read the same value
CommitEntry64b dont_care;
bool found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e);
ASSERT_TRUE(found);
ASSERT_EQ(c, e);
// Should be able to distinguish between overlapping entries
found = wp_db->GetCommitEntry((c.prep_seq + size) % size, &dont_care, &e);
ASSERT_TRUE(found);
ASSERT_NE(c.prep_seq + size, e.prep_seq);
// Should be able to detect non-existent entry
found = wp_db->GetCommitEntry((c.prep_seq + 1) % size, &dont_care, &e);
ASSERT_FALSE(found);
// Reject an invalid exchange
CommitEntry e2 = {c.prep_seq + size, c.commit_seq + size};
CommitEntry64b e2_64b(e2, wp_db->FORMAT);
bool exchanged = wp_db->ExchangeCommitEntry(e2.prep_seq % size, e2_64b, e);
ASSERT_FALSE(exchanged);
// check whether it did actually reject that
found = wp_db->GetCommitEntry(e2.prep_seq % size, &dont_care, &e);
ASSERT_TRUE(found);
ASSERT_EQ(c, e);
// Accept a valid exchange
CommitEntry64b c_64b(c, wp_db->FORMAT);
CommitEntry e3 = {c.prep_seq + size, c.commit_seq + size + 1};
exchanged = wp_db->ExchangeCommitEntry(c.prep_seq % size, c_64b, e3);
ASSERT_TRUE(exchanged);
// check whether it did actually accepted that
found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e);
ASSERT_TRUE(found);
ASSERT_EQ(e3, e);
// Rewrite an entry
CommitEntry e4 = {e3.prep_seq + size, e3.commit_seq + size + 1};
evicted = wp_db->AddCommitEntry(e4.prep_seq % size, e4, &e);
ASSERT_TRUE(evicted);
ASSERT_EQ(e3, e);
found = wp_db->GetCommitEntry(e4.prep_seq % size, &dont_care, &e);
ASSERT_TRUE(found);
ASSERT_EQ(e4, e);
}
TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) {
// If prepare <= snapshot < commit we should keep the entry around since its
// nonexistence could be interpreted as committed in the snapshot while it is
// not true. We keep such entries around by adding them to the
// old_commit_map_.
uint64_t p /*prepare*/, c /*commit*/, s /*snapshot*/, ns /*next_snapshot*/;
p = 10l, c = 15l, s = 20l, ns = 21l;
MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
// If we do not expect the old commit map to be updated, try also with a next
// snapshot that is expected to update the old commit map. This would test
// that MaybeUpdateOldCommitMap would not prevent us from checking the next
// snapshot that must be checked.
p = 10l, c = 15l, s = 20l, ns = 11l;
MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
p = 10l, c = 20l, s = 20l, ns = 19l;
MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
p = 10l, c = 20l, s = 20l, ns = 21l;
MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
p = 20l, c = 20l, s = 20l, ns = 21l;
MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
p = 20l, c = 20l, s = 20l, ns = 19l;
MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
p = 10l, c = 25l, s = 20l, ns = 21l;
MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, true);
p = 20l, c = 25l, s = 20l, ns = 21l;
MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, true);
p = 21l, c = 25l, s = 20l, ns = 22l;
MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
p = 21l, c = 25l, s = 20l, ns = 19l;
MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
}
// Test that the entries in old_commit_map_ get garbage collected properly
TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
const size_t snapshot_cache_bits = 0;
const size_t commit_cache_bits = 0;
DBImpl* mock_db = new DBImpl(options, dbname);
std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock(
mock_db, txn_db_options, snapshot_cache_bits, commit_cache_bits));
SequenceNumber seq = 0;
// Take the first snapshot that overlaps with two txn
auto prep_seq = ++seq;
wp_db->AddPrepared(prep_seq);
auto prep_seq2 = ++seq;
wp_db->AddPrepared(prep_seq2);
auto snap_seq1 = seq;
wp_db->TakeSnapshot(snap_seq1);
auto commit_seq = ++seq;
wp_db->AddCommitted(prep_seq, commit_seq);
wp_db->RemovePrepared(prep_seq);
auto commit_seq2 = ++seq;
wp_db->AddCommitted(prep_seq2, commit_seq2);
wp_db->RemovePrepared(prep_seq2);
// Take the 2nd and 3rd snapshot that overlap with the same txn
prep_seq = ++seq;
wp_db->AddPrepared(prep_seq);
auto snap_seq2 = seq;
wp_db->TakeSnapshot(snap_seq2);
seq++;
auto snap_seq3 = seq;
wp_db->TakeSnapshot(snap_seq3);
seq++;
commit_seq = ++seq;
wp_db->AddCommitted(prep_seq, commit_seq);
wp_db->RemovePrepared(prep_seq);
// Make sure max_evicted_seq_ will be larger than 2nd snapshot by evicting the
// only item in the commit_cache_ via another commit.
prep_seq = ++seq;
wp_db->AddPrepared(prep_seq);
commit_seq = ++seq;
wp_db->AddCommitted(prep_seq, commit_seq);
wp_db->RemovePrepared(prep_seq);
// Verify that the evicted commit entries for all snapshots are in the
// old_commit_map_
{
ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
ReadLock rl(&wp_db->old_commit_map_mutex_);
ASSERT_EQ(3, wp_db->old_commit_map_.size());
ASSERT_EQ(2, wp_db->old_commit_map_[snap_seq1].size());
ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq2].size());
ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size());
}
// Verify that the 2nd snapshot is cleaned up after the release
wp_db->ReleaseSnapshotInternal(snap_seq2);
{
ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
ReadLock rl(&wp_db->old_commit_map_mutex_);
ASSERT_EQ(2, wp_db->old_commit_map_.size());
ASSERT_EQ(2, wp_db->old_commit_map_[snap_seq1].size());
ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size());
}
// Verify that the 1st snapshot is cleaned up after the release
wp_db->ReleaseSnapshotInternal(snap_seq1);
{
ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
ReadLock rl(&wp_db->old_commit_map_mutex_);
ASSERT_EQ(1, wp_db->old_commit_map_.size());
ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size());
}
// Verify that the 3rd snapshot is cleaned up after the release
wp_db->ReleaseSnapshotInternal(snap_seq3);
{
ASSERT_TRUE(wp_db->old_commit_map_empty_.load());
ReadLock rl(&wp_db->old_commit_map_mutex_);
ASSERT_EQ(0, wp_db->old_commit_map_.size());
}
}
TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) {
std::vector<SequenceNumber> snapshots = {100l, 200l, 300l, 400l, 500l,
600l, 700l, 800l, 900l};
const size_t snapshot_cache_bits = 2;
// Safety check to express the intended size in the test. Can be adjusted if
// the snapshots lists changed.
assert((1ul << snapshot_cache_bits) * 2 + 1 == snapshots.size());
DBImpl* mock_db = new DBImpl(options, dbname);
std::unique_ptr<WritePreparedTxnDBMock> wp_db(
new WritePreparedTxnDBMock(mock_db, txn_db_options, snapshot_cache_bits));
SequenceNumber version = 1000l;
ASSERT_EQ(0, wp_db->snapshots_total_);
wp_db->UpdateSnapshots(snapshots, version);
ASSERT_EQ(snapshots.size(), wp_db->snapshots_total_);
// seq numbers are chosen so that we have two of them between each two
// snapshots. If the diff of two consecutive seq is more than 5, there is a
// snapshot between them.
std::vector<SequenceNumber> seqs = {50l, 55l, 150l, 155l, 250l, 255l, 350l,
355l, 450l, 455l, 550l, 555l, 650l, 655l,
750l, 755l, 850l, 855l, 950l, 955l};
assert(seqs.size() > 1);
for (size_t i = 0; i < seqs.size() - 1; i++) {
wp_db->old_commit_map_empty_ = true; // reset
CommitEntry commit_entry = {seqs[i], seqs[i + 1]};
wp_db->CheckAgainstSnapshots(commit_entry);
// Expect update if there is snapshot in between the prepare and commit
bool expect_update = commit_entry.commit_seq - commit_entry.prep_seq > 5 &&
commit_entry.commit_seq >= snapshots.front() &&
commit_entry.prep_seq <= snapshots.back();
ASSERT_EQ(expect_update, !wp_db->old_commit_map_empty_);
}
}
// This test is too slow for travis
#ifndef TRAVIS
#ifndef ROCKSDB_VALGRIND_RUN
// Test that CheckAgainstSnapshots will not miss a live snapshot if it is run in
// parallel with UpdateSnapshots.
TEST_P(SnapshotConcurrentAccessTest, SnapshotConcurrentAccessTest) {
// We have a sync point in the method under test after checking each snapshot.
// If you increase the max number of snapshots in this test, more sync points
// in the methods must also be added.
const std::vector<SequenceNumber> snapshots = {10l, 20l, 30l, 40l, 50l,
60l, 70l, 80l, 90l, 100l};
const size_t snapshot_cache_bits = 2;
// Safety check to express the intended size in the test. Can be adjusted if
// the snapshots lists changed.
assert((1ul << snapshot_cache_bits) * 2 + 2 == snapshots.size());
SequenceNumber version = 1000l;
// Choose the cache size so that the new snapshot list could replace all the
// existing items in the cache and also have some overflow.
DBImpl* mock_db = new DBImpl(options, dbname);
std::unique_ptr<WritePreparedTxnDBMock> wp_db(
new WritePreparedTxnDBMock(mock_db, txn_db_options, snapshot_cache_bits));
const size_t extra = 2;
size_t loop_id = 0;
// Add up to extra items that do not fit into the cache
for (size_t old_size = 1; old_size <= wp_db->SNAPSHOT_CACHE_SIZE + extra;
old_size++) {
const std::vector<SequenceNumber> old_snapshots(
snapshots.begin(), snapshots.begin() + old_size);
// Each member of old snapshot might or might not appear in the new list. We
// create a common_snapshots for each combination.
size_t new_comb_cnt = size_t(1) << old_size;
for (size_t new_comb = 0; new_comb < new_comb_cnt; new_comb++, loop_id++) {
if (loop_id % split_cnt_ != split_id_) continue;
printf("."); // To signal progress
fflush(stdout);
std::vector<SequenceNumber> common_snapshots;
for (size_t i = 0; i < old_snapshots.size(); i++) {
if (IsInCombination(i, new_comb)) {
common_snapshots.push_back(old_snapshots[i]);
}
}
// And add some new snapshots to the common list
for (size_t added_snapshots = 0;
added_snapshots <= snapshots.size() - old_snapshots.size();
added_snapshots++) {
std::vector<SequenceNumber> new_snapshots = common_snapshots;
for (size_t i = 0; i < added_snapshots; i++) {
new_snapshots.push_back(snapshots[old_snapshots.size() + i]);
}
for (auto it = common_snapshots.begin(); it != common_snapshots.end();
it++) {
auto snapshot = *it;
// Create a commit entry that is around the snapshot and thus should
// be not be discarded
CommitEntry entry = {static_cast<uint64_t>(snapshot - 1),
snapshot + 1};
// The critical part is when iterating the snapshot cache. Afterwards,
// we are operating under the lock
size_t a_range =
std::min(old_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
size_t b_range =
std::min(new_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
// Break each thread at two points
for (size_t a1 = 1; a1 <= a_range; a1++) {
for (size_t a2 = a1 + 1; a2 <= a_range; a2++) {
for (size_t b1 = 1; b1 <= b_range; b1++) {
for (size_t b2 = b1 + 1; b2 <= b_range; b2++) {
SnapshotConcurrentAccessTestInternal(
wp_db.get(), old_snapshots, new_snapshots, entry, version,
a1, a2, b1, b2);
}
}
}
}
}
}
}
}
printf("\n");
}
#endif // ROCKSDB_VALGRIND_RUN
#endif // TRAVIS
// This test clarifies the contract of AdvanceMaxEvictedSeq method
TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasicTest) {
DBImpl* mock_db = new DBImpl(options, dbname);
std::unique_ptr<WritePreparedTxnDBMock> wp_db(
new WritePreparedTxnDBMock(mock_db, txn_db_options));
// 1. Set the initial values for max, prepared, and snapshots
SequenceNumber zero_max = 0l;
// Set the initial list of prepared txns
const std::vector<SequenceNumber> initial_prepared = {10, 30, 50, 100,
150, 200, 250};
for (auto p : initial_prepared) {
wp_db->AddPrepared(p);
}
// This updates the max value and also set old prepared
SequenceNumber init_max = 100;
wp_db->AdvanceMaxEvictedSeq(zero_max, init_max);
const std::vector<SequenceNumber> initial_snapshots = {20, 40};
wp_db->SetDBSnapshots(initial_snapshots);
// This will update the internal cache of snapshots from the DB
wp_db->UpdateSnapshots(initial_snapshots, init_max);
// 2. Invoke AdvanceMaxEvictedSeq
const std::vector<SequenceNumber> latest_snapshots = {20, 110, 220, 300};
wp_db->SetDBSnapshots(latest_snapshots);
SequenceNumber new_max = 200;
wp_db->AdvanceMaxEvictedSeq(init_max, new_max);
// 3. Verify that the state matches with AdvanceMaxEvictedSeq contract
// a. max should be updated to new_max
ASSERT_EQ(wp_db->max_evicted_seq_, new_max);
// b. delayed prepared should contain every txn <= max and prepared should
// only contain txns > max
auto it = initial_prepared.begin();
for (; it != initial_prepared.end() && *it <= new_max; it++) {
ASSERT_EQ(1, wp_db->delayed_prepared_.erase(*it));
}
ASSERT_TRUE(wp_db->delayed_prepared_.empty());
for (; it != initial_prepared.end() && !wp_db->prepared_txns_.empty();
it++, wp_db->prepared_txns_.pop()) {
ASSERT_EQ(*it, wp_db->prepared_txns_.top());
}
ASSERT_TRUE(it == initial_prepared.end());
ASSERT_TRUE(wp_db->prepared_txns_.empty());
// c. snapshots should contain everything below new_max
auto sit = latest_snapshots.begin();
for (size_t i = 0; sit != latest_snapshots.end() && *sit <= new_max &&
i < wp_db->snapshots_total_;
sit++, i++) {
ASSERT_TRUE(i < wp_db->snapshots_total_);
// This test is in small scale and the list of snapshots are assumed to be
// within the cache size limit. This is just a safety check to double check
// that assumption.
ASSERT_TRUE(i < wp_db->SNAPSHOT_CACHE_SIZE);
ASSERT_EQ(*sit, wp_db->snapshot_cache_[i]);
}
}
// This tests that transactions with duplicate keys perform correctly after max
// is advancing their prepared sequence numbers. This will not be the case if
// for example the txn does not add the prepared seq for the second sub-batch to
// the PrepareHeap structure.
TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicatesTest) {
WriteOptions write_options;
TransactionOptions txn_options;
Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
ASSERT_OK(txn0->SetName("xid"));
ASSERT_OK(txn0->Put(Slice("key"), Slice("value1")));
ASSERT_OK(txn0->Put(Slice("key"), Slice("value2")));
ASSERT_OK(txn0->Prepare());
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
// Ensure that all the prepared sequence numbers will be removed from the
// PrepareHeap.
SequenceNumber new_max = wp_db->COMMIT_CACHE_SIZE;
wp_db->AdvanceMaxEvictedSeq(0, new_max);
ReadOptions ropt;
PinnableSlice pinnable_val;
auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
ASSERT_TRUE(s.IsNotFound());
delete txn0;
wp_db->db_impl_->FlushWAL(true);
wp_db->TEST_Crash();
ReOpenNoDelete();
assert(db != nullptr);
wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
wp_db->AdvanceMaxEvictedSeq(0, new_max);
s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
ASSERT_TRUE(s.IsNotFound());
txn0 = db->GetTransactionByName("xid");
ASSERT_OK(txn0->Rollback());
delete txn0;
}
TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrentTest) {
// Given the sequential run of txns, with this timeout we should never see a
// deadlock nor a timeout unless we have a key conflict, which should be
// almost infeasible.
txn_db_options.transaction_lock_timeout = 1000;
txn_db_options.default_lock_timeout = 1000;
ReOpen();
FlushOptions fopt;
// Number of different txn types we use in this test
const size_t type_cnt = 5;
// The size of the first write group
// TODO(myabandeh): This should be increase for pre-release tests
const size_t first_group_size = 2;
// Total number of txns we run in each test
// TODO(myabandeh): This should be increase for pre-release tests
const size_t txn_cnt = first_group_size + 1;
size_t base[txn_cnt + 1] = {
1,
};
for (size_t bi = 1; bi <= txn_cnt; bi++) {
base[bi] = base[bi - 1] * type_cnt;
}
const size_t max_n = static_cast<size_t>(std::pow(type_cnt, txn_cnt));
printf("Number of cases being tested is %" ROCKSDB_PRIszt "\n", max_n);
for (size_t n = 0; n < max_n; n++, ReOpen()) {
if (n % split_cnt_ != split_id_) continue;
if (n % 1000 == 0) {
printf("Tested %" ROCKSDB_PRIszt " cases so far\n", n);
}
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
auto seq = db_impl->TEST_GetLastVisibleSequence();
exp_seq = seq;
// This is increased before writing the batch for commit
commit_writes = 0;
// This is increased before txn starts linking if it expects to do a commit
// eventually
expected_commits = 0;
std::vector<port::Thread> threads;
linked = 0;
std::atomic<bool> batch_formed(false);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::EnterAsBatchGroupLeader:End",
[&](void* /*arg*/) { batch_formed = true; });
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::JoinBatchGroup:Wait", [&](void* /*arg*/) {
linked++;
if (linked == 1) {
// Wait until the others are linked too.
while (linked < first_group_size) {
}
} else if (linked == 1 + first_group_size) {
// Make the 2nd batch of the rest of writes plus any followup
// commits from the first batch
while (linked < txn_cnt + commit_writes) {
}
}
// Then we will have one or more batches consisting of follow-up
// commits from the 2nd batch. There is a bit of non-determinism here
// but it should be tolerable.
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
for (size_t bi = 0; bi < txn_cnt; bi++) {
// get the bi-th digit in number system based on type_cnt
size_t d = (n % base[bi + 1]) / base[bi];
switch (d) {
case 0:
threads.emplace_back(txn_t0, bi);
break;
case 1:
threads.emplace_back(txn_t1, bi);
break;
case 2:
threads.emplace_back(txn_t2, bi);
break;
case 3:
threads.emplace_back(txn_t3, bi);
break;
case 4:
threads.emplace_back(txn_t3, bi);
break;
default:
assert(false);
}
// wait to be linked
while (linked.load() <= bi) {
}
// after a queue of size first_group_size
if (bi + 1 == first_group_size) {
while (!batch_formed) {
}
// to make it more deterministic, wait until the commits are linked
while (linked.load() <= bi + expected_commits) {
}
}
}
for (auto& t : threads) {
t.join();
}
if (options.two_write_queues) {
// In this case none of the above scheduling tricks to deterministically
// form merged batches works because the writes go to separate queues.
// This would result in different write groups in each run of the test. We
// still keep the test since although non-deterministic and hard to debug,
// it is still useful to have.
// TODO(myabandeh): Add a deterministic unit test for two_write_queues
}
// Check if memtable inserts advanced seq number as expected
seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
// Check if recovery preserves the last sequence number
db_impl->FlushWAL(true);
ReOpenNoDelete();
assert(db != nullptr);
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq);
// Check if flush preserves the last sequence number
db_impl->Flush(fopt);
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
// Check if recovery after flush preserves the last sequence number
db_impl->FlushWAL(true);
ReOpenNoDelete();
assert(db != nullptr);
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
}
}
// Run a couple of different txns among them some uncommitted. Restart the db at
// a couple points to check whether the list of uncommitted txns are recovered
// properly.
TEST_P(WritePreparedTransactionTest, BasicRecoveryTest) {
options.disable_auto_compactions = true;
ReOpen();
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
txn_t0(0);
TransactionOptions txn_options;
WriteOptions write_options;
size_t index = 1000;
Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
auto istr0 = std::to_string(index);
auto s = txn0->SetName("xid" + istr0);
ASSERT_OK(s);
s = txn0->Put(Slice("foo0" + istr0), Slice("bar0" + istr0));
ASSERT_OK(s);
s = txn0->Prepare();
auto prep_seq_0 = txn0->GetId();
txn_t1(0);
index++;
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
auto istr1 = std::to_string(index);
s = txn1->SetName("xid" + istr1);
ASSERT_OK(s);
s = txn1->Put(Slice("foo1" + istr1), Slice("bar"));
ASSERT_OK(s);
s = txn1->Prepare();
auto prep_seq_1 = txn1->GetId();
txn_t2(0);
ReadOptions ropt;
PinnableSlice pinnable_val;
// Check the value is not committed before restart
s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
ASSERT_TRUE(s.IsNotFound());
pinnable_val.Reset();
delete txn0;
delete txn1;
wp_db->db_impl_->FlushWAL(true);
wp_db->TEST_Crash();
ReOpenNoDelete();
assert(db != nullptr);
wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
// After recovery, all the uncommitted txns (0 and 1) should be inserted into
// delayed_prepared_
ASSERT_TRUE(wp_db->prepared_txns_.empty());
ASSERT_FALSE(wp_db->delayed_prepared_empty_);
ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_);
ASSERT_LE(prep_seq_1, wp_db->max_evicted_seq_);
{
ReadLock rl(&wp_db->prepared_mutex_);
ASSERT_EQ(2, wp_db->delayed_prepared_.size());
ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_0) !=
wp_db->delayed_prepared_.end());
ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_1) !=
wp_db->delayed_prepared_.end());
}
// Check the value is still not committed after restart
s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
ASSERT_TRUE(s.IsNotFound());
pinnable_val.Reset();
txn_t3(0);
// Test that a recovered txns will be properly marked committed for the next
// recovery
txn1 = db->GetTransactionByName("xid" + istr1);
ASSERT_NE(txn1, nullptr);
txn1->Commit();
delete txn1;
index++;
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
auto istr2 = std::to_string(index);
s = txn2->SetName("xid" + istr2);
ASSERT_OK(s);
s = txn2->Put(Slice("foo2" + istr2), Slice("bar"));
ASSERT_OK(s);
s = txn2->Prepare();
auto prep_seq_2 = txn2->GetId();
delete txn2;
wp_db->db_impl_->FlushWAL(true);
wp_db->TEST_Crash();
ReOpenNoDelete();
assert(db != nullptr);
wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
ASSERT_TRUE(wp_db->prepared_txns_.empty());
ASSERT_FALSE(wp_db->delayed_prepared_empty_);
// 0 and 2 are prepared and 1 is committed
{
ReadLock rl(&wp_db->prepared_mutex_);
ASSERT_EQ(2, wp_db->delayed_prepared_.size());
const auto& end = wp_db->delayed_prepared_.end();
ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_0), end);
ASSERT_EQ(wp_db->delayed_prepared_.find(prep_seq_1), end);
ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_2), end);
}
ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_);
ASSERT_LE(prep_seq_2, wp_db->max_evicted_seq_);
// Commit all the remaining txns
txn0 = db->GetTransactionByName("xid" + istr0);
ASSERT_NE(txn0, nullptr);
txn0->Commit();
txn2 = db->GetTransactionByName("xid" + istr2);
ASSERT_NE(txn2, nullptr);
txn2->Commit();
// Check the value is committed after commit
s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
ASSERT_TRUE(s.ok());
ASSERT_TRUE(pinnable_val == ("bar0" + istr0));
pinnable_val.Reset();
delete txn0;
delete txn2;
wp_db->db_impl_->FlushWAL(true);
ReOpenNoDelete();
assert(db != nullptr);
wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
ASSERT_TRUE(wp_db->prepared_txns_.empty());
ASSERT_TRUE(wp_db->delayed_prepared_empty_);
// Check the value is still committed after recovery
s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
ASSERT_TRUE(s.ok());
ASSERT_TRUE(pinnable_val == ("bar0" + istr0));
pinnable_val.Reset();
}
// After recovery the new transactions should still conflict with recovered
// transactions.
TEST_P(WritePreparedTransactionTest, ConflictDetectionAfterRecoveryTest) {
options.disable_auto_compactions = true;
ReOpen();
TransactionOptions txn_options;
WriteOptions write_options;
size_t index = 0;
Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
auto istr0 = std::to_string(index);
auto s = txn0->SetName("xid" + istr0);
ASSERT_OK(s);
s = txn0->Put(Slice("key" + istr0), Slice("bar0" + istr0));
ASSERT_OK(s);
s = txn0->Prepare();
// With the same index 0 and key prefix, txn_t0 should conflict with txn0
txn_t0_with_status(0, Status::TimedOut());
delete txn0;
auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
db_impl->FlushWAL(true);
dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
ReOpenNoDelete();
// It should still conflict after the recovery
txn_t0_with_status(0, Status::TimedOut());
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
db_impl->FlushWAL(true);
ReOpenNoDelete();
// Check that a recovered txn will still cause conflicts after 2nd recovery
txn_t0_with_status(0, Status::TimedOut());
txn0 = db->GetTransactionByName("xid" + istr0);
ASSERT_NE(txn0, nullptr);
txn0->Commit();
delete txn0;
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
db_impl->FlushWAL(true);
ReOpenNoDelete();
// tnx0 is now committed and should no longer cause a conflict
txn_t0_with_status(0, Status::OK());
}
// After recovery the commit map is empty while the max is set. The code would
// go through a different path which requires a separate test.
TEST_P(WritePreparedTransactionTest, IsInSnapshotEmptyMapTest) {
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
wp_db->max_evicted_seq_ = 100;
ASSERT_FALSE(wp_db->IsInSnapshot(50, 40));
ASSERT_TRUE(wp_db->IsInSnapshot(50, 50));
ASSERT_TRUE(wp_db->IsInSnapshot(50, 100));
ASSERT_TRUE(wp_db->IsInSnapshot(50, 150));
ASSERT_FALSE(wp_db->IsInSnapshot(100, 80));
ASSERT_TRUE(wp_db->IsInSnapshot(100, 100));
ASSERT_TRUE(wp_db->IsInSnapshot(100, 150));
}
// Test WritePreparedTxnDB's IsInSnapshot against different ordering of
// snapshot, max_committed_seq_, prepared, and commit entries.
TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
WriteOptions wo;
// Use small commit cache to trigger lots of eviction and fast advance of
// max_evicted_seq_
const size_t commit_cache_bits = 3;
// Same for snapshot cache size
const size_t snapshot_cache_bits = 2;
// Take some preliminary snapshots first. This is to stress the data structure
// that holds the old snapshots as it will be designed to be efficient when
// only a few snapshots are below the max_evicted_seq_.
for (int max_snapshots = 1; max_snapshots < 20; max_snapshots++) {
// Leave some gap between the preliminary snapshots and the final snapshot
// that we check. This should test for also different overlapping scenarios
// between the last snapshot and the commits.
for (int max_gap = 1; max_gap < 10; max_gap++) {
// Since we do not actually write to db, we mock the seq as it would be
// increased by the db. The only exception is that we need db seq to
// advance for our snapshots. for which we apply a dummy put each time we
// increase our mock of seq.
uint64_t seq = 0;
// At each step we prepare a txn and then we commit it in the next txn.
// This emulates the consecutive transactions that write to the same key
uint64_t cur_txn = 0;
// Number of snapshots taken so far
int num_snapshots = 0;
// Number of gaps applied so far
int gap_cnt = 0;
// The final snapshot that we will inspect
uint64_t snapshot = 0;
bool found_committed = false;
// To stress the data structure that maintain prepared txns, at each cycle
// we add a new prepare txn. These do not mean to be committed for
// snapshot inspection.
std::set<uint64_t> prepared;
// We keep the list of txns committed before we take the last snapshot.
// These should be the only seq numbers that will be found in the snapshot
std::set<uint64_t> committed_before;
// The set of commit seq numbers to be excluded from IsInSnapshot queries
std::set<uint64_t> commit_seqs;
DBImpl* mock_db = new DBImpl(options, dbname);
std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock(
mock_db, txn_db_options, snapshot_cache_bits, commit_cache_bits));
// We continue until max advances a bit beyond the snapshot.
while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) {
// do prepare for a transaction
seq++;
wp_db->AddPrepared(seq);
prepared.insert(seq);
// If cur_txn is not started, do prepare for it.
if (!cur_txn) {
seq++;
cur_txn = seq;
wp_db->AddPrepared(cur_txn);
} else { // else commit it
seq++;
wp_db->AddCommitted(cur_txn, seq);
wp_db->RemovePrepared(cur_txn);
commit_seqs.insert(seq);
if (!snapshot) {
committed_before.insert(cur_txn);
}
cur_txn = 0;
}
if (num_snapshots < max_snapshots - 1) {
// Take preliminary snapshots
wp_db->TakeSnapshot(seq);
num_snapshots++;
} else if (gap_cnt < max_gap) {
// Wait for some gap before taking the final snapshot
gap_cnt++;
} else if (!snapshot) {
// Take the final snapshot if it is not already taken
snapshot = seq;
wp_db->TakeSnapshot(snapshot);
num_snapshots++;
}
// If the snapshot is taken, verify seq numbers visible to it. We redo
// it at each cycle to test that the system is still sound when
// max_evicted_seq_ advances.
if (snapshot) {
for (uint64_t s = 1;
s <= seq && commit_seqs.find(s) == commit_seqs.end(); s++) {
bool was_committed =
(committed_before.find(s) != committed_before.end());
bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot);
if (was_committed != is_in_snapshot) {
printf("max_snapshots %d max_gap %d seq %" PRIu64 " max %" PRIu64
" snapshot %" PRIu64
" gap_cnt %d num_snapshots %d s %" PRIu64 "\n",
max_snapshots, max_gap, seq,
wp_db->max_evicted_seq_.load(), snapshot, gap_cnt,
num_snapshots, s);
}
ASSERT_EQ(was_committed, is_in_snapshot);
found_committed = found_committed || is_in_snapshot;
}
}
}
// Safety check to make sure the test actually ran
ASSERT_TRUE(found_committed);
// As an extra check, check if prepared set will be properly empty after
// they are committed.
if (cur_txn) {
wp_db->AddCommitted(cur_txn, seq);
wp_db->RemovePrepared(cur_txn);
}
for (auto p : prepared) {
wp_db->AddCommitted(p, seq);
wp_db->RemovePrepared(p);
}
ASSERT_TRUE(wp_db->delayed_prepared_.empty());
ASSERT_TRUE(wp_db->prepared_txns_.empty());
}
}
}
void ASSERT_SAME(ReadOptions roptions, TransactionDB* db, Status exp_s,
PinnableSlice& exp_v, Slice key) {
Status s;
PinnableSlice v;
s = db->Get(roptions, db->DefaultColumnFamily(), key, &v);
ASSERT_TRUE(exp_s == s);
ASSERT_TRUE(s.ok() || s.IsNotFound());
if (s.ok()) {
ASSERT_TRUE(exp_v == v);
}
// Try with MultiGet API too
std::vector<std::string> values;
auto s_vec =
db->MultiGet(roptions, {db->DefaultColumnFamily()}, {key}, &values);
ASSERT_EQ(1, values.size());
ASSERT_EQ(1, s_vec.size());
s = s_vec[0];
ASSERT_TRUE(exp_s == s);
ASSERT_TRUE(s.ok() || s.IsNotFound());
if (s.ok()) {
ASSERT_TRUE(exp_v == values[0]);
}
}
void ASSERT_SAME(TransactionDB* db, Status exp_s, PinnableSlice& exp_v,
Slice key) {
ASSERT_SAME(ReadOptions(), db, exp_s, exp_v, key);
}
TEST_P(WritePreparedTransactionTest, RollbackTest) {
ReadOptions roptions;
WriteOptions woptions;
TransactionOptions txn_options;
const size_t num_keys = 4;
const size_t num_values = 5;
for (size_t ikey = 1; ikey <= num_keys; ikey++) {
for (size_t ivalue = 0; ivalue < num_values; ivalue++) {
for (bool crash : {false, true}) {
ReOpen();
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
std::string key_str = "key" + ToString(ikey);
switch (ivalue) {
case 0:
break;
case 1:
ASSERT_OK(db->Put(woptions, key_str, "initvalue1"));
break;
case 2:
ASSERT_OK(db->Merge(woptions, key_str, "initvalue2"));
break;
case 3:
ASSERT_OK(db->Delete(woptions, key_str));
break;
case 4:
ASSERT_OK(db->SingleDelete(woptions, key_str));
break;
default:
assert(0);
}
PinnableSlice v1;
auto s1 =
db->Get(roptions, db->DefaultColumnFamily(), Slice("key1"), &v1);
PinnableSlice v2;
auto s2 =
db->Get(roptions, db->DefaultColumnFamily(), Slice("key2"), &v2);
PinnableSlice v3;
auto s3 =
db->Get(roptions, db->DefaultColumnFamily(), Slice("key3"), &v3);
PinnableSlice v4;
auto s4 =
db->Get(roptions, db->DefaultColumnFamily(), Slice("key4"), &v4);
Transaction* txn = db->BeginTransaction(woptions, txn_options);
auto s = txn->SetName("xid0");
ASSERT_OK(s);
s = txn->Put(Slice("key1"), Slice("value1"));
ASSERT_OK(s);
s = txn->Merge(Slice("key2"), Slice("value2"));
ASSERT_OK(s);
s = txn->Delete(Slice("key3"));
ASSERT_OK(s);
s = txn->SingleDelete(Slice("key4"));
ASSERT_OK(s);
s = txn->Prepare();
ASSERT_OK(s);
{
ReadLock rl(&wp_db->prepared_mutex_);
ASSERT_FALSE(wp_db->prepared_txns_.empty());
ASSERT_EQ(txn->GetId(), wp_db->prepared_txns_.top());
}
ASSERT_SAME(db, s1, v1, "key1");
ASSERT_SAME(db, s2, v2, "key2");
ASSERT_SAME(db, s3, v3, "key3");
ASSERT_SAME(db, s4, v4, "key4");
if (crash) {
delete txn;
auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
db_impl->FlushWAL(true);
dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
ReOpenNoDelete();
assert(db != nullptr);
wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
txn = db->GetTransactionByName("xid0");
ASSERT_FALSE(wp_db->delayed_prepared_empty_);
ReadLock rl(&wp_db->prepared_mutex_);
ASSERT_TRUE(wp_db->prepared_txns_.empty());
ASSERT_FALSE(wp_db->delayed_prepared_.empty());
ASSERT_TRUE(wp_db->delayed_prepared_.find(txn->GetId()) !=
wp_db->delayed_prepared_.end());
}
ASSERT_SAME(db, s1, v1, "key1");
ASSERT_SAME(db, s2, v2, "key2");
ASSERT_SAME(db, s3, v3, "key3");
ASSERT_SAME(db, s4, v4, "key4");
s = txn->Rollback();
ASSERT_OK(s);
{
ASSERT_TRUE(wp_db->delayed_prepared_empty_);
ReadLock rl(&wp_db->prepared_mutex_);
ASSERT_TRUE(wp_db->prepared_txns_.empty());
ASSERT_TRUE(wp_db->delayed_prepared_.empty());
}
ASSERT_SAME(db, s1, v1, "key1");
ASSERT_SAME(db, s2, v2, "key2");
ASSERT_SAME(db, s3, v3, "key3");
ASSERT_SAME(db, s4, v4, "key4");
delete txn;
}
}
}
}
TEST_P(WritePreparedTransactionTest, DisableGCDuringRecoveryTest) {
// Use large buffer to avoid memtable flush after 1024 insertions
options.write_buffer_size = 1024 * 1024;
ReOpen();
std::vector<KeyVersion> versions;
uint64_t seq = 0;
for (uint64_t i = 1; i <= 1024; i++) {
std::string v = "bar" + ToString(i);
ASSERT_OK(db->Put(WriteOptions(), "foo", v));
VerifyKeys({{"foo", v}});
seq++; // one for the key/value
KeyVersion kv = {"foo", v, seq, kTypeValue};
if (options.two_write_queues) {
seq++; // one for the commit
}
versions.emplace_back(kv);
}
std::reverse(std::begin(versions), std::end(versions));
VerifyInternalKeys(versions);
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
db_impl->FlushWAL(true);
// Use small buffer to ensure memtable flush during recovery
options.write_buffer_size = 1024;
ReOpenNoDelete();
VerifyInternalKeys(versions);
}
TEST_P(WritePreparedTransactionTest, SequenceNumberZeroTest) {
ASSERT_OK(db->Put(WriteOptions(), "foo", "bar"));
VerifyKeys({{"foo", "bar"}});
const Snapshot* snapshot = db->GetSnapshot();
ASSERT_OK(db->Flush(FlushOptions()));
// Dummy keys to avoid compaction trivially move files and get around actual
// compaction logic.
ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// Compaction will output keys with sequence number 0, if it is visible to
// earliest snapshot. Make sure IsInSnapshot() report sequence number 0 is
// visible to any snapshot.
VerifyKeys({{"foo", "bar"}});
VerifyKeys({{"foo", "bar"}}, snapshot);
VerifyInternalKeys({{"foo", "bar", 0, kTypeValue}});
db->ReleaseSnapshot(snapshot);
}
// Compaction should not remove a key if it is not committed, and should
// proceed with older versions of the key as-if the new version doesn't exist.
TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) {
options.disable_auto_compactions = true;
ReOpen();
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
// Snapshots to avoid keys get evicted.
std::vector<const Snapshot*> snapshots;
// Keep track of expected sequence number.
SequenceNumber expected_seq = 0;
auto add_key = [&](std::function<Status()> func) {
ASSERT_OK(func());
expected_seq++;
if (options.two_write_queues) {
expected_seq++; // 1 for commit
}
ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
snapshots.push_back(db->GetSnapshot());
};
// Each key here represent a standalone test case.
add_key([&]() { return db->Put(WriteOptions(), "key1", "value1_1"); });
add_key([&]() { return db->Put(WriteOptions(), "key2", "value2_1"); });
add_key([&]() { return db->Put(WriteOptions(), "key3", "value3_1"); });
add_key([&]() { return db->Put(WriteOptions(), "key4", "value4_1"); });
add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_1"); });
add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_2"); });
add_key([&]() { return db->Put(WriteOptions(), "key6", "value6_1"); });
add_key([&]() { return db->Put(WriteOptions(), "key7", "value7_1"); });
ASSERT_OK(db->Flush(FlushOptions()));
add_key([&]() { return db->Delete(WriteOptions(), "key6"); });
add_key([&]() { return db->SingleDelete(WriteOptions(), "key7"); });
auto* transaction = db->BeginTransaction(WriteOptions());
ASSERT_OK(transaction->SetName("txn"));
ASSERT_OK(transaction->Put("key1", "value1_2"));
ASSERT_OK(transaction->Delete("key2"));
ASSERT_OK(transaction->SingleDelete("key3"));
ASSERT_OK(transaction->Merge("key4", "value4_2"));
ASSERT_OK(transaction->Merge("key5", "value5_3"));
ASSERT_OK(transaction->Put("key6", "value6_2"));
ASSERT_OK(transaction->Put("key7", "value7_2"));
// Prepare but not commit.
ASSERT_OK(transaction->Prepare());
ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
ASSERT_OK(db->Flush(FlushOptions()));
for (auto* s : snapshots) {
db->ReleaseSnapshot(s);
}
// Dummy keys to avoid compaction trivially move files and get around actual
// compaction logic.
ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
VerifyKeys({
{"key1", "value1_1"},
{"key2", "value2_1"},
{"key3", "value3_1"},
{"key4", "value4_1"},
{"key5", "value5_1,value5_2"},
{"key6", "NOT_FOUND"},
{"key7", "NOT_FOUND"},
});
VerifyInternalKeys({
{"key1", "value1_2", expected_seq, kTypeValue},
{"key1", "value1_1", 0, kTypeValue},
{"key2", "", expected_seq, kTypeDeletion},
{"key2", "value2_1", 0, kTypeValue},
{"key3", "", expected_seq, kTypeSingleDeletion},
{"key3", "value3_1", 0, kTypeValue},
{"key4", "value4_2", expected_seq, kTypeMerge},
{"key4", "value4_1", 0, kTypeValue},
{"key5", "value5_3", expected_seq, kTypeMerge},
{"key5", "value5_1,value5_2", 0, kTypeValue},
{"key6", "value6_2", expected_seq, kTypeValue},
{"key7", "value7_2", expected_seq, kTypeValue},
});
ASSERT_OK(transaction->Commit());
VerifyKeys({
{"key1", "value1_2"},
{"key2", "NOT_FOUND"},
{"key3", "NOT_FOUND"},
{"key4", "value4_1,value4_2"},
{"key5", "value5_1,value5_2,value5_3"},
{"key6", "value6_2"},
{"key7", "value7_2"},
});
delete transaction;
}
// Compaction should keep keys visible to a snapshot based on commit sequence,
// not just prepare sequence.
TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) {
options.disable_auto_compactions = true;
ReOpen();
// Keep track of expected sequence number.
SequenceNumber expected_seq = 0;
auto* txn1 = db->BeginTransaction(WriteOptions());
ASSERT_OK(txn1->SetName("txn1"));
ASSERT_OK(txn1->Put("key1", "value1_1"));
ASSERT_OK(txn1->Prepare());
ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
ASSERT_OK(txn1->Commit());
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
delete txn1;
// Take a snapshots to avoid keys get evicted before compaction.
const Snapshot* snapshot1 = db->GetSnapshot();
auto* txn2 = db->BeginTransaction(WriteOptions());
ASSERT_OK(txn2->SetName("txn2"));
ASSERT_OK(txn2->Put("key2", "value2_1"));
ASSERT_OK(txn2->Prepare());
ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
// txn1 commit before snapshot2 and it is visible to snapshot2.
// txn2 commit after snapshot2 and it is not visible.
const Snapshot* snapshot2 = db->GetSnapshot();
ASSERT_OK(txn2->Commit());
ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
delete txn2;
// Take a snapshots to avoid keys get evicted before compaction.
const Snapshot* snapshot3 = db->GetSnapshot();
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2"));
expected_seq++; // 1 for write
SequenceNumber seq1 = expected_seq;
if (options.two_write_queues) {
expected_seq++; // 1 for commit
}
ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2"));
expected_seq++; // 1 for write
SequenceNumber seq2 = expected_seq;
if (options.two_write_queues) {
expected_seq++; // 1 for commit
}
ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
ASSERT_OK(db->Flush(FlushOptions()));
db->ReleaseSnapshot(snapshot1);
db->ReleaseSnapshot(snapshot3);
// Dummy keys to avoid compaction trivially move files and get around actual
// compaction logic.
ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
VerifyKeys({{"key1", "value1_2"}, {"key2", "value2_2"}});
VerifyKeys({{"key1", "value1_1"}, {"key2", "NOT_FOUND"}}, snapshot2);
VerifyInternalKeys({
{"key1", "value1_2", seq1, kTypeValue},
// "value1_1" is visible to snapshot2. Also keys at bottom level visible
// to earliest snapshot will output with seq = 0.
{"key1", "value1_1", 0, kTypeValue},
{"key2", "value2_2", seq2, kTypeValue},
});
db->ReleaseSnapshot(snapshot2);
}
// A more complex test to verify compaction/flush should keep keys visible
// to snapshots.
TEST_P(WritePreparedTransactionTest,
CompactionShouldKeepSnapshotVisibleKeysRandomized) {
constexpr size_t kNumTransactions = 10;
constexpr size_t kNumIterations = 1000;
std::vector<Transaction*> transactions(kNumTransactions, nullptr);
std::vector<size_t> versions(kNumTransactions, 0);
std::unordered_map<std::string, std::string> current_data;
std::vector<const Snapshot*> snapshots;
std::vector<std::unordered_map<std::string, std::string>> snapshot_data;
Random rnd(1103);
options.disable_auto_compactions = true;
ReOpen();
for (size_t i = 0; i < kNumTransactions; i++) {
std::string key = "key" + ToString(i);
std::string value = "value0";
ASSERT_OK(db->Put(WriteOptions(), key, value));
current_data[key] = value;
}
VerifyKeys(current_data);
for (size_t iter = 0; iter < kNumIterations; iter++) {
auto r = rnd.Next() % (kNumTransactions + 1);
if (r < kNumTransactions) {
std::string key = "key" + ToString(r);
if (transactions[r] == nullptr) {
std::string value = "value" + ToString(versions[r] + 1);
auto* txn = db->BeginTransaction(WriteOptions());
ASSERT_OK(txn->SetName("txn" + ToString(r)));
ASSERT_OK(txn->Put(key, value));
ASSERT_OK(txn->Prepare());
transactions[r] = txn;
} else {
std::string value = "value" + ToString(++versions[r]);
ASSERT_OK(transactions[r]->Commit());
delete transactions[r];
transactions[r] = nullptr;
current_data[key] = value;
}
} else {
auto* snapshot = db->GetSnapshot();
VerifyKeys(current_data, snapshot);
snapshots.push_back(snapshot);
snapshot_data.push_back(current_data);
}
VerifyKeys(current_data);
}
// Take a last snapshot to test compaction with uncommitted prepared
// transaction.
snapshots.push_back(db->GetSnapshot());
snapshot_data.push_back(current_data);
assert(snapshots.size() == snapshot_data.size());
for (size_t i = 0; i < snapshots.size(); i++) {
VerifyKeys(snapshot_data[i], snapshots[i]);
}
ASSERT_OK(db->Flush(FlushOptions()));
for (size_t i = 0; i < snapshots.size(); i++) {
VerifyKeys(snapshot_data[i], snapshots[i]);
}
// Dummy keys to avoid compaction trivially move files and get around actual
// compaction logic.
ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
for (size_t i = 0; i < snapshots.size(); i++) {
VerifyKeys(snapshot_data[i], snapshots[i]);
}
// cleanup
for (size_t i = 0; i < kNumTransactions; i++) {
if (transactions[i] == nullptr) {
continue;
}
ASSERT_OK(transactions[i]->Commit());
delete transactions[i];
}
for (size_t i = 0; i < snapshots.size(); i++) {
db->ReleaseSnapshot(snapshots[i]);
}
}
// Compaction should not apply the optimization to output key with sequence
// number equal to 0 if the key is not visible to earliest snapshot, based on
// commit sequence number.
TEST_P(WritePreparedTransactionTest,
CompactionShouldKeepSequenceForUncommittedKeys) {
options.disable_auto_compactions = true;
ReOpen();
// Keep track of expected sequence number.
SequenceNumber expected_seq = 0;
auto* transaction = db->BeginTransaction(WriteOptions());
ASSERT_OK(transaction->SetName("txn"));
ASSERT_OK(transaction->Put("key1", "value1"));
ASSERT_OK(transaction->Prepare());
ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
SequenceNumber seq1 = expected_seq;
ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
expected_seq++; // one for data
if (options.two_write_queues) {
expected_seq++; // one for commit
}
ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
ASSERT_OK(db->Flush(FlushOptions()));
// Dummy keys to avoid compaction trivially move files and get around actual
// compaction logic.
ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
VerifyKeys({
{"key1", "NOT_FOUND"},
{"key2", "value2"},
});
VerifyInternalKeys({
// "key1" has not been committed. It keeps its sequence number.
{"key1", "value1", seq1, kTypeValue},
// "key2" is committed and output with seq = 0.
{"key2", "value2", 0, kTypeValue},
});
ASSERT_OK(transaction->Commit());
VerifyKeys({
{"key1", "value1"},
{"key2", "value2"},
});
delete transaction;
}
TEST_P(WritePreparedTransactionTest, Iterate) {
auto verify_state = [](Iterator* iter, const std::string& key,
const std::string& value) {
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(key, iter->key().ToString());
ASSERT_EQ(value, iter->value().ToString());
};
auto verify_iter = [&](const std::string& expected_val) {
// Get iterator from a concurrent transaction and make sure it has the
// same view as an iterator from the DB.
auto* txn = db->BeginTransaction(WriteOptions());
for (int i = 0; i < 2; i++) {
Iterator* iter = (i == 0)
? db->NewIterator(ReadOptions())
: txn->GetIterator(ReadOptions());
// Seek
iter->Seek("foo");
verify_state(iter, "foo", expected_val);
// Next
iter->Seek("a");
verify_state(iter, "a", "va");
iter->Next();
verify_state(iter, "foo", expected_val);
// SeekForPrev
iter->SeekForPrev("y");
verify_state(iter, "foo", expected_val);
// Prev
iter->SeekForPrev("z");
verify_state(iter, "z", "vz");
iter->Prev();
verify_state(iter, "foo", expected_val);
delete iter;
}
delete txn;
};
ASSERT_OK(db->Put(WriteOptions(), "foo", "v1"));
auto* transaction = db->BeginTransaction(WriteOptions());
ASSERT_OK(transaction->SetName("txn"));
ASSERT_OK(transaction->Put("foo", "v2"));
ASSERT_OK(transaction->Prepare());
VerifyKeys({{"foo", "v1"}});
// dummy keys
ASSERT_OK(db->Put(WriteOptions(), "a", "va"));
ASSERT_OK(db->Put(WriteOptions(), "z", "vz"));
verify_iter("v1");
ASSERT_OK(transaction->Commit());
VerifyKeys({{"foo", "v2"}});
verify_iter("v2");
delete transaction;
}
TEST_P(WritePreparedTransactionTest, IteratorRefreshNotSupported) {
Iterator* iter = db->NewIterator(ReadOptions());
ASSERT_TRUE(iter->Refresh().IsNotSupported());
delete iter;
}
// Test that updating the commit map will not affect the existing snapshots
TEST_P(WritePreparedTransactionTest, AtomicCommit) {
for (bool skip_prepare : {true, false}) {
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"WritePreparedTxnDB::AddCommitted:start",
"AtomicCommit::GetSnapshot:start"},
{"AtomicCommit::Get:end",
"WritePreparedTxnDB::AddCommitted:start:pause"},
{"WritePreparedTxnDB::AddCommitted:end", "AtomicCommit::Get2:start"},
{"AtomicCommit::Get2:end",
"WritePreparedTxnDB::AddCommitted:end:pause:"},
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
rocksdb::port::Thread write_thread([&]() {
if (skip_prepare) {
db->Put(WriteOptions(), Slice("key"), Slice("value"));
} else {
Transaction* txn =
db->BeginTransaction(WriteOptions(), TransactionOptions());
ASSERT_OK(txn->SetName("xid"));
ASSERT_OK(txn->Put(Slice("key"), Slice("value")));
ASSERT_OK(txn->Prepare());
ASSERT_OK(txn->Commit());
delete txn;
}
});
rocksdb::port::Thread read_thread([&]() {
ReadOptions roptions;
TEST_SYNC_POINT("AtomicCommit::GetSnapshot:start");
roptions.snapshot = db->GetSnapshot();
PinnableSlice val;
auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &val);
TEST_SYNC_POINT("AtomicCommit::Get:end");
TEST_SYNC_POINT("AtomicCommit::Get2:start");
ASSERT_SAME(roptions, db, s, val, "key");
TEST_SYNC_POINT("AtomicCommit::Get2:end");
db->ReleaseSnapshot(roptions.snapshot);
});
read_thread.join();
write_thread.join();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
}
// Test that we can change write policy from WriteCommitted to WritePrepared
// after a clean shutdown (which would empty the WAL)
TEST_P(WritePreparedTransactionTest, WP_WC_DBBackwardCompatibility) {
bool empty_wal = true;
CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, empty_wal);
}
// Test that we fail fast if WAL is not emptied between changing the write
// policy from WriteCommitted to WritePrepared
TEST_P(WritePreparedTransactionTest, WP_WC_WALBackwardIncompatibility) {
bool empty_wal = true;
CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, !empty_wal);
}
// Test that we can change write policy from WritePrepare back to WriteCommitted
// after a clean shutdown (which would empty the WAL)
TEST_P(WritePreparedTransactionTest, WC_WP_ForwardCompatibility) {
bool empty_wal = true;
CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, empty_wal);
}
// Test that we fail fast if WAL is not emptied between changing the write
// policy from WriteCommitted to WritePrepared
TEST_P(WritePreparedTransactionTest, WC_WP_WALForwardIncompatibility) {
bool empty_wal = true;
CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, !empty_wal);
}
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#else
#include <stdio.h>
int main(int /*argc*/, char** /*argv*/) {
fprintf(stderr,
"SKIPPED as Transactions are not supported in ROCKSDB_LITE\n");
return 0;
}
#endif // ROCKSDB_LITE