WriteUnPrepared: increase test coverage in transaction_test (#5658)

Summary:
The changes transaction_test to set `txn_db_options.default_write_batch_flush_threshold = 1` in order to give better test coverage for WriteUnprepared.

As part of the change, some tests had to be updated.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5658

Differential Revision: D16740468

Pulled By: lth

fbshipit-source-id: 3821eec20baf13917c8c1fab444332f75a509de9
main
Manuel Ung 6 years ago committed by Facebook Github Bot
parent de3fb9a6ff
commit 6f0f82de87
  1. 4
      include/rocksdb/utilities/transaction.h
  2. 6
      include/rocksdb/utilities/transaction_db.h
  3. 31
      utilities/transactions/transaction_test.cc
  4. 36
      utilities/transactions/transaction_test.h
  5. 2
      utilities/transactions/write_prepared_transaction_test.cc
  6. 12
      utilities/transactions/write_unprepared_transaction_test.cc
  7. 36
      utilities/transactions/write_unprepared_txn.cc
  8. 15
      utilities/transactions/write_unprepared_txn.h

@ -522,9 +522,13 @@ class Transaction {
id_ = id; id_ = id;
} }
virtual uint64_t GetLastLogNumber() const { return log_number_; }
private: private:
friend class PessimisticTransactionDB; friend class PessimisticTransactionDB;
friend class WriteUnpreparedTxnDB; friend class WriteUnpreparedTxnDB;
friend class TransactionTest_TwoPhaseLogRollingTest_Test;
friend class TransactionTest_TwoPhaseLogRollingTest2_Test;
// No copying allowed // No copying allowed
Transaction(const Transaction&); Transaction(const Transaction&);
void operator=(const Transaction&); void operator=(const Transaction&);

@ -112,8 +112,14 @@ struct TransactionDBOptions {
// 8m entry, 64MB size // 8m entry, 64MB size
size_t wp_commit_cache_bits = static_cast<size_t>(23); size_t wp_commit_cache_bits = static_cast<size_t>(23);
// For testing, whether transaction name should be auto-generated or not. This
// is useful for write unprepared which requires named transactions.
bool autogenerate_name = false;
friend class WritePreparedTxnDB; friend class WritePreparedTxnDB;
friend class WriteUnpreparedTxn;
friend class WritePreparedTransactionTestBase; friend class WritePreparedTransactionTestBase;
friend class TransactionTestBase;
friend class MySQLStyleTransactionTest; friend class MySQLStyleTransactionTest;
}; };

