Fix unchecked statuses for transaction_test (#7572)

Summary:
When `ASSERT_STATUS_CHECKED` is enabled, `transaction_test` does not pass without this PR.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7572

Test Plan: `ASSERT_STATUS_CHECKED=1 make   -j32 transaction_test && ./transaction_test`

Reviewed By: zhichao-cao

Differential Revision: D24404319

Pulled By: cheng-chang

fbshipit-source-id: 13689035995366ab06d8eada3ea404e45fef8bc5
main
Cheng Chang 4 years ago committed by Facebook GitHub Bot
parent 73dbe10bbf
commit 5227b315ec
  1. 1
      test_util/transaction_test_util.cc
  2. 6
      utilities/transactions/lock/point/point_lock_manager.cc
  3. 38
      utilities/transactions/pessimistic_transaction.cc
  4. 4
      utilities/transactions/transaction_base.cc
  5. 3
      utilities/transactions/transaction_base.h
  6. 270
      utilities/transactions/transaction_test.cc
  7. 40
      utilities/transactions/write_prepared_txn.cc
  8. 9
      utilities/transactions/write_prepared_txn_db.cc
  9. 46
      utilities/transactions/write_unprepared_txn.cc

@ -349,6 +349,7 @@ Status RandomTransactionInserter::Verify(DB* db, uint16_t num_sets,
static_cast<int>(key.size()), key.data(), int_value);
total += int_value;
}
iter->status().PermitUncheckedError();
delete iter;
}

