// Copyright (c) 2015, Facebook, Inc. All rights reserved. // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. #ifndef ROCKSDB_LITE #include #include "db/db_impl.h" #include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction_db.h" #include "table/mock_table.h" #include "util/logging.h" #include "util/testharness.h" #include "util/testutil.h" #include "utilities/merge_operators.h" #include "utilities/merge_operators/string_append/stringappend.h" using std::string; namespace rocksdb { class TransactionTest : public testing::Test { public: TransactionDB* db; string dbname; Options options; TransactionDBOptions txn_db_options; TransactionTest() { options.create_if_missing = true; options.max_write_buffer_number = 2; options.write_buffer_size = 4 * 1024; options.level0_file_num_compaction_trigger = 2; options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); dbname = test::TmpDir() + "/transaction_testdb"; DestroyDB(dbname, options); txn_db_options.transaction_lock_timeout = 0; txn_db_options.default_lock_timeout = 0; Status s = TransactionDB::Open(options, txn_db_options, dbname, &db); assert(s.ok()); } ~TransactionTest() { delete db; DestroyDB(dbname, options); } Status ReOpen() { delete db; DestroyDB(dbname, options); Status s = TransactionDB::Open(options, txn_db_options, dbname, &db); return s; } }; TEST_F(TransactionTest, SuccessTest) { WriteOptions write_options; ReadOptions read_options; string value; Status s; db->Put(write_options, Slice("foo"), Slice("bar")); db->Put(write_options, Slice("foo2"), Slice("bar")); Transaction* txn = db->BeginTransaction(write_options, TransactionOptions()); ASSERT_TRUE(txn); ASSERT_EQ(0, txn->GetNumPuts()); s = txn->GetForUpdate(read_options, "foo", &value); ASSERT_OK(s); ASSERT_EQ(value, "bar"); s = txn->Put(Slice("foo"), Slice("bar2")); ASSERT_OK(s); ASSERT_EQ(1, txn->GetNumPuts()); s = txn->GetForUpdate(read_options, "foo", &value); ASSERT_OK(s); ASSERT_EQ(value, "bar2"); s = txn->Commit(); ASSERT_OK(s); s = db->Get(read_options, "foo", &value); ASSERT_OK(s); ASSERT_EQ(value, "bar2"); delete txn; } TEST_F(TransactionTest, FirstWriteTest) { WriteOptions write_options; // Test conflict checking against the very first write to a db. // The transaction's snapshot will have seq 1 and the following write // will have sequence 1. Status s = db->Put(write_options, "A", "a"); Transaction* txn = db->BeginTransaction(write_options); txn->SetSnapshot(); ASSERT_OK(s); s = txn->Put("A", "b"); ASSERT_OK(s); delete txn; } TEST_F(TransactionTest, FirstWriteTest2) { WriteOptions write_options; Transaction* txn = db->BeginTransaction(write_options); txn->SetSnapshot(); // Test conflict checking against the very first write to a db. // The transaction's snapshot is a seq 0 while the following write // will have sequence 1. Status s = db->Put(write_options, "A", "a"); ASSERT_OK(s); s = txn->Put("A", "b"); ASSERT_TRUE(s.IsBusy()); delete txn; } TEST_F(TransactionTest, WriteOptionsTest) { WriteOptions write_options; write_options.sync = true; write_options.disableWAL = true; Transaction* txn = db->BeginTransaction(write_options); ASSERT_TRUE(txn); ASSERT_TRUE(txn->GetWriteOptions()->sync); write_options.sync = false; txn->SetWriteOptions(write_options); ASSERT_FALSE(txn->GetWriteOptions()->sync); ASSERT_TRUE(txn->GetWriteOptions()->disableWAL); delete txn; } TEST_F(TransactionTest, WriteConflictTest) { WriteOptions write_options; ReadOptions read_options; string value; Status s; db->Put(write_options, "foo", "A"); db->Put(write_options, "foo2", "B"); Transaction* txn = db->BeginTransaction(write_options); ASSERT_TRUE(txn); s = txn->Put("foo", "A2"); ASSERT_OK(s); s = txn->Put("foo2", "B2"); ASSERT_OK(s); // This Put outside of a transaction will conflict with the previous write s = db->Put(write_options, "foo", "xxx"); ASSERT_TRUE(s.IsTimedOut()); s = db->Get(read_options, "foo", &value); ASSERT_EQ(value, "A"); s = txn->Commit(); ASSERT_OK(s); db->Get(read_options, "foo", &value); ASSERT_EQ(value, "A2"); db->Get(read_options, "foo2", &value); ASSERT_EQ(value, "B2"); delete txn; } TEST_F(TransactionTest, WriteConflictTest2) { WriteOptions write_options; ReadOptions read_options; TransactionOptions txn_options; string value; Status s; db->Put(write_options, "foo", "bar"); txn_options.set_snapshot = true; Transaction* txn = db->BeginTransaction(write_options, txn_options); ASSERT_TRUE(txn); // This Put outside of a transaction will conflict with a later write s = db->Put(write_options, "foo", "barz"); ASSERT_OK(s); s = txn->Put("foo2", "X"); ASSERT_OK(s); s = txn->Put("foo", "bar2"); // Conflicts with write done after snapshot taken ASSERT_TRUE(s.IsBusy()); s = txn->Put("foo3", "Y"); ASSERT_OK(s); s = db->Get(read_options, "foo", &value); ASSERT_EQ(value, "barz"); ASSERT_EQ(2, txn->GetNumKeys()); s = txn->Commit(); ASSERT_OK(s); // Txn should commit, but only write foo2 and foo3 // Verify that transaction wrote foo2 and foo3 but not foo db->Get(read_options, "foo", &value); ASSERT_EQ(value, "barz"); db->Get(read_options, "foo2", &value); ASSERT_EQ(value, "X"); db->Get(read_options, "foo3", &value); ASSERT_EQ(value, "Y"); delete txn; } TEST_F(TransactionTest, ReadConflictTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; TransactionOptions txn_options; string value; Status s; db->Put(write_options, "foo", "bar"); db->Put(write_options, "foo2", "bar"); txn_options.set_snapshot = true; Transaction* txn = db->BeginTransaction(write_options, txn_options); ASSERT_TRUE(txn); txn->SetSnapshot(); snapshot_read_options.snapshot = txn->GetSnapshot(); txn->GetForUpdate(snapshot_read_options, "foo", &value); ASSERT_EQ(value, "bar"); // This Put outside of a transaction will conflict with the previous read s = db->Put(write_options, "foo", "barz"); ASSERT_TRUE(s.IsTimedOut()); s = db->Get(read_options, "foo", &value); ASSERT_EQ(value, "bar"); s = txn->Get(read_options, "foo", &value); ASSERT_EQ(value, "bar"); s = txn->Commit(); ASSERT_OK(s); delete txn; } TEST_F(TransactionTest, TxnOnlyTest) { // Test to make sure transactions work when there are no other writes in an // empty db. WriteOptions write_options; ReadOptions read_options; string value; Status s; Transaction* txn = db->BeginTransaction(write_options); ASSERT_TRUE(txn); s = txn->Put("x", "y"); ASSERT_OK(s); s = txn->Commit(); ASSERT_OK(s); delete txn; } TEST_F(TransactionTest, FlushTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; string value; Status s; db->Put(write_options, Slice("foo"), Slice("bar")); db->Put(write_options, Slice("foo2"), Slice("bar")); Transaction* txn = db->BeginTransaction(write_options); ASSERT_TRUE(txn); snapshot_read_options.snapshot = txn->GetSnapshot(); txn->GetForUpdate(snapshot_read_options, "foo", &value); ASSERT_EQ(value, "bar"); s = txn->Put(Slice("foo"), Slice("bar2")); ASSERT_OK(s); txn->GetForUpdate(snapshot_read_options, "foo", &value); ASSERT_EQ(value, "bar2"); // Put a random key so we have a memtable to flush s = db->Put(write_options, "dummy", "dummy"); ASSERT_OK(s); // force a memtable flush FlushOptions flush_ops; db->Flush(flush_ops); s = txn->Commit(); // txn should commit since the flushed table is still in MemtableList History ASSERT_OK(s); db->Get(read_options, "foo", &value); ASSERT_EQ(value, "bar2"); delete txn; } TEST_F(TransactionTest, FlushTest2) { const size_t num_tests = 3; for (size_t n = 0; n < num_tests; n++) { // Test different table factories switch (n) { case 0: break; case 1: options.table_factory.reset(new mock::MockTableFactory()); break; case 2: { PlainTableOptions pt_opts; pt_opts.hash_table_ratio = 0; options.table_factory.reset(NewPlainTableFactory(pt_opts)); break; } } Status s = ReOpen(); ASSERT_OK(s); WriteOptions write_options; ReadOptions read_options, snapshot_read_options; TransactionOptions txn_options; string value; DBImpl* db_impl = reinterpret_cast(db->GetBaseDB()); db->Put(write_options, Slice("foo"), Slice("bar")); db->Put(write_options, Slice("foo2"), Slice("bar2")); db->Put(write_options, Slice("foo3"), Slice("bar3")); txn_options.set_snapshot = true; Transaction* txn = db->BeginTransaction(write_options, txn_options); ASSERT_TRUE(txn); snapshot_read_options.snapshot = txn->GetSnapshot(); txn->GetForUpdate(snapshot_read_options, "foo", &value); ASSERT_EQ(value, "bar"); s = txn->Put(Slice("foo"), Slice("bar2")); ASSERT_OK(s); txn->GetForUpdate(snapshot_read_options, "foo", &value); ASSERT_EQ(value, "bar2"); // verify foo is locked by txn s = db->Delete(write_options, "foo"); ASSERT_TRUE(s.IsTimedOut()); s = db->Put(write_options, "Z", "z"); ASSERT_OK(s); s = db->Put(write_options, "dummy", "dummy"); ASSERT_OK(s); s = db->Put(write_options, "S", "s"); ASSERT_OK(s); s = db->SingleDelete(write_options, "S"); ASSERT_OK(s); s = txn->Delete("S"); // Should fail after encountering a write to S in memtable ASSERT_TRUE(s.IsBusy()); // force a memtable flush s = db_impl->TEST_FlushMemTable(true); ASSERT_OK(s); // Put a random key so we have a MemTable to flush s = db->Put(write_options, "dummy", "dummy2"); ASSERT_OK(s); // force a memtable flush ASSERT_OK(db_impl->TEST_FlushMemTable(true)); s = db->Put(write_options, "dummy", "dummy3"); ASSERT_OK(s); // force a memtable flush // Since our test db has max_write_buffer_number=2, this flush will cause // the first memtable to get purged from the MemtableList history. ASSERT_OK(db_impl->TEST_FlushMemTable(true)); s = txn->Put("X", "Y"); // Should succeed after verifying there is no write to X in SST file ASSERT_OK(s); s = txn->Put("Z", "zz"); // Should fail after encountering a write to Z in SST file ASSERT_TRUE(s.IsBusy()); s = txn->GetForUpdate(read_options, "foo2", &value); // should succeed since key was written before txn started ASSERT_OK(s); // verify foo2 is locked by txn s = db->Delete(write_options, "foo2"); ASSERT_TRUE(s.IsTimedOut()); s = txn->Delete("S"); // Should fail after encountering a write to S in SST file fprintf(stderr, "%lu %s\n", n, s.ToString().c_str()); ASSERT_TRUE(s.IsBusy()); // Write a bunch of keys to db to force a compaction Random rnd(47); for (int i = 0; i < 1000; i++) { s = db->Put(write_options, std::to_string(i), test::CompressibleString(&rnd, 0.8, 100, &value)); ASSERT_OK(s); } s = txn->Put("X", "yy"); // Should succeed after verifying there is no write to X in SST file ASSERT_OK(s); s = txn->Put("Z", "zzz"); // Should fail after encountering a write to Z in SST file ASSERT_TRUE(s.IsBusy()); s = txn->Delete("S"); // Should fail after encountering a write to S in SST file ASSERT_TRUE(s.IsBusy()); s = txn->GetForUpdate(read_options, "foo3", &value); // should succeed since key was written before txn started ASSERT_OK(s); // verify foo3 is locked by txn s = db->Delete(write_options, "foo3"); ASSERT_TRUE(s.IsTimedOut()); db_impl->TEST_WaitForCompact(); s = txn->Commit(); ASSERT_OK(s); // Transaction should only write the keys that succeeded. s = db->Get(read_options, "foo", &value); ASSERT_EQ(value, "bar2"); s = db->Get(read_options, "X", &value); ASSERT_OK(s); ASSERT_EQ("yy", value); s = db->Get(read_options, "Z", &value); ASSERT_OK(s); ASSERT_EQ("z", value); delete txn; } } TEST_F(TransactionTest, NoSnapshotTest) { WriteOptions write_options; ReadOptions read_options; string value; Status s; db->Put(write_options, "AAA", "bar"); Transaction* txn = db->BeginTransaction(write_options); ASSERT_TRUE(txn); // Modify key after transaction start db->Put(write_options, "AAA", "bar1"); // Read and write without a snapshot txn->GetForUpdate(read_options, "AAA", &value); ASSERT_EQ(value, "bar1"); s = txn->Put("AAA", "bar2"); ASSERT_OK(s); // Should commit since read/write was done after data changed s = txn->Commit(); ASSERT_OK(s); txn->GetForUpdate(read_options, "AAA", &value); ASSERT_EQ(value, "bar2"); delete txn; } TEST_F(TransactionTest, MultipleSnapshotTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; string value; Status s; db->Put(write_options, "AAA", "bar"); db->Put(write_options, "BBB", "bar"); db->Put(write_options, "CCC", "bar"); Transaction* txn = db->BeginTransaction(write_options); ASSERT_TRUE(txn); db->Put(write_options, "AAA", "bar1"); // Read and write without a snapshot txn->GetForUpdate(read_options, "AAA", &value); ASSERT_EQ(value, "bar1"); s = txn->Put("AAA", "bar2"); ASSERT_OK(s); // Modify BBB before snapshot is taken db->Put(write_options, "BBB", "bar1"); txn->SetSnapshot(); snapshot_read_options.snapshot = txn->GetSnapshot(); // Read and write with snapshot txn->GetForUpdate(snapshot_read_options, "BBB", &value); ASSERT_EQ(value, "bar1"); s = txn->Put("BBB", "bar2"); ASSERT_OK(s); db->Put(write_options, "CCC", "bar1"); // Set a new snapshot txn->SetSnapshot(); snapshot_read_options.snapshot = txn->GetSnapshot(); // Read and write with snapshot txn->GetForUpdate(snapshot_read_options, "CCC", &value); ASSERT_EQ(value, "bar1"); s = txn->Put("CCC", "bar2"); ASSERT_OK(s); s = txn->GetForUpdate(read_options, "AAA", &value); ASSERT_OK(s); ASSERT_EQ(value, "bar2"); s = txn->GetForUpdate(read_options, "BBB", &value); ASSERT_OK(s); ASSERT_EQ(value, "bar2"); s = txn->GetForUpdate(read_options, "CCC", &value); ASSERT_OK(s); ASSERT_EQ(value, "bar2"); s = db->Get(read_options, "AAA", &value); ASSERT_OK(s); ASSERT_EQ(value, "bar1"); s = db->Get(read_options, "BBB", &value); ASSERT_OK(s); ASSERT_EQ(value, "bar1"); s = db->Get(read_options, "CCC", &value); ASSERT_OK(s); ASSERT_EQ(value, "bar1"); s = txn->Commit(); ASSERT_OK(s); s = db->Get(read_options, "AAA", &value); ASSERT_OK(s); ASSERT_EQ(value, "bar2"); s = db->Get(read_options, "BBB", &value); ASSERT_OK(s); ASSERT_EQ(value, "bar2"); s = db->Get(read_options, "CCC", &value); ASSERT_OK(s); ASSERT_EQ(value, "bar2"); // verify that we track multiple writes to the same key at different snapshots delete txn; txn = db->BeginTransaction(write_options); // Potentially conflicting writes db->Put(write_options, "ZZZ", "zzz"); db->Put(write_options, "XXX", "xxx"); txn->SetSnapshot(); TransactionOptions txn_options; txn_options.set_snapshot = true; Transaction* txn2 = db->BeginTransaction(write_options, txn_options); txn2->SetSnapshot(); // This should not conflict in txn since the snapshot is later than the // previous write (spoiler alert: it will later conflict with txn2). s = txn->Put("ZZZ", "zzzz"); ASSERT_OK(s); s = txn->Commit(); ASSERT_OK(s); delete txn; // This will conflict since the snapshot is earlier than another write to ZZZ s = txn2->Put("ZZZ", "xxxxx"); ASSERT_TRUE(s.IsBusy()); s = txn2->Commit(); ASSERT_OK(s); s = db->Get(read_options, "ZZZ", &value); ASSERT_OK(s); ASSERT_EQ(value, "zzzz"); delete txn2; } TEST_F(TransactionTest, ColumnFamiliesTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; TransactionOptions txn_options; string value; Status s; ColumnFamilyHandle *cfa, *cfb; ColumnFamilyOptions cf_options; // Create 2 new column families s = db->CreateColumnFamily(cf_options, "CFA", &cfa); ASSERT_OK(s); s = db->CreateColumnFamily(cf_options, "CFB", &cfb); ASSERT_OK(s); delete cfa; delete cfb; delete db; // open DB with three column families std::vector column_families; // have to open default column family column_families.push_back( ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions())); // open the new column families column_families.push_back( ColumnFamilyDescriptor("CFA", ColumnFamilyOptions())); column_families.push_back( ColumnFamilyDescriptor("CFB", ColumnFamilyOptions())); std::vector handles; s = TransactionDB::Open(options, txn_db_options, dbname, column_families, &handles, &db); ASSERT_OK(s); Transaction* txn = db->BeginTransaction(write_options); ASSERT_TRUE(txn); txn->SetSnapshot(); snapshot_read_options.snapshot = txn->GetSnapshot(); txn_options.set_snapshot = true; Transaction* txn2 = db->BeginTransaction(write_options, txn_options); ASSERT_TRUE(txn2); // Write some data to the db WriteBatch batch; batch.Put("foo", "foo"); batch.Put(handles[1], "AAA", "bar"); batch.Put(handles[1], "AAAZZZ", "bar"); s = db->Write(write_options, &batch); ASSERT_OK(s); db->Delete(write_options, handles[1], "AAAZZZ"); // These keys do not conflict with existing writes since they're in // different column families s = txn->Delete("AAA"); ASSERT_OK(s); s = txn->GetForUpdate(snapshot_read_options, handles[1], "foo", &value); ASSERT_TRUE(s.IsNotFound()); Slice key_slice("AAAZZZ"); Slice value_slices[2] = {Slice("bar"), Slice("bar")}; s = txn->Put(handles[2], SliceParts(&key_slice, 1), SliceParts(value_slices, 2)); ASSERT_OK(s); ASSERT_EQ(3, txn->GetNumKeys()); s = txn->Commit(); ASSERT_OK(s); s = db->Get(read_options, "AAA", &value); ASSERT_TRUE(s.IsNotFound()); s = db->Get(read_options, handles[2], "AAAZZZ", &value); ASSERT_EQ(value, "barbar"); Slice key_slices[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")}; Slice value_slice("barbarbar"); s = txn2->Delete(handles[2], "XXX"); ASSERT_OK(s); s = txn2->Delete(handles[1], "XXX"); ASSERT_OK(s); // This write will cause a conflict with the earlier batch write s = txn2->Put(handles[1], SliceParts(key_slices, 3), SliceParts(&value_slice, 1)); ASSERT_TRUE(s.IsBusy()); s = txn2->Commit(); ASSERT_OK(s); s = db->Get(read_options, handles[1], "AAAZZZ", &value); ASSERT_EQ(value, "barbar"); delete txn; delete txn2; txn = db->BeginTransaction(write_options, txn_options); snapshot_read_options.snapshot = txn->GetSnapshot(); txn2 = db->BeginTransaction(write_options, txn_options); ASSERT_TRUE(txn); std::vector multiget_cfh = {handles[1], handles[2], handles[0], handles[2]}; std::vector multiget_keys = {"AAA", "AAAZZZ", "foo", "foo"}; std::vector values(4); std::vector results = txn->MultiGetForUpdate( snapshot_read_options, multiget_cfh, multiget_keys, &values); ASSERT_OK(results[0]); ASSERT_OK(results[1]); ASSERT_OK(results[2]); ASSERT_TRUE(results[3].IsNotFound()); ASSERT_EQ(values[0], "bar"); ASSERT_EQ(values[1], "barbar"); ASSERT_EQ(values[2], "foo"); s = txn->SingleDelete(handles[2], "ZZZ"); ASSERT_OK(s); s = txn->Put(handles[2], "ZZZ", "YYY"); ASSERT_OK(s); s = txn->Put(handles[2], "ZZZ", "YYYY"); ASSERT_OK(s); s = txn->Delete(handles[2], "ZZZ"); ASSERT_OK(s); s = txn->Put(handles[2], "AAAZZZ", "barbarbar"); ASSERT_OK(s); ASSERT_EQ(5, txn->GetNumKeys()); // Txn should commit s = txn->Commit(); ASSERT_OK(s); s = db->Get(read_options, handles[2], "ZZZ", &value); ASSERT_TRUE(s.IsNotFound()); // Put a key which will conflict with the next txn using the previous snapshot db->Put(write_options, handles[2], "foo", "000"); results = txn2->MultiGetForUpdate(snapshot_read_options, multiget_cfh, multiget_keys, &values); // All results should fail since there was a conflict ASSERT_TRUE(results[0].IsBusy()); ASSERT_TRUE(results[1].IsBusy()); ASSERT_TRUE(results[2].IsBusy()); ASSERT_TRUE(results[3].IsBusy()); s = db->Get(read_options, handles[2], "foo", &value); ASSERT_EQ(value, "000"); s = txn2->Commit(); ASSERT_OK(s); s = db->DropColumnFamily(handles[1]); ASSERT_OK(s); s = db->DropColumnFamily(handles[2]); ASSERT_OK(s); delete txn; delete txn2; for (auto handle : handles) { delete handle; } } TEST_F(TransactionTest, ColumnFamiliesTest2) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; TransactionOptions txn_options; string value; Status s; ColumnFamilyHandle *one, *two; ColumnFamilyOptions cf_options; // Create 2 new column families s = db->CreateColumnFamily(cf_options, "ONE", &one); ASSERT_OK(s); s = db->CreateColumnFamily(cf_options, "TWO", &two); ASSERT_OK(s); Transaction* txn1 = db->BeginTransaction(write_options); ASSERT_TRUE(txn1); Transaction* txn2 = db->BeginTransaction(write_options); ASSERT_TRUE(txn2); s = txn1->Put(one, "X", "1"); ASSERT_OK(s); s = txn1->Put(two, "X", "2"); ASSERT_OK(s); s = txn1->Put("X", "0"); ASSERT_OK(s); s = txn2->Put(one, "X", "11"); ASSERT_TRUE(s.IsTimedOut()); s = txn1->Commit(); ASSERT_OK(s); // Drop first column family s = db->DropColumnFamily(one); ASSERT_OK(s); // Should fail since column family was dropped. s = txn2->Commit(); ASSERT_OK(s); delete txn1; txn1 = db->BeginTransaction(write_options); ASSERT_TRUE(txn1); // Should fail since column family was dropped s = txn1->Put(one, "X", "111"); ASSERT_TRUE(s.IsInvalidArgument()); s = txn1->Put(two, "X", "222"); ASSERT_OK(s); s = txn1->Put("X", "000"); ASSERT_OK(s); s = txn1->Commit(); ASSERT_OK(s); s = db->Get(read_options, two, "X", &value); ASSERT_OK(s); ASSERT_EQ("222", value); s = db->Get(read_options, "X", &value); ASSERT_OK(s); ASSERT_EQ("000", value); s = db->DropColumnFamily(two); ASSERT_OK(s); delete txn1; delete txn2; delete one; delete two; } TEST_F(TransactionTest, EmptyTest) { WriteOptions write_options; ReadOptions read_options; string value; Status s; s = db->Put(write_options, "aaa", "aaa"); ASSERT_OK(s); Transaction* txn = db->BeginTransaction(write_options); s = txn->Commit(); ASSERT_OK(s); delete txn; txn = db->BeginTransaction(write_options); txn->Rollback(); delete txn; txn = db->BeginTransaction(write_options); s = txn->GetForUpdate(read_options, "aaa", &value); ASSERT_EQ(value, "aaa"); s = txn->Commit(); ASSERT_OK(s); delete txn; txn = db->BeginTransaction(write_options); txn->SetSnapshot(); s = txn->GetForUpdate(read_options, "aaa", &value); ASSERT_EQ(value, "aaa"); // Conflicts with previous GetForUpdate s = db->Put(write_options, "aaa", "xxx"); ASSERT_TRUE(s.IsTimedOut()); // transaction expired! s = txn->Commit(); ASSERT_OK(s); delete txn; } TEST_F(TransactionTest, PredicateManyPreceders) { WriteOptions write_options; ReadOptions read_options1, read_options2; TransactionOptions txn_options; string value; Status s; txn_options.set_snapshot = true; Transaction* txn1 = db->BeginTransaction(write_options, txn_options); read_options1.snapshot = txn1->GetSnapshot(); Transaction* txn2 = db->BeginTransaction(write_options); txn2->SetSnapshot(); read_options2.snapshot = txn2->GetSnapshot(); std::vector multiget_keys = {"1", "2", "3"}; std::vector multiget_values; std::vector results = txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values); ASSERT_TRUE(results[1].IsNotFound()); s = txn2->Put("2", "x"); // Conflict's with txn1's MultiGetForUpdate ASSERT_TRUE(s.IsTimedOut()); txn2->Rollback(); multiget_values.clear(); results = txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values); ASSERT_TRUE(results[1].IsNotFound()); s = txn1->Commit(); ASSERT_OK(s); delete txn1; delete txn2; txn1 = db->BeginTransaction(write_options, txn_options); read_options1.snapshot = txn1->GetSnapshot(); txn2 = db->BeginTransaction(write_options, txn_options); read_options2.snapshot = txn2->GetSnapshot(); s = txn1->Put("4", "x"); ASSERT_OK(s); s = txn2->Delete("4"); // conflict ASSERT_TRUE(s.IsTimedOut()); s = txn1->Commit(); ASSERT_OK(s); s = txn2->GetForUpdate(read_options2, "4", &value); ASSERT_TRUE(s.IsBusy()); txn2->Rollback(); delete txn1; delete txn2; } TEST_F(TransactionTest, LostUpdate) { WriteOptions write_options; ReadOptions read_options, read_options1, read_options2; TransactionOptions txn_options; string value; Status s; // Test 2 transactions writing to the same key in multiple orders and // with/without snapshots Transaction* txn1 = db->BeginTransaction(write_options); Transaction* txn2 = db->BeginTransaction(write_options); s = txn1->Put("1", "1"); ASSERT_OK(s); s = txn2->Put("1", "2"); // conflict ASSERT_TRUE(s.IsTimedOut()); s = txn2->Commit(); ASSERT_OK(s); s = txn1->Commit(); ASSERT_OK(s); s = db->Get(read_options, "1", &value); ASSERT_OK(s); ASSERT_EQ("1", value); delete txn1; delete txn2; txn_options.set_snapshot = true; txn1 = db->BeginTransaction(write_options, txn_options); read_options1.snapshot = txn1->GetSnapshot(); txn2 = db->BeginTransaction(write_options, txn_options); read_options2.snapshot = txn2->GetSnapshot(); s = txn1->Put("1", "3"); ASSERT_OK(s); s = txn2->Put("1", "4"); // conflict ASSERT_TRUE(s.IsTimedOut()); s = txn1->Commit(); ASSERT_OK(s); s = txn2->Commit(); ASSERT_OK(s); s = db->Get(read_options, "1", &value); ASSERT_OK(s); ASSERT_EQ("3", value); delete txn1; delete txn2; txn1 = db->BeginTransaction(write_options, txn_options); read_options1.snapshot = txn1->GetSnapshot(); txn2 = db->BeginTransaction(write_options, txn_options); read_options2.snapshot = txn2->GetSnapshot(); s = txn1->Put("1", "5"); ASSERT_OK(s); s = txn1->Commit(); ASSERT_OK(s); s = txn2->Put("1", "6"); ASSERT_TRUE(s.IsBusy()); s = txn2->Commit(); ASSERT_OK(s); s = db->Get(read_options, "1", &value); ASSERT_OK(s); ASSERT_EQ("5", value); delete txn1; delete txn2; txn1 = db->BeginTransaction(write_options, txn_options); read_options1.snapshot = txn1->GetSnapshot(); txn2 = db->BeginTransaction(write_options, txn_options); read_options2.snapshot = txn2->GetSnapshot(); s = txn1->Put("1", "7"); ASSERT_OK(s); s = txn1->Commit(); ASSERT_OK(s); txn2->SetSnapshot(); s = txn2->Put("1", "8"); ASSERT_OK(s); s = txn2->Commit(); ASSERT_OK(s); s = db->Get(read_options, "1", &value); ASSERT_OK(s); ASSERT_EQ("8", value); delete txn1; delete txn2; txn1 = db->BeginTransaction(write_options); txn2 = db->BeginTransaction(write_options); s = txn1->Put("1", "9"); ASSERT_OK(s); s = txn1->Commit(); ASSERT_OK(s); s = txn2->Put("1", "10"); ASSERT_OK(s); s = txn2->Commit(); ASSERT_OK(s); delete txn1; delete txn2; s = db->Get(read_options, "1", &value); ASSERT_OK(s); ASSERT_EQ(value, "10"); } TEST_F(TransactionTest, UntrackedWrites) { WriteOptions write_options; ReadOptions read_options; string value; Status s; // Verify transaction rollback works for untracked keys. Transaction* txn = db->BeginTransaction(write_options); txn->SetSnapshot(); s = txn->PutUntracked("untracked", "0"); ASSERT_OK(s); txn->Rollback(); s = db->Get(read_options, "untracked", &value); ASSERT_TRUE(s.IsNotFound()); delete txn; txn = db->BeginTransaction(write_options); txn->SetSnapshot(); s = db->Put(write_options, "untracked", "x"); ASSERT_OK(s); // Untracked writes should succeed even though key was written after snapshot s = txn->PutUntracked("untracked", "1"); ASSERT_OK(s); s = txn->MergeUntracked("untracked", "2"); ASSERT_OK(s); s = txn->DeleteUntracked("untracked"); ASSERT_OK(s); // Conflict s = txn->Put("untracked", "3"); ASSERT_TRUE(s.IsBusy()); s = txn->Commit(); ASSERT_OK(s); s = db->Get(read_options, "untracked", &value); ASSERT_TRUE(s.IsNotFound()); delete txn; } TEST_F(TransactionTest, ExpiredTransaction) { WriteOptions write_options; ReadOptions read_options; TransactionOptions txn_options; string value; Status s; // Set txn expiration timeout to 0 microseconds (expires instantly) txn_options.expiration = 0; Transaction* txn1 = db->BeginTransaction(write_options, txn_options); s = txn1->Put("X", "1"); ASSERT_OK(s); s = txn1->Put("Y", "1"); ASSERT_OK(s); Transaction* txn2 = db->BeginTransaction(write_options); // txn2 should be able to write to X since txn1 has expired s = txn2->Put("X", "2"); ASSERT_OK(s); s = txn2->Commit(); ASSERT_OK(s); s = db->Get(read_options, "X", &value); ASSERT_OK(s); ASSERT_EQ("2", value); s = txn1->Put("Z", "1"); ASSERT_OK(s); // txn1 should fail to commit since it is expired s = txn1->Commit(); ASSERT_TRUE(s.IsExpired()); s = db->Get(read_options, "Y", &value); ASSERT_TRUE(s.IsNotFound()); s = db->Get(read_options, "Z", &value); ASSERT_TRUE(s.IsNotFound()); delete txn1; delete txn2; } TEST_F(TransactionTest, Rollback) { WriteOptions write_options; ReadOptions read_options; TransactionOptions txn_options; string value; Status s; Transaction* txn1 = db->BeginTransaction(write_options, txn_options); ASSERT_OK(s); s = txn1->Put("X", "1"); ASSERT_OK(s); Transaction* txn2 = db->BeginTransaction(write_options); // txn2 should not be able to write to X since txn1 has it locked s = txn2->Put("X", "2"); ASSERT_TRUE(s.IsTimedOut()); txn1->Rollback(); delete txn1; // txn2 should now be able to write to X s = txn2->Put("X", "3"); ASSERT_OK(s); s = txn2->Commit(); ASSERT_OK(s); s = db->Get(read_options, "X", &value); ASSERT_OK(s); ASSERT_EQ("3", value); delete txn2; } TEST_F(TransactionTest, LockLimitTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; TransactionOptions txn_options; string value; Status s; delete db; // Open DB with a lock limit of 3 txn_db_options.max_num_locks = 3; s = TransactionDB::Open(options, txn_db_options, dbname, &db); ASSERT_OK(s); // Create a txn and verify we can only lock up to 3 keys Transaction* txn = db->BeginTransaction(write_options); ASSERT_TRUE(txn); s = txn->Put("X", "x"); ASSERT_OK(s); s = txn->Put("Y", "y"); ASSERT_OK(s); s = txn->Put("Z", "z"); ASSERT_OK(s); // lock limit reached s = txn->Put("W", "w"); ASSERT_TRUE(s.IsBusy()); // re-locking same key shouldn't put us over the limit s = txn->Put("X", "xx"); ASSERT_OK(s); s = txn->GetForUpdate(read_options, "W", &value); ASSERT_TRUE(s.IsBusy()); s = txn->GetForUpdate(read_options, "V", &value); ASSERT_TRUE(s.IsBusy()); // re-locking same key shouldn't put us over the limit s = txn->GetForUpdate(read_options, "Y", &value); ASSERT_OK(s); ASSERT_EQ("y", value); s = txn->Get(read_options, "W", &value); ASSERT_TRUE(s.IsNotFound()); Transaction* txn2 = db->BeginTransaction(write_options); ASSERT_TRUE(txn2); // "X" currently locked s = txn2->Put("X", "x"); ASSERT_TRUE(s.IsTimedOut()); // lock limit reached s = txn2->Put("M", "m"); ASSERT_TRUE(s.IsBusy()); s = txn->Commit(); ASSERT_OK(s); s = db->Get(read_options, "X", &value); ASSERT_OK(s); ASSERT_EQ("xx", value); s = db->Get(read_options, "W", &value); ASSERT_TRUE(s.IsNotFound()); // Committing txn should release its locks and allow txn2 to proceed s = txn2->Put("X", "x2"); ASSERT_OK(s); s = txn2->Delete("X"); ASSERT_OK(s); s = txn2->Put("M", "m"); ASSERT_OK(s); s = txn2->Put("Z", "z2"); ASSERT_OK(s); // lock limit reached s = txn2->Delete("Y"); ASSERT_TRUE(s.IsBusy()); s = txn2->Commit(); ASSERT_OK(s); s = db->Get(read_options, "Z", &value); ASSERT_OK(s); ASSERT_EQ("z2", value); s = db->Get(read_options, "Y", &value); ASSERT_OK(s); ASSERT_EQ("y", value); s = db->Get(read_options, "X", &value); ASSERT_TRUE(s.IsNotFound()); delete txn; delete txn2; } TEST_F(TransactionTest, IteratorTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; TransactionOptions txn_options; string value; Status s; // Write some keys to the db s = db->Put(write_options, "A", "a"); ASSERT_OK(s); s = db->Put(write_options, "G", "g"); ASSERT_OK(s); s = db->Put(write_options, "F", "f"); ASSERT_OK(s); s = db->Put(write_options, "C", "c"); ASSERT_OK(s); s = db->Put(write_options, "D", "d"); ASSERT_OK(s); Transaction* txn = db->BeginTransaction(write_options); ASSERT_TRUE(txn); // Write some keys in a txn s = txn->Put("B", "b"); ASSERT_OK(s); s = txn->Put("H", "h"); ASSERT_OK(s); s = txn->Delete("D"); ASSERT_OK(s); s = txn->Put("E", "e"); ASSERT_OK(s); txn->SetSnapshot(); const Snapshot* snapshot = txn->GetSnapshot(); // Write some keys to the db after the snapshot s = db->Put(write_options, "BB", "xx"); ASSERT_OK(s); s = db->Put(write_options, "C", "xx"); ASSERT_OK(s); read_options.snapshot = snapshot; Iterator* iter = txn->GetIterator(read_options); ASSERT_OK(iter->status()); iter->SeekToFirst(); // Read all keys via iter and lock them all std::string results[] = {"a", "b", "c", "e", "f", "g", "h"}; for (int i = 0; i < 7; i++) { ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); ASSERT_EQ(results[i], iter->value().ToString()); s = txn->GetForUpdate(read_options, iter->key(), nullptr); if (i == 2) { // "C" was modified after txn's snapshot ASSERT_TRUE(s.IsBusy()); } else { ASSERT_OK(s); } iter->Next(); } ASSERT_FALSE(iter->Valid()); iter->Seek("G"); ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); ASSERT_EQ("g", iter->value().ToString()); iter->Prev(); ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); ASSERT_EQ("f", iter->value().ToString()); iter->Seek("D"); ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); ASSERT_EQ("e", iter->value().ToString()); iter->Seek("C"); ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); ASSERT_EQ("c", iter->value().ToString()); iter->Next(); ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); ASSERT_EQ("e", iter->value().ToString()); iter->Seek(""); ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); ASSERT_EQ("a", iter->value().ToString()); iter->Seek("X"); ASSERT_OK(iter->status()); ASSERT_FALSE(iter->Valid()); iter->SeekToLast(); ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); ASSERT_EQ("h", iter->value().ToString()); s = txn->Commit(); ASSERT_OK(s); delete iter; delete txn; } TEST_F(TransactionTest, DisableIndexingTest) { WriteOptions write_options; ReadOptions read_options; string value; Status s; Transaction* txn = db->BeginTransaction(write_options); ASSERT_TRUE(txn); s = txn->Put("A", "a"); ASSERT_OK(s); s = txn->Get(read_options, "A", &value); ASSERT_OK(s); ASSERT_EQ("a", value); txn->DisableIndexing(); s = txn->Put("B", "b"); ASSERT_OK(s); s = txn->Get(read_options, "B", &value); ASSERT_TRUE(s.IsNotFound()); Iterator* iter = txn->GetIterator(read_options); ASSERT_OK(iter->status()); iter->Seek("B"); ASSERT_OK(iter->status()); ASSERT_FALSE(iter->Valid()); s = txn->Delete("A"); s = txn->Get(read_options, "A", &value); ASSERT_OK(s); ASSERT_EQ("a", value); txn->EnableIndexing(); s = txn->Put("B", "bb"); ASSERT_OK(s); iter->Seek("B"); ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); ASSERT_EQ("bb", iter->value().ToString()); s = txn->Get(read_options, "B", &value); ASSERT_OK(s); ASSERT_EQ("bb", value); s = txn->Put("A", "aa"); ASSERT_OK(s); s = txn->Get(read_options, "A", &value); ASSERT_OK(s); ASSERT_EQ("aa", value); delete iter; delete txn; } TEST_F(TransactionTest, SavepointTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; TransactionOptions txn_options; string value; Status s; Transaction* txn = db->BeginTransaction(write_options); ASSERT_TRUE(txn); ASSERT_EQ(0, txn->GetNumPuts()); s = txn->RollbackToSavePoint(); ASSERT_TRUE(s.IsNotFound()); txn->SetSavePoint(); // 1 ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to beginning of txn s = txn->RollbackToSavePoint(); ASSERT_TRUE(s.IsNotFound()); s = txn->Put("B", "b"); ASSERT_OK(s); ASSERT_EQ(1, txn->GetNumPuts()); ASSERT_EQ(0, txn->GetNumDeletes()); s = txn->Commit(); ASSERT_OK(s); s = db->Get(read_options, "B", &value); ASSERT_OK(s); ASSERT_EQ("b", value); delete txn; txn = db->BeginTransaction(write_options); ASSERT_TRUE(txn); s = txn->Put("A", "a"); ASSERT_OK(s); s = txn->Put("B", "bb"); ASSERT_OK(s); s = txn->Put("C", "c"); ASSERT_OK(s); txn->SetSavePoint(); // 2 s = txn->Delete("B"); ASSERT_OK(s); s = txn->Put("C", "cc"); ASSERT_OK(s); s = txn->Put("D", "d"); ASSERT_OK(s); ASSERT_EQ(5, txn->GetNumPuts()); ASSERT_EQ(1, txn->GetNumDeletes()); ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 2 ASSERT_EQ(3, txn->GetNumPuts()); ASSERT_EQ(0, txn->GetNumDeletes()); s = txn->Get(read_options, "A", &value); ASSERT_OK(s); ASSERT_EQ("a", value); s = txn->Get(read_options, "B", &value); ASSERT_OK(s); ASSERT_EQ("bb", value); s = txn->Get(read_options, "C", &value); ASSERT_OK(s); ASSERT_EQ("c", value); s = txn->Get(read_options, "D", &value); ASSERT_TRUE(s.IsNotFound()); s = txn->Put("A", "a"); ASSERT_OK(s); s = txn->Put("E", "e"); ASSERT_OK(s); ASSERT_EQ(5, txn->GetNumPuts()); ASSERT_EQ(0, txn->GetNumDeletes()); // Rollback to beginning of txn s = txn->RollbackToSavePoint(); ASSERT_TRUE(s.IsNotFound()); txn->Rollback(); ASSERT_EQ(0, txn->GetNumPuts()); ASSERT_EQ(0, txn->GetNumDeletes()); s = txn->Get(read_options, "A", &value); ASSERT_TRUE(s.IsNotFound()); s = txn->Get(read_options, "B", &value); ASSERT_OK(s); ASSERT_EQ("b", value); s = txn->Get(read_options, "D", &value); ASSERT_TRUE(s.IsNotFound()); s = txn->Get(read_options, "D", &value); ASSERT_TRUE(s.IsNotFound()); s = txn->Get(read_options, "E", &value); ASSERT_TRUE(s.IsNotFound()); s = txn->Put("A", "aa"); ASSERT_OK(s); s = txn->Put("F", "f"); ASSERT_OK(s); ASSERT_EQ(2, txn->GetNumPuts()); ASSERT_EQ(0, txn->GetNumDeletes()); txn->SetSavePoint(); // 3 txn->SetSavePoint(); // 4 s = txn->Put("G", "g"); ASSERT_OK(s); s = txn->SingleDelete("F"); ASSERT_OK(s); s = txn->Delete("B"); ASSERT_OK(s); s = txn->Get(read_options, "A", &value); ASSERT_OK(s); ASSERT_EQ("aa", value); s = txn->Get(read_options, "F", &value); // According to db.h, doing a SingleDelete on a key that has been // overwritten will have undefinied behavior. So it is unclear what the // result of fetching "F" should be. The current implementation will // return NotFound in this case. ASSERT_TRUE(s.IsNotFound()); s = txn->Get(read_options, "B", &value); ASSERT_TRUE(s.IsNotFound()); ASSERT_EQ(3, txn->GetNumPuts()); ASSERT_EQ(2, txn->GetNumDeletes()); ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 3 ASSERT_EQ(2, txn->GetNumPuts()); ASSERT_EQ(0, txn->GetNumDeletes()); s = txn->Get(read_options, "F", &value); ASSERT_OK(s); ASSERT_EQ("f", value); s = txn->Get(read_options, "G", &value); ASSERT_TRUE(s.IsNotFound()); s = txn->Commit(); ASSERT_OK(s); s = db->Get(read_options, "F", &value); ASSERT_OK(s); ASSERT_EQ("f", value); s = db->Get(read_options, "G", &value); ASSERT_TRUE(s.IsNotFound()); s = db->Get(read_options, "A", &value); ASSERT_OK(s); ASSERT_EQ("aa", value); s = db->Get(read_options, "B", &value); ASSERT_OK(s); ASSERT_EQ("b", value); s = db->Get(read_options, "C", &value); ASSERT_TRUE(s.IsNotFound()); s = db->Get(read_options, "D", &value); ASSERT_TRUE(s.IsNotFound()); s = db->Get(read_options, "E", &value); ASSERT_TRUE(s.IsNotFound()); delete txn; } TEST_F(TransactionTest, SavepointTest2) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; TransactionOptions txn_options; string value; Status s; txn_options.lock_timeout = 1; // 1 ms Transaction* txn1 = db->BeginTransaction(write_options, txn_options); ASSERT_TRUE(txn1); s = txn1->Put("A", ""); ASSERT_OK(s); txn1->SetSavePoint(); // 1 s = txn1->Put("A", "a"); ASSERT_OK(s); s = txn1->Put("C", "c"); ASSERT_OK(s); txn1->SetSavePoint(); // 2 s = txn1->Put("A", "a"); ASSERT_OK(s); s = txn1->Put("B", "b"); ASSERT_OK(s); ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 2 // Verify that "A" and "C" is still locked while "B" is not Transaction* txn2 = db->BeginTransaction(write_options, txn_options); ASSERT_TRUE(txn2); s = txn2->Put("A", "a2"); ASSERT_TRUE(s.IsTimedOut()); s = txn2->Put("C", "c2"); ASSERT_TRUE(s.IsTimedOut()); s = txn2->Put("B", "b2"); ASSERT_OK(s); s = txn1->Put("A", "aa"); ASSERT_OK(s); s = txn1->Put("B", "bb"); ASSERT_TRUE(s.IsTimedOut()); s = txn2->Commit(); ASSERT_OK(s); delete txn2; s = txn1->Put("A", "aaa"); ASSERT_OK(s); s = txn1->Put("B", "bbb"); ASSERT_OK(s); s = txn1->Put("C", "ccc"); ASSERT_OK(s); txn1->SetSavePoint(); // 3 ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 3 // Verify that "A", "B", "C" are still locked txn2 = db->BeginTransaction(write_options, txn_options); ASSERT_TRUE(txn2); s = txn2->Put("A", "a2"); ASSERT_TRUE(s.IsTimedOut()); s = txn2->Put("B", "b2"); ASSERT_TRUE(s.IsTimedOut()); s = txn2->Put("C", "c2"); ASSERT_TRUE(s.IsTimedOut()); ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 1 // Verify that only "A" is locked s = txn2->Put("A", "a3"); ASSERT_TRUE(s.IsTimedOut()); s = txn2->Put("B", "b3"); ASSERT_OK(s); s = txn2->Put("C", "c3po"); ASSERT_OK(s); s = txn1->Commit(); ASSERT_OK(s); delete txn1; // Verify "A" "C" "B" are no longer locked s = txn2->Put("A", "a4"); ASSERT_OK(s); s = txn2->Put("B", "b4"); ASSERT_OK(s); s = txn2->Put("C", "c4"); ASSERT_OK(s); s = txn2->Commit(); ASSERT_OK(s); delete txn2; } TEST_F(TransactionTest, TimeoutTest) { WriteOptions write_options; ReadOptions read_options; string value; Status s; delete db; // transaction writes have an infinite timeout, // but we will override this when we start a txn // db writes have infinite timeout txn_db_options.transaction_lock_timeout = -1; txn_db_options.default_lock_timeout = -1; s = TransactionDB::Open(options, txn_db_options, dbname, &db); ASSERT_OK(s); s = db->Put(write_options, "aaa", "aaa"); ASSERT_OK(s); TransactionOptions txn_options0; txn_options0.expiration = 100; // 100ms txn_options0.lock_timeout = 50; // txn timeout no longer infinite Transaction* txn1 = db->BeginTransaction(write_options, txn_options0); s = txn1->GetForUpdate(read_options, "aaa", nullptr); ASSERT_OK(s); // Conflicts with previous GetForUpdate. // Since db writes do not have a timeout, this should eventually succeed when // the transaction expires. s = db->Put(write_options, "aaa", "xxx"); ASSERT_OK(s); ASSERT_GE(txn1->GetElapsedTime(), static_cast(txn_options0.expiration)); s = txn1->Commit(); ASSERT_TRUE(s.IsExpired()); // expired! s = db->Get(read_options, "aaa", &value); ASSERT_OK(s); ASSERT_EQ("xxx", value); delete txn1; delete db; // transaction writes have 10ms timeout, // db writes have infinite timeout txn_db_options.transaction_lock_timeout = 50; txn_db_options.default_lock_timeout = -1; s = TransactionDB::Open(options, txn_db_options, dbname, &db); ASSERT_OK(s); s = db->Put(write_options, "aaa", "aaa"); ASSERT_OK(s); TransactionOptions txn_options; txn_options.expiration = 100; // 100ms txn1 = db->BeginTransaction(write_options, txn_options); s = txn1->GetForUpdate(read_options, "aaa", nullptr); ASSERT_OK(s); // Conflicts with previous GetForUpdate. // Since db writes do not have a timeout, this should eventually succeed when // the transaction expires. s = db->Put(write_options, "aaa", "xxx"); ASSERT_OK(s); s = txn1->Commit(); ASSERT_NOK(s); // expired! s = db->Get(read_options, "aaa", &value); ASSERT_OK(s); ASSERT_EQ("xxx", value); delete txn1; txn_options.expiration = 6000000; // 100 minutes txn_options.lock_timeout = 1; // 1ms txn1 = db->BeginTransaction(write_options, txn_options); txn1->SetLockTimeout(100); TransactionOptions txn_options2; txn_options2.expiration = 10; // 10ms Transaction* txn2 = db->BeginTransaction(write_options, txn_options2); ASSERT_OK(s); s = txn2->Put("a", "2"); ASSERT_OK(s); // txn1 has a lock timeout longer than txn2's expiration, so it will win s = txn1->Delete("a"); ASSERT_OK(s); s = txn1->Commit(); ASSERT_OK(s); // txn2 should be expired out since txn1 waiting until its timeout expired. s = txn2->Commit(); ASSERT_TRUE(s.IsExpired()); delete txn1; delete txn2; txn_options.expiration = 6000000; // 100 minutes txn1 = db->BeginTransaction(write_options, txn_options); txn_options2.expiration = 100000000; txn2 = db->BeginTransaction(write_options, txn_options2); s = txn1->Delete("asdf"); ASSERT_OK(s); // txn2 has a smaller lock timeout than txn1's expiration, so it will time out s = txn2->Delete("asdf"); ASSERT_TRUE(s.IsTimedOut()); ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key"); s = txn1->Commit(); ASSERT_OK(s); s = txn2->Put("asdf", "asdf"); ASSERT_OK(s); s = txn2->Commit(); ASSERT_OK(s); s = db->Get(read_options, "asdf", &value); ASSERT_OK(s); ASSERT_EQ("asdf", value); delete txn1; delete txn2; } TEST_F(TransactionTest, SingleDeleteTest) { WriteOptions write_options; ReadOptions read_options; string value; Status s; Transaction* txn = db->BeginTransaction(write_options); ASSERT_TRUE(txn); s = txn->SingleDelete("A"); ASSERT_OK(s); s = txn->Get(read_options, "A", &value); ASSERT_TRUE(s.IsNotFound()); s = txn->Commit(); ASSERT_OK(s); delete txn; txn = db->BeginTransaction(write_options); s = txn->SingleDelete("A"); ASSERT_OK(s); s = txn->Put("A", "a"); ASSERT_OK(s); s = txn->Get(read_options, "A", &value); ASSERT_OK(s); ASSERT_EQ("a", value); s = txn->Commit(); ASSERT_OK(s); delete txn; s = db->Get(read_options, "A", &value); ASSERT_OK(s); ASSERT_EQ("a", value); txn = db->BeginTransaction(write_options); s = txn->SingleDelete("A"); ASSERT_OK(s); s = txn->Get(read_options, "A", &value); ASSERT_TRUE(s.IsNotFound()); s = txn->Commit(); ASSERT_OK(s); delete txn; s = db->Get(read_options, "A", &value); ASSERT_TRUE(s.IsNotFound()); txn = db->BeginTransaction(write_options); Transaction* txn2 = db->BeginTransaction(write_options); txn2->SetSnapshot(); s = txn->Put("A", "a"); ASSERT_OK(s); s = txn->Put("A", "a2"); ASSERT_OK(s); s = txn->SingleDelete("A"); ASSERT_OK(s); s = txn->SingleDelete("B"); ASSERT_OK(s); // According to db.h, doing a SingleDelete on a key that has been // overwritten will have undefinied behavior. So it is unclear what the // result of fetching "A" should be. The current implementation will // return NotFound in this case. s = txn->Get(read_options, "A", &value); ASSERT_TRUE(s.IsNotFound()); s = txn2->Put("B", "b"); ASSERT_TRUE(s.IsTimedOut()); s = txn2->Commit(); ASSERT_OK(s); delete txn2; s = txn->Commit(); ASSERT_OK(s); delete txn; // According to db.h, doing a SingleDelete on a key that has been // overwritten will have undefinied behavior. So it is unclear what the // result of fetching "A" should be. The current implementation will // return NotFound in this case. s = db->Get(read_options, "A", &value); ASSERT_TRUE(s.IsNotFound()); s = db->Get(read_options, "B", &value); ASSERT_TRUE(s.IsNotFound()); } TEST_F(TransactionTest, MergeTest) { WriteOptions write_options; ReadOptions read_options; string value; Status s; Transaction* txn = db->BeginTransaction(write_options, TransactionOptions()); ASSERT_TRUE(txn); s = db->Put(write_options, "A", "a0"); ASSERT_OK(s); s = txn->Merge("A", "1"); ASSERT_OK(s); s = txn->Merge("A", "2"); ASSERT_OK(s); s = txn->Get(read_options, "A", &value); ASSERT_TRUE(s.IsMergeInProgress()); s = txn->Put("A", "a"); ASSERT_OK(s); s = txn->Get(read_options, "A", &value); ASSERT_OK(s); ASSERT_EQ("a", value); s = txn->Merge("A", "3"); ASSERT_OK(s); s = txn->Get(read_options, "A", &value); ASSERT_TRUE(s.IsMergeInProgress()); TransactionOptions txn_options; txn_options.lock_timeout = 1; // 1 ms Transaction* txn2 = db->BeginTransaction(write_options, txn_options); ASSERT_TRUE(txn2); // verify that txn has "A" locked s = txn2->Merge("A", "4"); ASSERT_TRUE(s.IsTimedOut()); s = txn2->Commit(); ASSERT_OK(s); delete txn2; s = txn->Commit(); ASSERT_OK(s); delete txn; s = db->Get(read_options, "A", &value); ASSERT_OK(s); ASSERT_EQ("a,3", value); } TEST_F(TransactionTest, DeferSnapshotTest) { WriteOptions write_options; ReadOptions read_options; string value; Status s; s = db->Put(write_options, "A", "a0"); ASSERT_OK(s); Transaction* txn1 = db->BeginTransaction(write_options); Transaction* txn2 = db->BeginTransaction(write_options); txn1->SetSnapshotOnNextOperation(); auto snapshot = txn1->GetSnapshot(); ASSERT_FALSE(snapshot); s = txn2->Put("A", "a2"); ASSERT_OK(s); s = txn2->Commit(); ASSERT_OK(s); delete txn2; s = txn1->GetForUpdate(read_options, "A", &value); // Should not conflict with txn2 since snapshot wasn't set until // GetForUpdate was called. ASSERT_OK(s); ASSERT_EQ("a2", value); s = txn1->Put("A", "a1"); ASSERT_OK(s); s = db->Put(write_options, "B", "b0"); ASSERT_OK(s); // Cannot lock B since it was written after the snapshot was set s = txn1->Put("B", "b1"); ASSERT_TRUE(s.IsBusy()); s = txn1->Commit(); ASSERT_OK(s); delete txn1; s = db->Get(read_options, "A", &value); ASSERT_OK(s); ASSERT_EQ("a1", value); s = db->Get(read_options, "B", &value); ASSERT_OK(s); ASSERT_EQ("b0", value); } TEST_F(TransactionTest, DeferSnapshotTest2) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; string value; Status s; Transaction* txn1 = db->BeginTransaction(write_options); txn1->SetSnapshot(); s = txn1->Put("A", "a1"); ASSERT_OK(s); s = db->Put(write_options, "C", "c0"); ASSERT_OK(s); s = db->Put(write_options, "D", "d0"); ASSERT_OK(s); snapshot_read_options.snapshot = txn1->GetSnapshot(); txn1->SetSnapshotOnNextOperation(); s = txn1->Get(snapshot_read_options, "C", &value); // Snapshot was set before C was written ASSERT_TRUE(s.IsNotFound()); s = txn1->Get(snapshot_read_options, "D", &value); // Snapshot was set before D was written ASSERT_TRUE(s.IsNotFound()); // Snapshot should not have changed yet. snapshot_read_options.snapshot = txn1->GetSnapshot(); s = txn1->Get(snapshot_read_options, "C", &value); // Snapshot was set before C was written ASSERT_TRUE(s.IsNotFound()); s = txn1->Get(snapshot_read_options, "D", &value); // Snapshot was set before D was written ASSERT_TRUE(s.IsNotFound()); s = txn1->GetForUpdate(read_options, "C", &value); ASSERT_OK(s); ASSERT_EQ("c0", value); s = db->Put(write_options, "D", "d00"); ASSERT_OK(s); // Snapshot is now set snapshot_read_options.snapshot = txn1->GetSnapshot(); s = txn1->Get(snapshot_read_options, "D", &value); ASSERT_OK(s); ASSERT_EQ("d0", value); s = txn1->Commit(); ASSERT_OK(s); delete txn1; } TEST_F(TransactionTest, DeferSnapshotSavePointTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; string value; Status s; Transaction* txn1 = db->BeginTransaction(write_options); txn1->SetSavePoint(); // 1 s = db->Put(write_options, "T", "1"); ASSERT_OK(s); txn1->SetSnapshotOnNextOperation(); s = db->Put(write_options, "T", "2"); ASSERT_OK(s); txn1->SetSavePoint(); // 2 s = db->Put(write_options, "T", "3"); ASSERT_OK(s); s = txn1->Put("A", "a"); ASSERT_OK(s); txn1->SetSavePoint(); // 3 s = db->Put(write_options, "T", "4"); ASSERT_OK(s); txn1->SetSnapshot(); txn1->SetSnapshotOnNextOperation(); txn1->SetSavePoint(); // 4 s = db->Put(write_options, "T", "5"); ASSERT_OK(s); snapshot_read_options.snapshot = txn1->GetSnapshot(); s = txn1->Get(snapshot_read_options, "T", &value); ASSERT_OK(s); ASSERT_EQ("4", value); s = txn1->Put("A", "a1"); ASSERT_OK(s); snapshot_read_options.snapshot = txn1->GetSnapshot(); s = txn1->Get(snapshot_read_options, "T", &value); ASSERT_OK(s); ASSERT_EQ("5", value); s = txn1->RollbackToSavePoint(); // Rollback to 4 ASSERT_OK(s); snapshot_read_options.snapshot = txn1->GetSnapshot(); s = txn1->Get(snapshot_read_options, "T", &value); ASSERT_OK(s); ASSERT_EQ("4", value); s = txn1->RollbackToSavePoint(); // Rollback to 3 ASSERT_OK(s); snapshot_read_options.snapshot = txn1->GetSnapshot(); s = txn1->Get(snapshot_read_options, "T", &value); ASSERT_OK(s); ASSERT_EQ("3", value); s = txn1->Get(read_options, "T", &value); ASSERT_OK(s); ASSERT_EQ("5", value); s = txn1->RollbackToSavePoint(); // Rollback to 2 ASSERT_OK(s); snapshot_read_options.snapshot = txn1->GetSnapshot(); ASSERT_FALSE(snapshot_read_options.snapshot); s = txn1->Get(snapshot_read_options, "T", &value); ASSERT_OK(s); ASSERT_EQ("5", value); s = txn1->Delete("A"); ASSERT_OK(s); snapshot_read_options.snapshot = txn1->GetSnapshot(); ASSERT_TRUE(snapshot_read_options.snapshot); s = txn1->Get(snapshot_read_options, "T", &value); ASSERT_OK(s); ASSERT_EQ("5", value); s = txn1->RollbackToSavePoint(); // Rollback to 1 ASSERT_OK(s); s = txn1->Delete("A"); ASSERT_OK(s); snapshot_read_options.snapshot = txn1->GetSnapshot(); ASSERT_FALSE(snapshot_read_options.snapshot); s = txn1->Get(snapshot_read_options, "T", &value); ASSERT_OK(s); ASSERT_EQ("5", value); s = txn1->Commit(); ASSERT_OK(s); delete txn1; } TEST_F(TransactionTest, SetSnapshotOnNextOperationWithNotification) { WriteOptions write_options; ReadOptions read_options; string value; class Notifier : public TransactionNotifier { private: const Snapshot** snapshot_ptr_; public: explicit Notifier(const Snapshot** snapshot_ptr) : snapshot_ptr_(snapshot_ptr) {} void SnapshotCreated(const Snapshot* newSnapshot) { *snapshot_ptr_ = newSnapshot; } }; std::shared_ptr notifier = std::make_shared(&read_options.snapshot); Status s; s = db->Put(write_options, "B", "0"); ASSERT_OK(s); Transaction* txn1 = db->BeginTransaction(write_options); txn1->SetSnapshotOnNextOperation(notifier); ASSERT_FALSE(read_options.snapshot); s = db->Put(write_options, "B", "1"); ASSERT_OK(s); // A Get does not generate the snapshot s = txn1->Get(read_options, "B", &value); ASSERT_OK(s); ASSERT_FALSE(read_options.snapshot); ASSERT_EQ(value, "1"); // Any other operation does s = txn1->Put("A", "0"); ASSERT_OK(s); // Now change "B". s = db->Put(write_options, "B", "2"); ASSERT_OK(s); // The original value should still be read s = txn1->Get(read_options, "B", &value); ASSERT_OK(s); ASSERT_TRUE(read_options.snapshot); ASSERT_EQ(value, "1"); s = txn1->Commit(); ASSERT_OK(s); delete txn1; } TEST_F(TransactionTest, ClearSnapshotTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; string value; Status s; s = db->Put(write_options, "foo", "0"); ASSERT_OK(s); Transaction* txn = db->BeginTransaction(write_options); ASSERT_TRUE(txn); s = db->Put(write_options, "foo", "1"); ASSERT_OK(s); snapshot_read_options.snapshot = txn->GetSnapshot(); ASSERT_FALSE(snapshot_read_options.snapshot); // No snapshot created yet s = txn->Get(snapshot_read_options, "foo", &value); ASSERT_EQ(value, "1"); txn->SetSnapshot(); snapshot_read_options.snapshot = txn->GetSnapshot(); ASSERT_TRUE(snapshot_read_options.snapshot); s = db->Put(write_options, "foo", "2"); ASSERT_OK(s); // Snapshot was created before change to '2' s = txn->Get(snapshot_read_options, "foo", &value); ASSERT_EQ(value, "1"); txn->ClearSnapshot(); snapshot_read_options.snapshot = txn->GetSnapshot(); ASSERT_FALSE(snapshot_read_options.snapshot); // Snapshot has now been cleared s = txn->Get(snapshot_read_options, "foo", &value); ASSERT_EQ(value, "2"); s = txn->Commit(); ASSERT_OK(s); delete txn; } TEST_F(TransactionTest, ToggleAutoCompactionTest) { Status s; TransactionOptions txn_options; ColumnFamilyHandle *cfa, *cfb; ColumnFamilyOptions cf_options; // Create 2 new column families s = db->CreateColumnFamily(cf_options, "CFA", &cfa); ASSERT_OK(s); s = db->CreateColumnFamily(cf_options, "CFB", &cfb); ASSERT_OK(s); delete cfa; delete cfb; delete db; // open DB with three column families std::vector column_families; // have to open default column family column_families.push_back( ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions())); // open the new column families column_families.push_back( ColumnFamilyDescriptor("CFA", ColumnFamilyOptions())); column_families.push_back( ColumnFamilyDescriptor("CFB", ColumnFamilyOptions())); ColumnFamilyOptions* cf_opt_default = &column_families[0].options; ColumnFamilyOptions* cf_opt_cfa = &column_families[1].options; ColumnFamilyOptions* cf_opt_cfb = &column_families[2].options; cf_opt_default->disable_auto_compactions = false; cf_opt_cfa->disable_auto_compactions = true; cf_opt_cfb->disable_auto_compactions = false; std::vector handles; s = TransactionDB::Open(options, txn_db_options, dbname, column_families, &handles, &db); ASSERT_OK(s); auto cfh_default = reinterpret_cast(handles[0]); auto opt_default = *cfh_default->cfd()->GetLatestMutableCFOptions(); auto cfh_a = reinterpret_cast(handles[1]); auto opt_a = *cfh_a->cfd()->GetLatestMutableCFOptions(); auto cfh_b = reinterpret_cast(handles[2]); auto opt_b = *cfh_b->cfd()->GetLatestMutableCFOptions(); ASSERT_EQ(opt_default.disable_auto_compactions, false); ASSERT_EQ(opt_a.disable_auto_compactions, true); ASSERT_EQ(opt_b.disable_auto_compactions, false); for (auto handle : handles) { delete handle; } } } // namespace rocksdb int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } #else #include int main(int argc, char** argv) { fprintf(stderr, "SKIPPED as Transactions are not supported in ROCKSDB_LITE\n"); return 0; } #endif // ROCKSDB_LITE