@ -1727,7 +1727,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
// our log should be in the heap // our log should be in the heap
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
txn1->GetLogNumber()); txn1->GetLogNumber());
ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber()); ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn1->GetLastLogNumber());
// flush default cf to crate new log // flush default cf to crate new log
s = db->Put(wopts, "foo", "bar"); s = db->Put(wopts, "foo", "bar");
@ -1736,12 +1736,12 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
ASSERT_OK(s); ASSERT_OK(s);
// make sure we are on a new log // make sure we are on a new log
ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber()); ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLastLogNumber());
// put txn2 prep section in this log // put txn2 prep section in this log
s = txn2->Prepare(); s = txn2->Prepare();
ASSERT_OK(s); ASSERT_OK(s);
ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber()); ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn2->GetLastLogNumber());
// heap should still see first log // heap should still see first log
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
@ -1777,7 +1777,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
ASSERT_OK(s); ASSERT_OK(s);
// make sure we are on a new log // make sure we are on a new log
ASSERT_GT(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber()); ASSERT_GT(db_impl->TEST_LogfileNumber(), txn2->GetLastLogNumber());
// commit txn2 // commit txn2
s = txn2->Commit(); s = txn2->Commit();
@ -1878,7 +1878,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
s = db->Put(wopts, "cats", "dogs1"); s = db->Put(wopts, "cats", "dogs1");
ASSERT_OK(s); ASSERT_OK(s);
auto prepare_log_no = txn1->GetLogNumber(); auto prepare_log_no = txn1->GetLastLogNumber();
// roll to LOG B // roll to LOG B
s = db_impl->TEST_FlushMemTable(true); s = db_impl->TEST_FlushMemTable(true);
@ -1905,7 +1905,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
assert(false); assert(false);
} }
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
prepare_log_no); txn1->GetLogNumber());
ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0); ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
// commit in LOG B // commit in LOG B
@ -2604,10 +2604,8 @@ TEST_P(TransactionTest, ColumnFamiliesTest) {
std::vector<ColumnFamilyHandle*> handles; std::vector<ColumnFamilyHandle*> handles;
s = TransactionDB::Open(options, txn_db_options, dbname, column_families, ASSERT_OK(ReOpenNoDelete(column_families, &handles));
&handles, &db);
assert(db != nullptr); assert(db != nullptr);
ASSERT_OK(s);
Transaction* txn = db->BeginTransaction(write_options); Transaction* txn = db->BeginTransaction(write_options);
ASSERT_TRUE(txn); ASSERT_TRUE(txn);
@ -2769,10 +2767,8 @@ TEST_P(TransactionTest, MultiGetBatchedTest) {
std::vector<ColumnFamilyHandle*> handles; std::vector<ColumnFamilyHandle*> handles;
options.merge_operator = MergeOperators::CreateStringAppendOperator(); options.merge_operator = MergeOperators::CreateStringAppendOperator();
s = TransactionDB::Open(options, txn_db_options, dbname, column_families, ASSERT_OK(ReOpenNoDelete(column_families, &handles));
&handles, &db);
assert(db != nullptr); assert(db != nullptr);
ASSERT_OK(s);
// Write some data to the db // Write some data to the db
WriteBatch batch; WriteBatch batch;
@ -3132,6 +3128,12 @@ TEST_P(TransactionTest, LostUpdate) {
} }
TEST_P(TransactionTest, UntrackedWrites) { TEST_P(TransactionTest, UntrackedWrites) {
if (txn_db_options.write_policy == WRITE_UNPREPARED) {
// TODO(lth): For WriteUnprepared, validate that untracked writes are
// not supported.
return;
}
WriteOptions write_options; WriteOptions write_options;
ReadOptions read_options; ReadOptions read_options;
std::string value; std::string value;
@ -3376,7 +3378,7 @@ TEST_P(TransactionTest, LockLimitTest) {
// Open DB with a lock limit of 3 // Open DB with a lock limit of 3
txn_db_options.max_num_locks = 3; txn_db_options.max_num_locks = 3;
s = TransactionDB::Open(options, txn_db_options, dbname, &db); ASSERT_OK(ReOpen());
assert(db != nullptr); assert(db != nullptr);
ASSERT_OK(s); ASSERT_OK(s);
@ -5285,6 +5287,9 @@ TEST_P(TransactionTest, MemoryLimitTest) {
TransactionOptions txn_options; TransactionOptions txn_options;
// Header (12 bytes) + NOOP (1 byte) + 2 * 8 bytes for data. // Header (12 bytes) + NOOP (1 byte) + 2 * 8 bytes for data.
txn_options.max_write_batch_size = 29; txn_options.max_write_batch_size = 29;
// Set threshold to unlimited so that the write batch does not get flushed,
// and can hit the memory limit.
txn_options.write_batch_flush_threshold = 0;
std::string value; std::string value;
Status s; Status s;

@ -27,6 +27,7 @@
#include "utilities/merge_operators.h" #include "utilities/merge_operators.h"
#include "utilities/merge_operators/string_append/stringappend.h" #include "utilities/merge_operators/string_append/stringappend.h"
#include "utilities/transactions/pessimistic_transaction_db.h" #include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/write_unprepared_txn_db.h"
#include "port/port.h" #include "port/port.h"
@ -67,6 +68,12 @@ class TransactionTestBase : public ::testing::Test {
txn_db_options.default_lock_timeout = 0; txn_db_options.default_lock_timeout = 0;
txn_db_options.write_policy = write_policy; txn_db_options.write_policy = write_policy;
txn_db_options.rollback_merge_operands = true; txn_db_options.rollback_merge_operands = true;
// This will stress write unprepared, by forcing write batch flush on every
// write.
txn_db_options.default_write_batch_flush_threshold = 1;
// Write unprepared requires all transactions to be named. This setting
// autogenerates the name so that existing tests can pass.
txn_db_options.autogenerate_name = true;
Status s; Status s;
if (use_stackable_db == false) { if (use_stackable_db == false) {
s = TransactionDB::Open(options, txn_db_options, dbname, &db); s = TransactionDB::Open(options, txn_db_options, dbname, &db);
@ -273,13 +280,20 @@ class TransactionTestBase : public ::testing::Test {
if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) { if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
// Consume one seq per key // Consume one seq per key
exp_seq += 4; exp_seq += 4;
} else { } else if (txn_db_options.write_policy ==
TxnDBWritePolicy::WRITE_PREPARED) {
// Consume one seq per batch // Consume one seq per batch
exp_seq++; exp_seq++;
if (options.two_write_queues) { if (options.two_write_queues) {
// Consume one seq for commit // Consume one seq for commit
exp_seq++; exp_seq++;
} }
} else {
// Flushed after each key, consume one seq per flushed batch
exp_seq += 4;
// WriteUnprepared implements CommitWithoutPrepareInternal by simply
// calling Prepare then Commit. Consume one seq for the prepare.
exp_seq++;
} }
delete txn; delete txn;
with_empty_commits++; with_empty_commits++;
@ -303,11 +317,17 @@ class TransactionTestBase : public ::testing::Test {
if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) { if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
// Consume one seq per key // Consume one seq per key
exp_seq += 5; exp_seq += 5;
} else { } else if (txn_db_options.write_policy ==
TxnDBWritePolicy::WRITE_PREPARED) {
// Consume one seq per batch // Consume one seq per batch
exp_seq++; exp_seq++;
// Consume one seq per commit marker // Consume one seq per commit marker
exp_seq++; exp_seq++;
} else {
// Flushed after each key, consume one seq per flushed batch
exp_seq += 5;
// Consume one seq per commit marker
exp_seq++;
} }
delete txn; delete txn;
}; };
@ -330,7 +350,8 @@ class TransactionTestBase : public ::testing::Test {
if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) { if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
// No seq is consumed for deleting the txn buffer // No seq is consumed for deleting the txn buffer
exp_seq += 0; exp_seq += 0;
} else { } else if (txn_db_options.write_policy ==
TxnDBWritePolicy::WRITE_PREPARED) {
// Consume one seq per batch // Consume one seq per batch
exp_seq++; exp_seq++;
// Consume one seq per rollback batch // Consume one seq per rollback batch
@ -339,6 +360,15 @@ class TransactionTestBase : public ::testing::Test {
// Consume one seq for rollback commit // Consume one seq for rollback commit
exp_seq++; exp_seq++;
} }
} else {
// Flushed after each key, consume one seq per flushed batch
exp_seq += 5;
// Consume one seq per rollback batch
exp_seq++;
if (options.two_write_queues) {
// Consume one seq for rollback commit
exp_seq++;
}
} }
delete txn; delete txn;
}; };