@ -635,7 +635,7 @@ void PointLockManager::UnLock(PessimisticTransaction* txn,
assert(lock_map->lock_map_stripes_.size() > stripe_num);
LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
stripe->stripe_mutex->Lock();
stripe->stripe_mutex->Lock().PermitUncheckedError();
UnLockKey(txn, key, stripe, lock_map, env);
stripe->stripe_mutex->UnLock();
@ -677,7 +677,7 @@ void PointLockManager::UnLock(PessimisticTransaction* txn,
assert(lock_map->lock_map_stripes_.size() > stripe_num);
LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
stripe->stripe_mutex->Lock();
stripe->stripe_mutex->Lock().PermitUncheckedError();
for (const std::string* key : stripe_keys) {
UnLockKey(txn, *key, stripe, lock_map, env);
@ -708,7 +708,7 @@ PointLockManager::PointLockStatus PointLockManager::GetPointLockStatus() {
const auto& stripes = lock_maps_[i]->lock_map_stripes_;
// Iterate and lock all stripes in ascending order.
for (const auto& j : stripes) {
j->stripe_mutex->Lock();
j->stripe_mutex->Lock().PermitUncheckedError();
for (const auto& it : j->keys) {
struct KeyLockInfo info;
info.exclusive = it.second.exclusive;

@ -173,7 +173,6 @@ Status PessimisticTransaction::CommitBatch(WriteBatch* batch) {
}
Status PessimisticTransaction::Prepare() {
Status s;
if (name_.empty()) {
return Status::InvalidArgument(
@ -184,6 +183,7 @@ Status PessimisticTransaction::Prepare() {
return Status::Expired();
}
Status s;
bool can_prepare = false;
if (expiration_time_ > 0) {
@ -226,7 +226,9 @@ Status PessimisticTransaction::Prepare() {
Status WriteCommittedTxn::PrepareInternal() {
WriteOptions write_options = write_options_;
write_options.disableWAL = false;
WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_);
auto s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(),
name_);
assert(s.ok());
class MarkLogCallback : public PreReleaseCallback {
public:
MarkLogCallback(DBImpl* db, bool two_write_queues)
@ -256,15 +258,14 @@ Status WriteCommittedTxn::PrepareInternal() {
const bool kDisableMemtable = true;
SequenceNumber* const KIgnoreSeqUsed = nullptr;
const size_t kNoBatchCount = 0;
Status s = db_impl_->WriteImpl(
write_options, GetWriteBatch()->GetWriteBatch(), kNoWriteCallback,
&log_number_, kRefNoLog, kDisableMemtable, KIgnoreSeqUsed, kNoBatchCount,
&mark_log_callback);
s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
kNoWriteCallback, &log_number_, kRefNoLog,
kDisableMemtable, KIgnoreSeqUsed, kNoBatchCount,
&mark_log_callback);
return s;
}
Status PessimisticTransaction::Commit() {
Status s;
bool commit_without_prepare = false;
bool commit_prepared = false;
@ -294,6 +295,7 @@ Status PessimisticTransaction::Commit() {
}
}
Status s;
if (commit_without_prepare) {
assert(!commit_prepared);
if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) {
@ -377,7 +379,8 @@ Status WriteCommittedTxn::CommitInternal() {
// We take the commit-time batch and append the Commit marker.
// The Memtable will ignore the Commit marker in non-recovery mode
WriteBatch* working_batch = GetCommitTimeWriteBatch();
WriteBatchInternal::MarkCommit(working_batch, name_);
auto s = WriteBatchInternal::MarkCommit(working_batch, name_);
assert(s.ok());
// any operations appended to this working_batch will be ignored from WAL
working_batch->MarkWalTerminationPoint();
@ -385,11 +388,12 @@ Status WriteCommittedTxn::CommitInternal() {
// insert prepared batch into Memtable only skipping WAL.
// Memtable will ignore BeginPrepare/EndPrepare markers
// in non recovery mode and simply insert the values
WriteBatchInternal::Append(working_batch, GetWriteBatch()->GetWriteBatch());
s = WriteBatchInternal::Append(working_batch,
GetWriteBatch()->GetWriteBatch());
assert(s.ok());
uint64_t seq_used = kMaxSequenceNumber;
auto s =
db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr,
s = db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr,
/*log_used*/ nullptr, /*log_ref*/ log_number_,
/*disable_memtable*/ false, &seq_used);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
@ -439,8 +443,9 @@ Status PessimisticTransaction::Rollback() {
Status WriteCommittedTxn::RollbackInternal() {
WriteBatch rollback_marker;
WriteBatchInternal::MarkRollback(&rollback_marker, name_);
auto s = db_impl_->WriteImpl(write_options_, &rollback_marker);
auto s = WriteBatchInternal::MarkRollback(&rollback_marker, name_);
assert(s.ok());
s = db_impl_->WriteImpl(write_options_, &rollback_marker);
return s;
}
@ -505,9 +510,10 @@ Status PessimisticTransaction::LockBatch(WriteBatch* batch,
// Iterating on this handler will add all keys in this batch into keys
Handler handler;
batch->Iterate(&handler);
Status s;
Status s = batch->Iterate(&handler);
if (!s.ok()) {
return s;
}
// Attempt to lock all keys
for (const auto& cf_iter : handler.keys_) {

@ -530,7 +530,9 @@ Status TransactionBaseImpl::SingleDeleteUntracked(
}
void TransactionBaseImpl::PutLogData(const Slice& blob) {
write_batch_.PutLogData(blob);
auto s = write_batch_.PutLogData(blob);
(void)s;
assert(s.ok());
}
WriteBatchWithIndex* TransactionBaseImpl::GetWriteBatch() {

@ -271,7 +271,8 @@ class TransactionBaseImpl : public Transaction {
write_batch_.Clear();
}
assert(write_batch_.GetDataSize() == WriteBatchInternal::kHeader);
WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
auto s = WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
assert(s.ok());
}
DB* db_;

@ -198,7 +198,7 @@ TEST_P(TransactionTest, AssumeExclusiveTracked) {
ASSERT_OK(txn->SingleDelete(db->DefaultColumnFamily(), Slice("foo"),
ASSUME_LOCKED));
txn->Rollback();
ASSERT_OK(txn->Rollback());
delete txn;
}
@ -223,7 +223,7 @@ TEST_P(TransactionTest, ValidateSnapshotTest) {
if (with_flush) {
auto db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
db_impl->TEST_FlushMemTable(true);
ASSERT_OK(db_impl->TEST_FlushMemTable(true));
// Make sure the flushed memtable is not kept in memory
int max_memtable_in_history =
std::max(
@ -232,8 +232,8 @@ TEST_P(TransactionTest, ValidateSnapshotTest) {
static_cast<int>(options.write_buffer_size)) +
1;
for (int i = 0; i < max_memtable_in_history; i++) {
db->Put(write_options, Slice("key"), Slice("value"));
db_impl->TEST_FlushMemTable(true);
ASSERT_OK(db->Put(write_options, Slice("key"), Slice("value")));
ASSERT_OK(db_impl->TEST_FlushMemTable(true));
}
}
@ -392,9 +392,9 @@ TEST_P(TransactionTest, SharedLocks) {
ASSERT_EQ(expected_txns, lock_txns);
ASSERT_FALSE(cf_iterator->second.exclusive);
txn1->Rollback();
txn2->Rollback();
txn3->Rollback();
ASSERT_OK(txn1->Rollback());
ASSERT_OK(txn2->Rollback());
ASSERT_OK(txn3->Rollback());
// Test txn1 and txn2 sharing a lock and txn3 trying to obtain it.
s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
@ -416,9 +416,9 @@ TEST_P(TransactionTest, SharedLocks) {
s = txn3->GetForUpdate(read_options, "foo", nullptr);
ASSERT_OK(s);
txn1->Rollback();
txn2->Rollback();
txn3->Rollback();
ASSERT_OK(txn1->Rollback());
ASSERT_OK(txn2->Rollback());
ASSERT_OK(txn3->Rollback());
// Test txn1 and txn2 sharing a lock and txn2 trying to upgrade lock.
s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
@ -454,8 +454,8 @@ TEST_P(TransactionTest, SharedLocks) {
ASSERT_TRUE(s.IsTimedOut());
ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
txn1->Rollback();
txn2->Rollback();
ASSERT_OK(txn1->Rollback());
ASSERT_OK(txn2->Rollback());
// Test txn1 holding an exclusive lock and txn2 trying to obtain shared
// access.
@ -519,7 +519,7 @@ TEST_P(TransactionTest, DeadlockCycleShared) {
auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr,
true /* exclusive */);
ASSERT_OK(s);
txns[i]->Rollback();
ASSERT_OK(txns[i]->Rollback());
delete txns[i];
};
threads.emplace_back(blocking_thread);
@ -581,7 +581,7 @@ TEST_P(TransactionTest, DeadlockCycleShared) {
// Rollback the leaf transaction.
for (uint32_t i = 15; i < 31; i++) {
txns[i]->Rollback();
ASSERT_OK(txns[i]->Rollback());
delete txns[i];
}
@ -651,7 +651,7 @@ TEST_P(TransactionTest, DeadlockCycleShared) {
auto s =
txns_shared[i]->GetForUpdate(read_options, ToString(i + 1), nullptr);
ASSERT_OK(s);
txns_shared[i]->Rollback();
ASSERT_OK(txns_shared[i]->Rollback());
delete txns_shared[i];
};
threads_shared.emplace_back(blocking_thread);
@ -678,7 +678,7 @@ TEST_P(TransactionTest, DeadlockCycleShared) {
// Verify the exclusivity field of the transactions in the deadlock path.
ASSERT_TRUE(dlock_buffer[0].path[0].m_exclusive);
ASSERT_FALSE(dlock_buffer[0].path[1].m_exclusive);
txns_shared[1]->Rollback();
ASSERT_OK(txns_shared[1]->Rollback());
delete txns_shared[1];
for (auto& t : threads_shared) {
@ -725,7 +725,7 @@ TEST_P(TransactionStressTest, DeadlockCycle) {
std::function<void()> blocking_thread = [&, i] {
auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr);
ASSERT_OK(s);
txns[i]->Rollback();
ASSERT_OK(txns[i]->Rollback());
delete txns[i];
};
threads.emplace_back(blocking_thread);
@ -786,7 +786,7 @@ TEST_P(TransactionStressTest, DeadlockCycle) {
}
// Rollback the last transaction.
txns[len - 1]->Rollback();
ASSERT_OK(txns[len - 1]->Rollback());
delete txns[len - 1];
for (auto& t : threads) {
@ -809,7 +809,7 @@ TEST_P(TransactionStressTest, DeadlockStress) {
std::vector<std::string> keys;
for (uint32_t i = 0; i < NUM_KEYS; i++) {
db->Put(write_options, Slice(ToString(i)), Slice(""));
ASSERT_OK(db->Put(write_options, Slice(ToString(i)), Slice("")));
keys.push_back(ToString(i));
}
@ -831,7 +831,7 @@ TEST_P(TransactionStressTest, DeadlockStress) {
txn->GetForUpdate(read_options, k, nullptr, txn->GetID() % 4 == 0);
if (!s.ok()) {
ASSERT_TRUE(s.IsDeadlock());
txn->Rollback();
ASSERT_OK(txn->Rollback());
break;
}
}
@ -896,7 +896,7 @@ TEST_P(TransactionTest, LogMarkLeakTest) {
ASSERT_OK(txn->Commit());
delete txn;
}
db_impl->TEST_FlushMemTable(true);
ASSERT_OK(db_impl->TEST_FlushMemTable(true));
}
for (auto txn : txns) {
ASSERT_OK(txn->Commit());
@ -941,12 +941,14 @@ TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) {
ASSERT_EQ(1, txn->GetNumPuts());
// regular db read
db->Get(read_options, "foo2", &value);
ASSERT_OK(db->Get(read_options, "foo2", &value));
ASSERT_EQ(value, "bar2");
// commit time put
txn->GetCommitTimeWriteBatch()->Put(Slice("gtid"), Slice("dogs"));
txn->GetCommitTimeWriteBatch()->Put(Slice("gtid2"), Slice("cats"));
ASSERT_OK(
txn->GetCommitTimeWriteBatch()->Put(Slice("gtid"), Slice("dogs")));
ASSERT_OK(
txn->GetCommitTimeWriteBatch()->Put(Slice("gtid2"), Slice("cats")));
// nothing has been prepped yet
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
@ -1017,7 +1019,7 @@ TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) {
assert(false);
}
db_impl->TEST_FlushMemTable(true);
ASSERT_OK(db_impl->TEST_FlushMemTable(true));
// After flush the recoverable state must be visible
if (cwb4recovery) {
s = db->Get(read_options, "gtid", &value);
@ -1104,8 +1106,8 @@ TEST_P(TransactionTest, TwoPhaseNameTest) {
s = txn1->SetName("name4");
ASSERT_EQ(s, Status::InvalidArgument());
txn1->Rollback();
txn2->Rollback();
ASSERT_OK(txn1->Rollback());
ASSERT_OK(txn2->Rollback());
delete txn1;
delete txn2;
}
@ -1144,7 +1146,8 @@ TEST_P(TransactionTest, TwoPhaseEmptyWriteTest) {
delete txn1;
txn2->GetCommitTimeWriteBatch()->Put(Slice("foo"), Slice("bar"));
ASSERT_OK(
txn2->GetCommitTimeWriteBatch()->Put(Slice("foo"), Slice("bar")));
s = txn2->Prepare();
ASSERT_OK(s);
@ -1160,13 +1163,13 @@ TEST_P(TransactionTest, TwoPhaseEmptyWriteTest) {
} else {
if (test_with_empty_wal) {
DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
db_impl->TEST_FlushMemTable(true);
ASSERT_OK(db_impl->TEST_FlushMemTable(true));
// After flush the state must be visible
s = db->Get(read_options, "foo", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar");
}
db->FlushWAL(true);
ASSERT_OK(db->FlushWAL(true));
// kill and reopen to trigger recovery
s = ReOpenNoDelete();
ASSERT_OK(s);
@ -1259,7 +1262,7 @@ TEST_P(TransactionTest, TwoPhaseRollbackTest) {
// flush to next wal
s = db->Put(write_options, Slice("foo"), Slice("bar"));
ASSERT_OK(s);
db_impl->TEST_FlushMemTable(true);
ASSERT_OK(db_impl->TEST_FlushMemTable(true));
// issue rollback (marker written to WAL)
s = txn->Rollback();
@ -1315,7 +1318,7 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) {
ASSERT_OK(s);
ASSERT_EQ(1, txn->GetNumPuts());
db_impl->TEST_FlushMemTable(true);
ASSERT_OK(db_impl->TEST_FlushMemTable(true));
// regular db read
db->Get(read_options, "foo2", &value);
@ -1332,7 +1335,7 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) {
s = db->Get(read_options, Slice("foo"), &value);
ASSERT_TRUE(s.IsNotFound());
db->FlushWAL(false);
ASSERT_OK(db->FlushWAL(false));
delete txn;
// kill and reopen
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
@ -1406,7 +1409,7 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) {
s = db->Put(write_options, Slice("foo3"), Slice("bar3"));
ASSERT_OK(s);
db_impl->TEST_FlushMemTable(true);
ASSERT_OK(db_impl->TEST_FlushMemTable(true));
// after memtable flush we can now release the log
ASSERT_GT(db_impl->MinLogNumberToKeep(), log_containing_prep);
@ -1672,7 +1675,7 @@ TEST_P(TransactionTest, TwoPhaseDoubleRecoveryTest) {
// kill and reopen
env->SetFilesystemActive(false);
ReOpenNoDelete();
ASSERT_OK(ReOpenNoDelete());
assert(db != nullptr);
// value is now available
@ -2020,12 +2023,12 @@ TEST_P(TransactionTest, TwoPhaseOutOfOrderDelete) {
s = db->Put(wal_on, "cats", "dogs4");
ASSERT_OK(s);
db->FlushWAL(false);
ASSERT_OK(db->FlushWAL(false));
// kill and reopen
env->SetFilesystemActive(false);
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ReOpenNoDelete();
ASSERT_OK(ReOpenNoDelete());
assert(db != nullptr);
s = db->Get(read_options, "first", &value);
@ -2098,8 +2101,8 @@ TEST_P(TransactionTest, WriteConflictTest) {
string value;
Status s;
db->Put(write_options, "foo", "A");
db->Put(write_options, "foo2", "B");
ASSERT_OK(db->Put(write_options, "foo", "A"));
ASSERT_OK(db->Put(write_options, "foo2", "B"));
Transaction* txn = db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
@ -2135,7 +2138,7 @@ TEST_P(TransactionTest, WriteConflictTest2) {
std::string value;
Status s;
db->Put(write_options, "foo", "bar");
ASSERT_OK(db->Put(write_options, "foo", "bar"));
txn_options.set_snapshot = true;
Transaction* txn = db->BeginTransaction(write_options, txn_options);
@ -2183,8 +2186,8 @@ TEST_P(TransactionTest, ReadConflictTest) {
std::string value;
Status s;
db->Put(write_options, "foo", "bar");
db->Put(write_options, "foo2", "bar");
ASSERT_OK(db->Put(write_options, "foo", "bar"));
ASSERT_OK(db->Put(write_options, "foo2", "bar"));
txn_options.set_snapshot = true;
Transaction* txn = db->BeginTransaction(write_options, txn_options);
@ -2193,7 +2196,7 @@ TEST_P(TransactionTest, ReadConflictTest) {
txn->SetSnapshot();
snapshot_read_options.snapshot = txn->GetSnapshot();
txn->GetForUpdate(snapshot_read_options, "foo", &value);
ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
ASSERT_EQ(value, "bar");
// This Put outside of a transaction will conflict with the previous read
@ -2239,21 +2242,21 @@ TEST_P(TransactionTest, FlushTest) {
std::string value;
Status s;
db->Put(write_options, Slice("foo"), Slice("bar"));
db->Put(write_options, Slice("foo2"), Slice("bar"));
ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar")));
Transaction* txn = db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
snapshot_read_options.snapshot = txn->GetSnapshot();
txn->GetForUpdate(snapshot_read_options, "foo", &value);
ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
ASSERT_EQ(value, "bar");
s = txn->Put(Slice("foo"), Slice("bar2"));
ASSERT_OK(s);
txn->GetForUpdate(snapshot_read_options, "foo", &value);
ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
ASSERT_EQ(value, "bar2");
// Put a random key so we have a memtable to flush
@ -2304,9 +2307,9 @@ TEST_P(TransactionTest, FlushTest2) {
DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
db->Put(write_options, Slice("foo"), Slice("bar"));
db->Put(write_options, Slice("foo2"), Slice("bar2"));
db->Put(write_options, Slice("foo3"), Slice("bar3"));
ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar2")));
ASSERT_OK(db->Put(write_options, Slice("foo3"), Slice("bar3")));
txn_options.set_snapshot = true;
Transaction* txn = db->BeginTransaction(write_options, txn_options);
@ -2314,13 +2317,13 @@ TEST_P(TransactionTest, FlushTest2) {
snapshot_read_options.snapshot = txn->GetSnapshot();
txn->GetForUpdate(snapshot_read_options, "foo", &value);
ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
ASSERT_EQ(value, "bar");
s = txn->Put(Slice("foo"), Slice("bar2"));
ASSERT_OK(s);
txn->GetForUpdate(snapshot_read_options, "foo", &value);
ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
ASSERT_EQ(value, "bar2");
// verify foo is locked by txn
s = db->Delete(write_options, "foo");
@ -2405,7 +2408,7 @@ TEST_P(TransactionTest, FlushTest2) {
s = db->Delete(write_options, "foo3");
ASSERT_TRUE(s.IsTimedOut());
db_impl->TEST_WaitForCompact();
ASSERT_OK(db_impl->TEST_WaitForCompact());
s = txn->Commit();
ASSERT_OK(s);
@ -2432,16 +2435,16 @@ TEST_P(TransactionTest, NoSnapshotTest) {
std::string value;
Status s;
db->Put(write_options, "AAA", "bar");
ASSERT_OK(db->Put(write_options, "AAA", "bar"));
Transaction* txn = db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
// Modify key after transaction start
db->Put(write_options, "AAA", "bar1");
ASSERT_OK(db->Put(write_options, "AAA", "bar1"));
// Read and write without a snap
txn->GetForUpdate(read_options, "AAA", &value);
ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value));
ASSERT_EQ(value, "bar1");
s = txn->Put("AAA", "bar2");
ASSERT_OK(s);
@ -2450,7 +2453,7 @@ TEST_P(TransactionTest, NoSnapshotTest) {
s = txn->Commit();
ASSERT_OK(s);
txn->GetForUpdate(read_options, "AAA", &value);
ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value));
ASSERT_EQ(value, "bar2");
delete txn;
@ -2469,7 +2472,7 @@ TEST_P(TransactionTest, MultipleSnapshotTest) {
Transaction* txn = db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
db->Put(write_options, "AAA", "bar1");
ASSERT_OK(db->Put(write_options, "AAA", "bar1"));
// Read and write without a snapshot
ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value));
@ -2496,7 +2499,7 @@ TEST_P(TransactionTest, MultipleSnapshotTest) {
snapshot_read_options.snapshot = txn->GetSnapshot();
// Read and write with snapshot
txn->GetForUpdate(snapshot_read_options, "CCC", &value);
ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "CCC", &value));
ASSERT_EQ(value, "bar1");
s = txn->Put("CCC", "bar2");
ASSERT_OK(s);
@ -2539,8 +2542,8 @@ TEST_P(TransactionTest, MultipleSnapshotTest) {
txn = db->BeginTransaction(write_options);
// Potentially conflicting writes
db->Put(write_options, "ZZZ", "zzz");
db->Put(write_options, "XXX", "xxx");
ASSERT_OK(db->Put(write_options, "ZZZ", "zzz"));
ASSERT_OK(db->Put(write_options, "XXX", "xxx"));
txn->SetSnapshot();
@ -2622,12 +2625,12 @@ TEST_P(TransactionTest, ColumnFamiliesTest) {
// Write some data to the db
WriteBatch batch;
batch.Put("foo", "foo");
batch.Put(handles[1], "AAA", "bar");
batch.Put(handles[1], "AAAZZZ", "bar");
ASSERT_OK(batch.Put("foo", "foo"));
ASSERT_OK(batch.Put(handles[1], "AAA", "bar"));
ASSERT_OK(batch.Put(handles[1], "AAAZZZ", "bar"));
s = db->Write(write_options, &batch);
ASSERT_OK(s);
db->Delete(write_options, handles[1], "AAAZZZ");
ASSERT_OK(db->Delete(write_options, handles[1], "AAAZZZ"));
// These keys do not conflict with existing writes since they're in
// different column families
@ -2711,7 +2714,7 @@ TEST_P(TransactionTest, ColumnFamiliesTest) {
ASSERT_TRUE(s.IsNotFound());
// Put a key which will conflict with the next txn using the previous snapshot
db->Put(write_options, handles[2], "foo", "000");
ASSERT_OK(db->Put(write_options, handles[2], "foo", "000"));
results = txn2->MultiGetForUpdate(snapshot_read_options, multiget_cfh,
multiget_keys, &values);
@ -2775,13 +2778,13 @@ TEST_P(TransactionTest, MultiGetBatchedTest) {
// Write some data to the db
WriteBatch batch;
batch.Put(handles[1], "aaa", "val1");
batch.Put(handles[1], "bbb", "val2");
batch.Put(handles[1], "ccc", "val3");
batch.Put(handles[1], "ddd", "foo");
batch.Put(handles[1], "eee", "val5");
batch.Put(handles[1], "fff", "val6");
batch.Merge(handles[1], "ggg", "foo");
ASSERT_OK(batch.Put(handles[1], "aaa", "val1"));
ASSERT_OK(batch.Put(handles[1], "bbb", "val2"));
ASSERT_OK(batch.Put(handles[1], "ccc", "val3"));
ASSERT_OK(batch.Put(handles[1], "ddd", "foo"));
ASSERT_OK(batch.Put(handles[1], "eee", "val5"));
ASSERT_OK(batch.Put(handles[1], "fff", "val6"));
ASSERT_OK(batch.Merge(handles[1], "ggg", "foo"));
s = db->Write(write_options, &batch);
ASSERT_OK(s);
@ -2870,7 +2873,7 @@ TEST_P(TransactionTest, MultiGetLargeBatchedTest) {
WriteBatch batch;
for (int i = 0; i < 3 * MultiGetContext::MAX_BATCH_SIZE; ++i) {
std::string val = "val" + std::to_string(i);
batch.Put(handles[1], key_str[i], val);
ASSERT_OK(batch.Put(handles[1], key_str[i], val));
}
s = db->Write(write_options, &batch);
ASSERT_OK(s);
@ -3013,7 +3016,7 @@ TEST_P(TransactionTest, EmptyTest) {
delete txn;
txn = db->BeginTransaction(write_options);
txn->Rollback();
ASSERT_OK(txn->Rollback());
delete txn;
txn = db->BeginTransaction(write_options);
@ -3060,17 +3063,23 @@ TEST_P(TransactionTest, PredicateManyPreceders) {
std::vector<Status> results =
txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
ASSERT_EQ(results.size(), 3);
ASSERT_TRUE(results[0].IsNotFound());
ASSERT_TRUE(results[1].IsNotFound());
ASSERT_TRUE(results[2].IsNotFound());
s = txn2->Put("2", "x"); // Conflict's with txn1's MultiGetForUpdate
ASSERT_TRUE(s.IsTimedOut());
txn2->Rollback();
ASSERT_OK(txn2->Rollback());
multiget_values.clear();
results =
txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
ASSERT_EQ(results.size(), 3);
ASSERT_TRUE(results[0].IsNotFound());
ASSERT_TRUE(results[1].IsNotFound());
ASSERT_TRUE(results[2].IsNotFound());
s = txn1->Commit();
ASSERT_OK(s);
@ -3096,7 +3105,7 @@ TEST_P(TransactionTest, PredicateManyPreceders) {
s = txn2->GetForUpdate(read_options2, "4", &value);
ASSERT_TRUE(s.IsBusy());
txn2->Rollback();
ASSERT_OK(txn2->Rollback());
delete txn1;
delete txn2;
@ -3246,7 +3255,7 @@ TEST_P(TransactionTest, UntrackedWrites) {
s = txn->PutUntracked("untracked", "0");
ASSERT_OK(s);
txn->Rollback();
ASSERT_OK(txn->Rollback());
s = db->Get(read_options, "untracked", &value);
ASSERT_TRUE(s.IsNotFound());
@ -3389,7 +3398,7 @@ TEST_P(TransactionTest, ReinitializeTest) {
s = txn1->Put("Z", "a");
ASSERT_OK(s);
txn1->Rollback();
ASSERT_OK(txn1->Rollback());
s = txn1->Put("Y", "y");
ASSERT_OK(s);
@ -3450,7 +3459,7 @@ TEST_P(TransactionTest, Rollback) {
s = txn2->Put("X", "2");
ASSERT_TRUE(s.IsTimedOut());
txn1->Rollback();
ASSERT_OK(txn1->Rollback());
delete txn1;
// txn2 should now be able to write to X
@ -3861,7 +3870,7 @@ TEST_P(TransactionTest, SavepointTest) {
// Rollback to beginning of txn
s = txn->RollbackToSavePoint();
ASSERT_TRUE(s.IsNotFound());
txn->Rollback();
ASSERT_OK(txn->Rollback());
ASSERT_EQ(0, txn->GetNumPuts());
ASSERT_EQ(0, txn->GetNumDeletes());
@ -4218,7 +4227,7 @@ TEST_P(TransactionTest, UndoGetForUpdateTest) {
// Verify that A is now unlocked
s = txn2->Put("A", "a2");
ASSERT_OK(s);
txn2->Commit();
ASSERT_OK(txn2->Commit());
delete txn2;
s = db->Get(read_options, "A", &value);
ASSERT_OK(s);
@ -4244,7 +4253,7 @@ TEST_P(TransactionTest, UndoGetForUpdateTest) {
s = txn2->Put("B", "b4");
ASSERT_TRUE(s.IsTimedOut());
txn1->Rollback();
ASSERT_OK(txn1->Rollback());
delete txn1;
// Verify that A and B are no longer locked
@ -4432,7 +4441,7 @@ TEST_P(TransactionTest, UndoGetForUpdateTest2) {
s = txn2->Put("G", "g3");
ASSERT_OK(s);
txn1->RollbackToSavePoint(); // rollback to 2
ASSERT_OK(txn1->RollbackToSavePoint()); // rollback to 2
// Verify A,B,D,E,F are still locked and C,G,H are not.
s = txn2->Put("A", "a3");
@ -4479,7 +4488,7 @@ TEST_P(TransactionTest, UndoGetForUpdateTest2) {
s = txn2->Put("H", "h3");
ASSERT_OK(s);
txn1->RollbackToSavePoint(); // rollback to 1
ASSERT_OK(txn1->RollbackToSavePoint()); // rollback to 1
// Verify A,B,F are still locked and C,D,E,G,H are not.
s = txn2->Put("A", "a3");
@ -5246,6 +5255,7 @@ TEST_P(TransactionStressTest, ExpiredTransactionDataRace1) {
ReadOptions read_options;
string value;
s = db->Get(read_options, "X", &value);
ASSERT_OK(s);
ASSERT_EQ("1", value);
delete txn1;
@ -5281,6 +5291,7 @@ Status TransactionStressTestInserter(
return inserter.GetLastStatus();
}
}
inserter.GetLastStatus().PermitUncheckedError();
// Make sure at least some of the transactions succeeded. It's ok if
// some failed due to write-conflicts.
@ -5300,20 +5311,20 @@ Status TransactionStressTestInserter(
TEST_P(MySQLStyleTransactionTest, TransactionStressTest) {
// Small write buffer to trigger more compactions
options.write_buffer_size = 1024;
ReOpenNoDelete();
const size_t num_workers = 4; // worker threads count
const size_t num_checkers = 2; // checker threads count
const size_t num_slow_checkers = 2; // checker threads emulating backups
const size_t num_slow_workers = 1; // slow worker threads count
const size_t num_transactions_per_thread = 10000;
const uint16_t num_sets = 3;
const size_t num_keys_per_set = 100;
ASSERT_OK(ReOpenNoDelete());
constexpr size_t num_workers = 4; // worker threads count
constexpr size_t num_checkers = 2; // checker threads count
constexpr size_t num_slow_checkers = 2; // checker threads emulating backups
constexpr size_t num_slow_workers = 1; // slow worker threads count
constexpr size_t num_transactions_per_thread = 10000;
constexpr uint16_t num_sets = 3;
constexpr size_t num_keys_per_set = 100;
// Setting the key-space to be 100 keys should cause enough write-conflicts
// to make this test interesting.
std::vector<port::Thread> threads;
std::atomic<uint32_t> finished = {0};
bool TAKE_SNAPSHOT = true;
constexpr bool TAKE_SNAPSHOT = true;
uint64_t time_seed = env->NowMicros();
printf("time_seed is %" PRIu64 "\n", time_seed); // would help to reproduce
@ -5329,9 +5340,8 @@ TEST_P(MySQLStyleTransactionTest, TransactionStressTest) {
Random64 rand(time_seed * thd_seed);
// Verify that data is consistent
while (finished < num_workers) {
Status s = RandomTransactionInserter::Verify(
db, num_sets, num_keys_per_set, TAKE_SNAPSHOT, &rand);
ASSERT_OK(s);
ASSERT_OK(RandomTransactionInserter::Verify(
db, num_sets, num_keys_per_set, TAKE_SNAPSHOT, &rand));
}
};
std::function<void()> call_slow_checker = [&] {
@ -5412,7 +5422,7 @@ TEST_P(TransactionTest, MemoryLimitTest) {
ASSERT_TRUE(s.IsMemoryLimit());
ASSERT_EQ(2, txn->GetNumPuts());
txn->Rollback();
ASSERT_OK(txn->Rollback());
delete txn;
}
@ -5551,7 +5561,7 @@ TEST_P(TransactionTest, Optimizations) {
ASSERT_OK(ReOpen());
WriteOptions write_options;
WriteBatch batch;
batch.Put(Slice("k"), Slice("v1"));
ASSERT_OK(batch.Put(Slice("k"), Slice("v1")));
ASSERT_OK(db->Write(write_options, &batch));
ReadOptions ropt;
@ -5593,7 +5603,7 @@ class ThreeBytewiseComparator : public Comparator {
TEST_P(TransactionTest, GetWithoutSnapshot) {
WriteOptions write_options;
std::atomic<bool> finish = {false};
db->Put(write_options, "key", "value");
ASSERT_OK(db->Put(write_options, "key", "value"));
ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
for (int i = 0; i < 100; i++) {
TransactionOptions txn_options;
@ -5629,16 +5639,16 @@ TEST_P(TransactionTest, DuplicateKeys) {
ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
WriteOptions write_options;
WriteBatch batch;
batch.Put(Slice("key"), Slice("value"));
batch.Put(Slice("key2"), Slice("value2"));
ASSERT_OK(batch.Put(Slice("key"), Slice("value")));
ASSERT_OK(batch.Put(Slice("key2"), Slice("value2")));
// duplicate the keys
batch.Put(Slice("key"), Slice("value3"));
ASSERT_OK(batch.Put(Slice("key"), Slice("value3")));
// duplicate the 2nd key. It should not be counted duplicate since a
// sub-patch is cut after the last duplicate.
batch.Put(Slice("key2"), Slice("value4"));
ASSERT_OK(batch.Put(Slice("key2"), Slice("value4")));
// duplicate the keys but in a different cf. It should not be counted as
// duplicate keys
batch.Put(cf_handle, Slice("key"), Slice("value5"));
ASSERT_OK(batch.Put(cf_handle, Slice("key"), Slice("value5")));
ASSERT_OK(db->Write(write_options, &batch));
@ -5665,11 +5675,11 @@ TEST_P(TransactionTest, DuplicateKeys) {
ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
WriteOptions write_options;
WriteBatch batch;
batch.Put(cf_handle, Slice("key"), Slice("value"));
ASSERT_OK(batch.Put(cf_handle, Slice("key"), Slice("value")));
// The first three bytes are the same, do it must be counted as duplicate
batch.Put(cf_handle, Slice("key2"), Slice("value2"));
ASSERT_OK(batch.Put(cf_handle, Slice("key2"), Slice("value2")));
// check for 2nd duplicate key in cf with non-default comparator
batch.Put(cf_handle, Slice("key2b"), Slice("value2b"));
ASSERT_OK(batch.Put(cf_handle, Slice("key2b"), Slice("value2b")));
ASSERT_OK(db->Write(write_options, &batch));
// The value must be the most recent value for all the keys equal to "key",
@ -5834,10 +5844,10 @@ TEST_P(TransactionTest, DuplicateKeys) {
ASSERT_OK(db->Put(write_options, cf_handle, Slice("key"), Slice("value")));
WriteBatch batch;
// Merge more than max_successive_merges times
batch.Merge(cf_handle, Slice("key"), Slice("1"));
batch.Merge(cf_handle, Slice("key"), Slice("2"));
batch.Merge(cf_handle, Slice("key"), Slice("3"));
batch.Merge(cf_handle, Slice("key"), Slice("4"));
ASSERT_OK(batch.Merge(cf_handle, Slice("key"), Slice("1")));
ASSERT_OK(batch.Merge(cf_handle, Slice("key"), Slice("2")));
ASSERT_OK(batch.Merge(cf_handle, Slice("key"), Slice("3")));
ASSERT_OK(batch.Merge(cf_handle, Slice("key"), Slice("4")));
ASSERT_OK(db->Write(write_options, &batch));
ReadOptions read_options;
string value;
@ -5916,10 +5926,10 @@ TEST_P(TransactionTest, DuplicateKeys) {
ASSERT_OK(txn0->Prepare());
delete txn0;
// This will check the asserts inside recovery code
db->FlushWAL(true);
ASSERT_OK(db->FlushWAL(true));
// Flush only cf 1
static_cast_with_check<DBImpl>(db->GetRootDB())
->TEST_FlushMemTable(true, false, handles[1]);
ASSERT_OK(static_cast_with_check<DBImpl>(db->GetRootDB())
->TEST_FlushMemTable(true, false, handles[1]));
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ASSERT_OK(ReOpenNoDelete(cfds, &handles));
txn0 = db->GetTransactionByName("xid");
@ -5956,8 +5966,8 @@ TEST_P(TransactionTest, DuplicateKeys) {
// This will check the asserts inside recovery code
ASSERT_OK(db->FlushWAL(true));
// Flush only cf 1
static_cast_with_check<DBImpl>(db->GetRootDB())
->TEST_FlushMemTable(true, false, handles[1]);
ASSERT_OK(static_cast_with_check<DBImpl>(db->GetRootDB())
->TEST_FlushMemTable(true, false, handles[1]));
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ASSERT_OK(ReOpenNoDelete(cfds, &handles));
txn0 = db->GetTransactionByName("xid");
@ -5989,8 +5999,8 @@ TEST_P(TransactionTest, DuplicateKeys) {
// This will check the asserts inside recovery code
ASSERT_OK(db->FlushWAL(true));
// Flush only cf 1
static_cast_with_check<DBImpl>(db->GetRootDB())
->TEST_FlushMemTable(true, false, handles[1]);
ASSERT_OK(static_cast_with_check<DBImpl>(db->GetRootDB())
->TEST_FlushMemTable(true, false, handles[1]));
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ASSERT_OK(ReOpenNoDelete(cfds, &handles));
txn0 = db->GetTransactionByName("xid");
@ -6016,8 +6026,8 @@ TEST_P(TransactionTest, DuplicateKeys) {
// This will check the asserts inside recovery code
ASSERT_OK(db->FlushWAL(true));
// Flush only cf 1
static_cast_with_check<DBImpl>(db->GetRootDB())
->TEST_FlushMemTable(true, false, handles[1]);
ASSERT_OK(static_cast_with_check<DBImpl>(db->GetRootDB())
->TEST_FlushMemTable(true, false, handles[1]));
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ASSERT_OK(ReOpenNoDelete(cfds, &handles));
txn0 = db->GetTransactionByName("xid");
@ -6043,8 +6053,8 @@ TEST_P(TransactionTest, DuplicateKeys) {
// This will check the asserts inside recovery code
ASSERT_OK(db->FlushWAL(true));
// Flush only cf 1
static_cast_with_check<DBImpl>(db->GetRootDB())
->TEST_FlushMemTable(true, false, handles[1]);
ASSERT_OK(static_cast_with_check<DBImpl>(db->GetRootDB())
->TEST_FlushMemTable(true, false, handles[1]));
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ASSERT_OK(ReOpenNoDelete(cfds, &handles));
txn0 = db->GetTransactionByName("xid");
@ -6075,7 +6085,7 @@ TEST_P(TransactionTest, ReseekOptimization) {
write_options.sync = true;
write_options.disableWAL = false;
ColumnFamilyDescriptor cfd;
db->DefaultColumnFamily()->GetDescriptor(&cfd);
ASSERT_OK(db->DefaultColumnFamily()->GetDescriptor(&cfd));
auto max_skip = cfd.options.max_sequential_skip_in_iterations;
ASSERT_OK(db->Put(write_options, Slice("foo0"), Slice("initv")));
@ -6113,7 +6123,7 @@ TEST_P(TransactionTest, ReseekOptimization) {
}
ASSERT_EQ(cnt, 2);
delete iter;
txn0->Rollback();
ASSERT_OK(txn0->Rollback());
delete txn0;
}
@ -6125,7 +6135,7 @@ TEST_P(TransactionTest, DoubleCrashInRecovery) {
for (const bool write_after_recovery : {false, true}) {
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
options.manual_wal_flush = manual_wal_flush;
ReOpen();
ASSERT_OK(ReOpen());
std::string cf_name = "two";
ColumnFamilyOptions cf_options;
ColumnFamilyHandle* cf_handle = nullptr;
@ -6140,12 +6150,12 @@ TEST_P(TransactionTest, DoubleCrashInRecovery) {
ASSERT_OK(txn->Prepare());
FlushOptions flush_ops;
db->Flush(flush_ops);
ASSERT_OK(db->Flush(flush_ops));
// Now we have a log that cannot be deleted
ASSERT_OK(db->Put(write_options, cf_handle, "foo1", "bar1"));
// Flush only the 2nd cf
db->Flush(flush_ops, cf_handle);
ASSERT_OK(db->Flush(flush_ops, cf_handle));
// The value is large enough to be touched by the corruption we ingest
// below.
@ -6157,7 +6167,7 @@ TEST_P(TransactionTest, DoubleCrashInRecovery) {
// key/value not touched by corruption
ASSERT_OK(db->Put(write_options, "foo4", "bar4"));
db->FlushWAL(true);
ASSERT_OK(db->FlushWAL(true));
DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
uint64_t wal_file_id = db_impl->TEST_LogfileNumber();
std::string fname = LogFileName(dbname, wal_file_id);
@ -6191,7 +6201,7 @@ TEST_P(TransactionTest, DoubleCrashInRecovery) {
}
// Persist data written to WAL during recovery or by the last Put
db->FlushWAL(true);
ASSERT_OK(db->FlushWAL(true));
// 2nd crash to recover while having a valid log after the corrupted one.
ASSERT_OK(ReOpenNoDelete(column_families, &handles));
assert(db != nullptr);

@ -104,8 +104,9 @@ Status WritePreparedTxn::PrepareInternal() {
write_options.disableWAL = false;
const bool WRITE_AFTER_COMMIT = true;
const bool kFirstPrepareBatch = true;
WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_,
!WRITE_AFTER_COMMIT);
auto s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(),
name_, !WRITE_AFTER_COMMIT);
assert(s.ok());
// For each duplicate key we account for a new sub-batch
prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
// Having AddPrepared in the PreReleaseCallback allows in-order addition of
@ -116,10 +117,10 @@ Status WritePreparedTxn::PrepareInternal() {
db_impl_->immutable_db_options().two_write_queues, kFirstPrepareBatch);
const bool DISABLE_MEMTABLE = true;
uint64_t seq_used = kMaxSequenceNumber;
Status s = db_impl_->WriteImpl(
write_options, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, &log_number_, /*log ref*/ 0, !DISABLE_MEMTABLE,
&seq_used, prepare_batch_cnt_, &add_prepared_callback);
s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, &log_number_, /*log ref*/ 0,
!DISABLE_MEMTABLE, &seq_used, prepare_batch_cnt_,
&add_prepared_callback);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
auto prepare_seq = seq_used;
SetId(prepare_seq);
@ -144,7 +145,8 @@ Status WritePreparedTxn::CommitInternal() {
// The Memtable will ignore the Commit marker in non-recovery mode
WriteBatch* working_batch = GetCommitTimeWriteBatch();
const bool empty = working_batch->Count() == 0;
WriteBatchInternal::MarkCommit(working_batch, name_);
auto s = WriteBatchInternal::MarkCommit(working_batch, name_);
assert(s.ok());
const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
if (!empty && for_recovery) {
@ -162,7 +164,7 @@ Status WritePreparedTxn::CommitInternal() {
ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
"Duplicate key overhead");
SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
auto s = working_batch->Iterate(&counter);
s = working_batch->Iterate(&counter);
assert(s.ok());
commit_batch_cnt = counter.BatchCount();
}
@ -188,9 +190,9 @@ Status WritePreparedTxn::CommitInternal() {
// redundantly reference the log that contains the prepared data.
const uint64_t zero_log_number = 0ull;
size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
zero_log_number, disable_memtable, &seq_used,
batch_cnt, pre_release_callback);
s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
zero_log_number, disable_memtable, &seq_used,
batch_cnt, pre_release_callback);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
const SequenceNumber commit_batch_seq = seq_used;
if (LIKELY(do_one_write || !s.ok())) {
@ -217,9 +219,11 @@ Status WritePreparedTxn::CommitInternal() {
wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, kZeroData,
commit_batch_seq, commit_batch_cnt);
WriteBatch empty_batch;
empty_batch.PutLogData(Slice());
s = empty_batch.PutLogData(Slice());
assert(s.ok());
// In the absence of Prepare markers, use Noop as a batch separator
WriteBatchInternal::InsertNoop(&empty_batch);
s = WriteBatchInternal::InsertNoop(&empty_batch);
assert(s.ok());
const bool DISABLE_MEMTABLE = true;
const size_t ONE_BATCH = 1;
const uint64_t NO_REF_LOG = 0;
@ -347,12 +351,12 @@ Status WritePreparedTxn::RollbackInternal() {
wpt_db_->txn_db_options_.rollback_merge_operands,
roptions);
auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler);
assert(s.ok());
if (!s.ok()) {
return s;
}
// The Rollback marker will be used as a batch separator
WriteBatchInternal::MarkRollback(&rollback_batch, name_);
s = WriteBatchInternal::MarkRollback(&rollback_batch, name_);
assert(s.ok());
bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
const bool DISABLE_MEMTABLE = true;
const uint64_t NO_REF_LOG = 0;
@ -402,9 +406,11 @@ Status WritePreparedTxn::RollbackInternal() {
WritePreparedRollbackPreReleaseCallback update_commit_map_with_prepare(
wpt_db_, db_impl_, GetId(), rollback_seq, prepare_batch_cnt_);
WriteBatch empty_batch;
empty_batch.PutLogData(Slice());
s = empty_batch.PutLogData(Slice());
assert(s.ok());
// In the absence of Prepare markers, use Noop as a batch separator
WriteBatchInternal::InsertNoop(&empty_batch);
s = WriteBatchInternal::InsertNoop(&empty_batch);
assert(s.ok());
s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
&update_commit_map_with_prepare);

@ -168,7 +168,8 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
WriteOptions write_options(write_options_orig);
// In the absence of Prepare markers, use Noop as a batch separator
WriteBatchInternal::InsertNoop(batch);
auto s = WriteBatchInternal::InsertNoop(batch);
assert(s.ok());
const bool DISABLE_MEMTABLE = true;
const uint64_t no_log_ref = 0;
uint64_t seq_used = kMaxSequenceNumber;
@ -189,9 +190,9 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
} else {
pre_release_callback = &add_prepared_callback;
}
auto s = db_impl_->WriteImpl(write_options, batch, nullptr, nullptr,
no_log_ref, !DISABLE_MEMTABLE, &seq_used,
batch_cnt, pre_release_callback);
s = db_impl_->WriteImpl(write_options, batch, nullptr, nullptr, no_log_ref,
!DISABLE_MEMTABLE, &seq_used, batch_cnt,
pre_release_callback);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
uint64_t prepare_seq = seq_used;
if (txn != nullptr) {

@ -279,7 +279,9 @@ Status WriteUnpreparedTxn::FlushWriteBatchToDBInternal(bool prepared) {
static std::atomic_ullong autogen_id{0};
// To avoid changing all tests to call SetName, just autogenerate one.
if (wupt_db_->txn_db_options_.autogenerate_name) {
SetName(std::string("autoxid") + ToString(autogen_id.fetch_add(1)));
auto s =
SetName(std::string("autoxid") + ToString(autogen_id.fetch_add(1)));
assert(s.ok());
} else
#endif
{
@ -354,8 +356,9 @@ Status WriteUnpreparedTxn::FlushWriteBatchToDBInternal(bool prepared) {
const bool WRITE_AFTER_COMMIT = true;
const bool first_prepare_batch = log_number_ == 0;
// MarkEndPrepare will change Noop marker to the appropriate marker.
WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_,
!WRITE_AFTER_COMMIT, !prepared);
s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(),
name_, !WRITE_AFTER_COMMIT, !prepared);
assert(s.ok());
// For each duplicate key we account for a new sub-batch
prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
// AddPrepared better to be called in the pre-release callback otherwise there
@ -541,7 +544,8 @@ Status WriteUnpreparedTxn::CommitInternal() {
// will ignore the Commit marker in non-recovery mode
WriteBatch* working_batch = GetCommitTimeWriteBatch();
const bool empty = working_batch->Count() == 0;
WriteBatchInternal::MarkCommit(working_batch, name_);
auto s = WriteBatchInternal::MarkCommit(working_batch, name_);
assert(s.ok());
const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
if (!empty && for_recovery) {
@ -557,7 +561,7 @@ Status WriteUnpreparedTxn::CommitInternal() {
ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
"Duplicate key overhead");
SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
auto s = working_batch->Iterate(&counter);
s = working_batch->Iterate(&counter);
assert(s.ok());
commit_batch_cnt = counter.BatchCount();
}
@ -583,9 +587,9 @@ Status WriteUnpreparedTxn::CommitInternal() {
// need to redundantly reference the log that contains the prepared data.
const uint64_t zero_log_number = 0ull;
size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
zero_log_number, disable_memtable, &seq_used,
batch_cnt, pre_release_callback);
s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
zero_log_number, disable_memtable, &seq_used,
batch_cnt, pre_release_callback);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
const SequenceNumber commit_batch_seq = seq_used;
if (LIKELY(do_one_write || !s.ok())) {
@ -619,9 +623,11 @@ Status WriteUnpreparedTxn::CommitInternal() {
// Update commit map only from the 2nd queue
WriteBatch empty_batch;
empty_batch.PutLogData(Slice());
s = empty_batch.PutLogData(Slice());
assert(s.ok());
// In the absence of Prepare markers, use Noop as a batch separator
WriteBatchInternal::InsertNoop(&empty_batch);
s = WriteBatchInternal::InsertNoop(&empty_batch);
assert(s.ok());
const bool DISABLE_MEMTABLE = true;
const size_t ONE_BATCH = 1;
const uint64_t NO_REF_LOG = 0;
@ -719,10 +725,14 @@ Status WriteUnpreparedTxn::RollbackInternal() {
// TODO(lth): We write rollback batch all in a single batch here, but this
// should be subdivded into multiple batches as well. In phase 2, when key
// sets are read from WAL, this will happen naturally.
WriteRollbackKeys(*tracked_locks_, &rollback_batch, &callback, roptions);
s = WriteRollbackKeys(*tracked_locks_, &rollback_batch, &callback, roptions);
if (!s.ok()) {
return s;
}
// The Rollback marker will be used as a batch separator
WriteBatchInternal::MarkRollback(rollback_batch.GetWriteBatch(), name_);
s = WriteBatchInternal::MarkRollback(rollback_batch.GetWriteBatch(), name_);
assert(s.ok());
bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
const bool DISABLE_MEMTABLE = true;
const uint64_t NO_REF_LOG = 0;
@ -778,9 +788,11 @@ Status WriteUnpreparedTxn::RollbackInternal() {
prepare_seq);
WriteBatch empty_batch;
const size_t ONE_BATCH = 1;
empty_batch.PutLogData(Slice());
s = empty_batch.PutLogData(Slice());
assert(s.ok());
// In the absence of Prepare markers, use Noop as a batch separator
WriteBatchInternal::InsertNoop(&empty_batch);
s = WriteBatchInternal::InsertNoop(&empty_batch);
assert(s.ok());
s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
&update_commit_map_with_rollback_batch);
@ -863,11 +875,13 @@ Status WriteUnpreparedTxn::RollbackToSavePointInternal() {
WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
top.unprep_seqs_,
kBackedByDBSnapshot);
WriteRollbackKeys(tracked_keys, &write_batch_, &callback, roptions);
s = WriteRollbackKeys(tracked_keys, &write_batch_, &callback, roptions);
if (!s.ok()) {
return s;
}
const bool kPrepared = true;
s = FlushWriteBatchToDBInternal(!kPrepared);
assert(s.ok());
if (!s.ok()) {
return s;
}

Loading…
Cancel
Save