@ -1612,7 +1612,7 @@ TEST_P(WritePreparedTransactionTest, SmallestUnCommittedSeq) {
txn = txns[index]; txn = txns[index];
txns.erase(txns.begin() + index); txns.erase(txns.begin() + index);
} }
// Since commit cahce is practically disabled, commit results in immediate // Since commit cache is practically disabled, commit results in immediate
// advance in max_evicted_seq_ and subsequently moving some prepared txns // advance in max_evicted_seq_ and subsequently moving some prepared txns
// to delayed_prepared_. // to delayed_prepared_.
txn->Commit(); txn->Commit();

@ -335,7 +335,11 @@ TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) {
for (int i = 0; i < num_batches; i++) { for (int i = 0; i < num_batches; i++) {
ASSERT_OK(txn->Put("k" + ToString(i), "value" + ToString(i))); ASSERT_OK(txn->Put("k" + ToString(i), "value" + ToString(i)));
if (txn_options.write_batch_flush_threshold == 1) { if (txn_options.write_batch_flush_threshold == 1) {
ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i + 1); // WriteUnprepared will check write_batch_flush_threshold and
// possibly flush before appending to the write batch. No flush
// will happen at the first write because the batch is still
// empty, so after k puts, there should be k-1 flushed batches.
ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i);
} else { } else {
ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0); ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0);
} }
@ -411,7 +415,11 @@ TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) {
for (int i = 0; i < kNumKeys; i++) { for (int i = 0; i < kNumKeys; i++) {
txn->Put("k" + ToString(i), "v" + ToString(i)); txn->Put("k" + ToString(i), "v" + ToString(i));
if (txn_options.write_batch_flush_threshold == 1) { if (txn_options.write_batch_flush_threshold == 1) {
ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i + 1); // WriteUnprepared will check write_batch_flush_threshold and
// possibly flush before appending to the write batch. No flush will
// happen at the first write because the batch is still empty, so
// after k puts, there should be k-1 flushed batches.
ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i);
} else { } else {
ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0); ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0);
} }

@ -37,6 +37,7 @@ WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db,
const TransactionOptions& txn_options) const TransactionOptions& txn_options)
: WritePreparedTxn(txn_db, write_options, txn_options), : WritePreparedTxn(txn_db, write_options, txn_options),
wupt_db_(txn_db), wupt_db_(txn_db),
last_log_number_(0),
recovered_txn_(false), recovered_txn_(false),
largest_validated_seq_(0) { largest_validated_seq_(0) {
if (txn_options.write_batch_flush_threshold < 0) { if (txn_options.write_batch_flush_threshold < 0) {
@ -56,10 +57,15 @@ WriteUnpreparedTxn::~WriteUnpreparedTxn() {
// We should rollback regardless of GetState, but some unit tests that // We should rollback regardless of GetState, but some unit tests that
// test crash recovery run the destructor assuming that rollback does not // test crash recovery run the destructor assuming that rollback does not
// happen, so that rollback during recovery can be exercised. // happen, so that rollback during recovery can be exercised.
if (GetState() == STARTED) { if (GetState() == STARTED || GetState() == LOCKS_STOLEN) {
auto s __attribute__((__unused__)) = RollbackInternal(); auto s = RollbackInternal();
// TODO(lth): Better error handling.
assert(s.ok()); assert(s.ok());
if (!s.ok()) {
ROCKS_LOG_FATAL(
wupt_db_->info_log_,
"Rollback of WriteUnprepared transaction failed in destructor: %s",
s.ToString().c_str());
}
dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed( dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
log_number_); log_number_);
} }
@ -233,6 +239,7 @@ Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {
const bool kPrepared = true; const bool kPrepared = true;
Status s; Status s;
if (write_batch_flush_threshold_ > 0 && if (write_batch_flush_threshold_ > 0 &&
write_batch_.GetWriteBatch()->Count() > 0 &&
write_batch_.GetDataSize() > write_batch_.GetDataSize() >
static_cast<size_t>(write_batch_flush_threshold_)) { static_cast<size_t>(write_batch_flush_threshold_)) {
assert(GetState() != PREPARED); assert(GetState() != PREPARED);
@ -257,8 +264,18 @@ Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) {
Status WriteUnpreparedTxn::FlushWriteBatchToDBInternal(bool prepared) { Status WriteUnpreparedTxn::FlushWriteBatchToDBInternal(bool prepared) {
if (name_.empty()) { if (name_.empty()) {
assert(!prepared);
#ifndef NDEBUG
static std::atomic_ullong autogen_id{0};
// To avoid changing all tests to call SetName, just autogenerate one.
if (wupt_db_->txn_db_options_.autogenerate_name) {
SetName(std::string("autoxid") + ToString(autogen_id.fetch_add(1)));
} else
#endif
{
return Status::InvalidArgument("Cannot write to DB without SetName."); return Status::InvalidArgument("Cannot write to DB without SetName.");
} }
}
// TODO(lth): Reduce duplicate code with WritePrepared prepare logic. // TODO(lth): Reduce duplicate code with WritePrepared prepare logic.
WriteOptions write_options = write_options_; WriteOptions write_options = write_options_;
@ -285,11 +302,14 @@ Status WriteUnpreparedTxn::FlushWriteBatchToDBInternal(bool prepared) {
// from the current transaction. This means that if log_number_ is set, // from the current transaction. This means that if log_number_ is set,
// WriteImpl should not overwrite that value, so set log_used to nullptr if // WriteImpl should not overwrite that value, so set log_used to nullptr if
// log_number_ is already set. // log_number_ is already set.
uint64_t* log_used = log_number_ ? nullptr : &log_number_; auto s =
auto s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, log_used, /*log ref*/ /*callback*/ nullptr, &last_log_number_, /*log ref*/
0, !DISABLE_MEMTABLE, &seq_used, 0, !DISABLE_MEMTABLE, &seq_used, prepare_batch_cnt_,
prepare_batch_cnt_, &add_prepared_callback); &add_prepared_callback);
if (log_number_ == 0) {
log_number_ = last_log_number_;
}
assert(!s.ok() || seq_used != kMaxSequenceNumber); assert(!s.ok() || seq_used != kMaxSequenceNumber);
auto prepare_seq = seq_used; auto prepare_seq = seq_used;

@ -145,8 +145,21 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
const SliceParts& key, const SliceParts& key,
const bool assume_tracked = false) override; const bool assume_tracked = false) override;
// In WriteUnprepared, untracked writes will break snapshot validation logic.
// Snapshot validation will only check the largest sequence number of a key to
// see if it was committed or not. However, an untracked unprepared write will
// hide smaller committed sequence numbers.
//
// TODO(lth): Investigate whether it is worth having snapshot validation
// validate all values larger than snap_seq. Otherwise, we should return
// Status::NotSupported for untracked writes.
virtual Status RebuildFromWriteBatch(WriteBatch*) override; virtual Status RebuildFromWriteBatch(WriteBatch*) override;
virtual uint64_t GetLastLogNumber() const override {
return last_log_number_;
}
protected: protected:
void Initialize(const TransactionOptions& txn_options) override; void Initialize(const TransactionOptions& txn_options) override;
@ -219,6 +232,8 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
// commit callbacks. // commit callbacks.
std::map<SequenceNumber, size_t> unprep_seqs_; std::map<SequenceNumber, size_t> unprep_seqs_;
uint64_t last_log_number_;
// Recovered transactions have tracked_keys_ populated, but are not actually // Recovered transactions have tracked_keys_ populated, but are not actually
// locked for efficiency reasons. For recovered transactions, skip unlocking // locked for efficiency reasons. For recovered transactions, skip unlocking
// keys when transaction ends. // keys when transaction ends.

Loading…
Cancel
Save